# AgentSession The `AgentSession` is the server-side counterpart to [`ClientSession`](https://ably.com/docs/ai-transport/api/javascript/core/client-session.md). It subscribes to the channel for cancel signals and creates `Run` instances that publish lifecycle events, user messages, and streamed assistant output. Construct one with `createAgentSession` from the core entry point. For Vercel `UIMessage` channels, use the pre-bound factory from [`@ably/ai-transport/vercel`](https://ably.com/docs/ai-transport/api/javascript/vercel/chat-transport.md) instead. #### Javascript ``` import * as Ably from 'ably'; import { createAgentSession, Invocation } from '@ably/ai-transport'; import { UIMessageCodec } from '@ably/ai-transport/vercel'; const ably = new Ably.Realtime({ key: process.env.ABLY_API_KEY }); const invocation = Invocation.fromJSON(await req.json()); const session = createAgentSession({ client: ably, channelName: invocation.sessionName, codec: UIMessageCodec, }); await session.connect(); const run = session.createRun(invocation, { signal: req.signal }); await run.start(); ``` ## Create an agent session `function createAgentSession(options: AgentSessionOptions): AgentSession` Construct an `AgentSession` bound to an Ably channel. The session does not attach until [`connect()`](#connect) resolves. ### Javascript ``` import * as Ably from 'ably'; import { createAgentSession } from '@ably/ai-transport'; import { UIMessageCodec } from '@ably/ai-transport/vercel'; const ably = new Ably.Realtime({ key: process.env.ABLY_API_KEY }); const session = createAgentSession({ client: ably, channelName: 'conversation-42', codec: UIMessageCodec, }); ``` ### Parameters | Parameter | Required | Description | Type | | --- | --- | --- | --- | | client | required | The Ably Realtime client. The caller owns its lifecycle; `session.close()` does not close the client. | `Ably.Realtime` | | channelName | required | The channel to publish to. The session owns this channel; do not also resolve it elsewhere with conflicting options. | String | | codec | required | The codec used to encode events and messages. | `Codec` | | logger | optional | Logger instance for diagnostic output. | `Logger` | | onError | optional | Called with non-fatal session-level errors not scoped to any run (cancel listener failures, channel continuity loss). | `(error: Ably.ErrorInfo) => void` | | inputEventLookupTimeoutMs | optional | How long `Run.start()` waits for the run's input event(s) to arrive on the channel before rejecting with `InputEventNotFound`. Defaults to 30000. | Number | | inputEventBufferLimit | optional | Maximum number of distinct input events that may be buffered while waiting for `Run.start()` to register a lookup listener. FIFO-evicted when exceeded. Defaults to 200. | Number | | rewindWindow | optional | Channel rewind applied when the agent attaches. Passed verbatim to Ably's `params.rewind`. Accepts duration strings (`"2m"`, `"30s"`) or message counts as strings (`"50"`). Defaults to `"2m"`. | String |
### Returns `AgentSession`. The session instance. Call [`connect()`](#connect) before [`createRun`](#create-run). ## Connect the session `connect(): Promise` Subscribe to the cancel channel and implicitly attach. Idempotent: subsequent calls return the same promise. All `Run` methods (`start`, `addEvents`, `pipe`, `suspend`, `end`) throw `InvalidArgument` until `connect()` resolves. ### Javascript ``` await session.connect(); ``` ### Returns `Promise`. Resolves when the channel is attached and the session is ready to create runs. ## Create a run `createRun(invocation: Invocation, runtime?: RunRuntime): Run` Create a new `Run` from an `Invocation`. Synchronous: no channel activity until [`Run.start`](#run-start) is called. The Run is registered for cancel routing immediately so early cancels fire the `abortSignal`. ### Javascript ``` const invocation = Invocation.fromJSON(await req.json()); const run = session.createRun(invocation, { signal: req.signal }); ``` ### Parameters | Parameter | Required | Description | Type | | --- | --- | --- | --- | | invocation | required | The `Invocation` carrying run identity and conversation context. |
| | runtime | optional | Per-run hooks and an external abort signal. |
|
| Property | Description | Type | | --- | --- | --- | | inputEventId | The specific input event on the channel that triggered this invocation. Run identity is resolved from that event's wire headers. | String | | sessionName | Logical name of the session, used as the Ably channel name. | String |
| Property | Description | Type | | --- | --- | --- | | invocationId | Override the invocation id for this Run. Defaults to a fresh `crypto.randomUUID()` (the normal path; one per HTTP request). Supply a non-empty value for deterministic ids in tests. | String | | runId | Override the run id for a fresh run. Defaults to a fresh `crypto.randomUUID()`. Continuations ignore this and read the existing `runId` off the triggering input event. Supply a non-empty value for deterministic ids in tests. | String | | signal | External `AbortSignal` (typically the HTTP request's `req.signal`) that cancels the run when fired. | `AbortSignal` | | onMessage | Called before each Ably message is published. Mutate the message in place to add custom headers under `extras.ai`. | `(message: Ably.Message) => void` | | onCancelled | Called when the run is cancelled. Receives a `write` function to publish final outputs before cancellation finalises. | `(write: (output: TOutput) => Promise) => void \| Promise` | | onCancel | Called when a cancel arrives. Return `true` to accept, `false` to reject. Defaults to accepting all. | `(req: CancelRequest) => Promise` | | onError | Called with non-fatal run-scoped errors. | `(error: Ably.ErrorInfo) => void` |
### Returns A `Run` handle for publishing lifecycle events, user messages, and streamed output. See [Run interface](#run) below. ## Run The handle returned by [`createRun`](#create-run). ### Properties | Property | Description | Type | | --- | --- | --- | | runId | The Run's unique identifier. | String | | invocationId | The invocation id minted by the agent for this `createRun` call (one per HTTP request). Readable synchronously; the application returns it on the HTTP response. The agent stamps it on every event it publishes for this invocation. | String | | abortSignal | `AbortSignal` scoped to this Run. Fires when a cancel event arrives. | `AbortSignal` | | view | Read-only view of the conversation messages associated with this Run. |
| | messages | The conversation messages this Run should feed to the model. See [Conversation hydration](#conversation-hydration). | `TMessage[]` |
| Property | Description | Type | | --- | --- | --- | | messages | Invocation input messages handed to this Run. No branch awareness today. | `MessageNode[]` |
### Start the run `start(): Promise` Publish the `ai-run-start` event to the channel and wait for the Run's input event to arrive (rewind + live wait). Must be called before `pipe`, `addEvents`, `suspend`, or `end`. Rejects with `InputEventNotFound` (code `104010`) if the input event for this invocation does not arrive within `inputEventLookupTimeoutMs`. Surface the rejection as a non-2xx response so the client's pending send fails. ### Pipe the response stream `pipe(stream: ReadableStream, options?: PipeOptions): Promise` Pipe a `ReadableStream` of outputs through the encoder to the channel. Returns when the stream completes, is cancelled, or errors. Does NOT call `end()`; the caller must call `end()` after `pipe()` returns. #### Parameters | Parameter | Required | Description | Type | | --- | --- | --- | --- | | stream | required | The output stream from your LLM call. | `ReadableStream` | | options | optional | Branching and per-output hooks. |
|
| Property | Description | Type | | --- | --- | --- | | parent | The codec-message-id of the immediately preceding message in this branch. | String | | forkOf | The codec-message-id of the message this response replaces (for regeneration). | String | | resolveWriteOptions | Per-output hook returning `WriteOptions` overrides for a single encode call. | Function |
#### Returns `Promise`. Resolves when the stream ends. Pass `result.reason` to [`Run.end`](#run-end).
| Property | Description | Type | | --- | --- | --- | | reason | Why the stream ended. | `RunEndReason` | | error | The original error when `reason` is `'error'`. | `Error` |
### Add events to existing messages `addEvents(nodes: EventsNode[]): Promise` Publish events targeting existing messages. Each node specifies a `codecMessageId` and the events to apply. Used for cross-Run updates such as tool result delivery after approval. #### Parameters | Parameter | Required | Description | Type | | --- | --- | --- | --- | | nodes | required | Events to apply, each tagged with the codec-message-id of the message to update. |
|
| Property | Description | Type | | --- | --- | --- | | kind | Discriminator: `'event'`. | `'event'` | | codecMessageId | The codec-message-id of the existing message to update. | String | | events | Outputs to apply to the target message. | `TOutput[]` |
### Load a Run's projection `loadProjection(): Promise` Fetch every channel message bound to this Run and fold them through the codec into a single projection. Use this to reconstruct a single Run's full state (including client-published tool-output amends the agent did not observe live) when resuming a suspended Run before piping fresh output. The companion to [`loadConversation`](#load-conversation), which walks ancestors; `loadProjection` covers only the current Run. The caller extracts what they need via [`Codec.getMessages`](https://ably.com/docs/ai-transport/api/javascript/core/codec.md). #### Returns `Promise`. The codec projection produced by folding every event for this Run in serial order. ### Load the full conversation `loadConversation(options?: LoadConversationOptions): Promise` Reconstruct the full multi-turn conversation by walking the ancestor Run chain and concatenating each Run's messages, oldest turn first. After this call, `Run.messages` returns the complete conversation, ready to pass to the LLM. #### Parameters | Parameter | Required | Description | Type | | --- | --- | --- | --- | | options | optional | History pagination tuning. |
|
| Property | Description | Type | | --- | --- | --- | | pageLimit | Wire messages per history page. Defaults to 200. | Number | | maxMessages | Maximum wire messages collected across all pages. Defaults to 2000. | Number |
### Suspend the run `suspend(): Promise` Publish the `ai-run-suspend` event to the channel, pausing the Run pending external input (a tool approval, a human-in-the-loop response). The Run is not terminal: `RunInfo.status` becomes `'suspended'`, and a continuation Invocation resumes it via `ai-run-resume`. Use `suspend` instead of `end` when you want the run to come back. Use `end` only for terminal outcomes. ### End the run `end(reason: RunEndReason): Promise` Publish the `ai-run-end` event to the channel terminally and clean up. `reason` is one of `'complete'`, `'cancelled'`, or `'error'`. To pause a Run instead of ending it, use [`suspend`](#run-suspend). ## Close the session `close(): void` Unsubscribe from the channel and clear handlers. Local-state-only; does not end in-progress Runs. End each Run explicitly before closing the session. ## Invocation A value object wrapping the JSON body a client sends to the agent's HTTP endpoint to start a Run. ### Build from JSON `Invocation.fromJSON(data: InvocationData): Invocation` The entry point used by agent handlers: parse the request body and pass it to `Invocation.fromJSON`, then hand the result to [`createRun`](#create-run). #### Javascript ``` import { Invocation } from '@ably/ai-transport'; const data = await req.json(); const invocation = Invocation.fromJSON(data); ``` #### Parameters | Parameter | Required | Description | Type | | --- | --- | --- | --- | | data | required | The parsed JSON request body matching the `InvocationData` wire shape. |
|
| Property | Description | Type | | --- | --- | --- | | inputEventId | Identifier for the input event on the channel that triggered this invocation. Run identity is resolved from that event's wire headers, not from the body. | String | | sessionName | Logical name of the session, used as the Ably channel name. | String |
## Conversation hydration `Run.messages` returns different content depending on lifecycle stage: - Before [`start`](#run-start) resolves: empty array. - After `start`: the user-prompt messages looked up on the channel for this invocation. - After [`loadConversation`](#load-conversation): the full multi-turn conversation, with ancestor Run messages followed by the current Run's messages, oldest turn first. Use [`loadConversation`](#load-conversation) before piping to the LLM in any conversation past the first turn. ## RunEndReason `'complete' | 'cancelled' | 'error'`. Passed to [`Run.end`](#run-end) and reflected on `RunInfo.status` once the Run terminates. A Run that pauses for external input (tool approval, human-in-the-loop) uses [`Run.suspend`](#run-suspend) instead of `end`, which publishes `ai-run-suspend` and leaves the Run alive at `RunInfo.status === 'suspended'`. A continuation Invocation resumes it via `ai-run-resume`. ## Example An HTTP handler that sets up the session, creates a Run, loads the conversation, pipes the LLM stream, and ends the Run. ### Javascript ``` import * as Ably from 'ably'; import { createAgentSession, Invocation } from '@ably/ai-transport'; import { UIMessageCodec } from '@ably/ai-transport/vercel'; const ably = new Ably.Realtime({ key: process.env.ABLY_API_KEY }); export async function POST(req: Request) { const invocation = Invocation.fromJSON(await req.json()); const session = createAgentSession({ client: ably, channelName: invocation.sessionName, codec: UIMessageCodec, }); await session.connect(); const run = session.createRun(invocation, { signal: req.signal }); try { await run.start(); await run.loadConversation(); const llmStream = await callMyLLM(run.messages); const result = await run.pipe(llmStream); await run.end(result.reason); } catch (err) { await run.end('error'); throw err; } finally { session.close(); } return Response.json({ runId: run.runId, invocationId: run.invocationId }); } ``` ## Related Topics - [Client session](https://ably.com/docs/ai-transport/api/javascript/core/client-session.md): API reference for the AI Transport ClientSession: factory, properties, lifecycle methods, and the View interface returned by createView(). - [Codec](https://ably.com/docs/ai-transport/api/javascript/core/codec.md): API reference for the AI Transport Codec interface. Build custom codecs to integrate any AI framework with Ably channels. ## Documentation Index To discover additional Ably documentation: 1. Fetch [llms.txt](https://ably.com/llms.txt) for the canonical list of available pages. 2. Identify relevant URLs from that index. 3. Fetch target pages as needed. Avoid using assumed or outdated documentation paths.