AgentSession
The AgentSession is the server-side counterpart to ClientSession. It subscribes to the channel for cancel signals and creates Run instances that publish lifecycle events, user messages, and streamed assistant output.
Construct one with createAgentSession from the core entry point. For Vercel UIMessage channels, use the pre-bound factory from @ably/ai-transport/vercel instead.
import * as Ably from 'ably';
import { createAgentSession, Invocation } from '@ably/ai-transport';
import { UIMessageCodec } from '@ably/ai-transport/vercel';

const ably = new Ably.Realtime({ key: process.env.ABLY_API_KEY });
const invocation = Invocation.fromJSON(await req.json());
const session = createAgentSession({
  client: ably,
  channelName: invocation.sessionName,
  codec: UIMessageCodec,
});
await session.connect();
const run = session.createRun(invocation, { signal: req.signal });
await run.start();
Create an agent session
function createAgentSession<TInput, TOutput, TProjection, TMessage>(options: AgentSessionOptions<TInput, TOutput, TProjection, TMessage>): AgentSession<TOutput, TProjection, TMessage>
Construct an AgentSession bound to an Ably channel. The session does not attach until connect() resolves.
import * as Ably from 'ably';
import { createAgentSession } from '@ably/ai-transport';
import { UIMessageCodec } from '@ably/ai-transport/vercel';

const ably = new Ably.Realtime({ key: process.env.ABLY_API_KEY });
const session = createAgentSession({
  client: ably,
  channelName: 'conversation-42',
  codec: UIMessageCodec,
});
Parameters
- client (required, Ably.Realtime): session.close() does not close the client.
- channelName (required, String)
- codec (required, Codec<TInput, TOutput, TProjection, TMessage>)
- logger (optional, Logger)
- onError (optional, (error: Ably.ErrorInfo) => void)
- inputEventLookupTimeoutMs (optional, Number): Run.start() waits for the run's input event(s) to arrive on the channel before rejecting with InputEventNotFound. Defaults to 30000.
- inputEventBufferLimit (optional, Number): Run.start() to register a lookup listener. FIFO-evicted when exceeded. Defaults to 200.
- rewindWindow (optional, String): params.rewind. Accepts duration strings ("2m", "30s") or message counts as strings ("50"). Defaults to "2m".
Returns
AgentSession<TOutput, TProjection, TMessage>. The session instance. Call connect() before createRun.
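The two rewindWindow formats listed under Parameters can be told apart with a small parser. This is a minimal sketch for illustration only: parseRewindWindow and the RewindWindow type are hypothetical names, not part of @ably/ai-transport, and only the "s" and "m" units shown in the documented examples are handled.

```typescript
// Hypothetical helper, not part of @ably/ai-transport: classifies a
// rewindWindow string as either a duration or a message count.
type RewindWindow =
  | { kind: 'duration'; milliseconds: number }
  | { kind: 'count'; messages: number };

function parseRewindWindow(value: string): RewindWindow {
  // A bare integer string such as "50" is a message count.
  if (/^\d+$/.test(value)) {
    return { kind: 'count', messages: Number(value) };
  }
  // An integer followed by "s" or "m" is a duration ("30s", "2m").
  if (/^\d+[sm]$/.test(value)) {
    const amount = Number(value.slice(0, -1));
    const unitMs = value.endsWith('s') ? 1000 : 60000;
    return { kind: 'duration', milliseconds: amount * unitMs };
  }
  throw new Error(`Unrecognised rewindWindow: ${value}`);
}
```

Under these assumptions, parseRewindWindow('2m') describes a two-minute window and parseRewindWindow('50') describes the last 50 messages.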
Connect the session
connect(): Promise<void>
Subscribe to the cancel channel and implicitly attach. Idempotent: subsequent calls return the same promise. All Run methods (start, addEvents, pipe, suspend, end) throw InvalidArgument until connect() resolves.
await session.connect();
Returns
Promise<void>. Resolves when the channel is attached and the session is ready to create runs.
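The idempotency described for connect() (repeat calls return the same promise) is commonly achieved by memoising the first promise. The sketch below shows that pattern in isolation; ConnectOnce and its attach callback are hypothetical and do not reflect how the library is actually implemented.

```typescript
// Illustrative pattern only: a memoised connect(). The attach callback is a
// stand-in for whatever work a real session does on first connect.
class ConnectOnce {
  private readonly attach: () => Promise<void>;
  private pending: Promise<void> | undefined;

  constructor(attach: () => Promise<void>) {
    this.attach = attach;
  }

  connect(): Promise<void> {
    // Start the attach on the first call; later calls reuse the same promise.
    if (this.pending === undefined) {
      this.pending = this.attach();
    }
    return this.pending;
  }
}
```

Because the promise is cached rather than the result, concurrent callers all wait on the same attach attempt.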
Create a run
createRun(invocation: Invocation, runtime?: RunRuntime<TOutput>): Run<TOutput, TProjection, TMessage>
Create a new Run from an Invocation. Synchronous: no channel activity until Run.start is called. The Run is registered for cancel routing immediately so early cancels fire the abortSignal.
const invocation = Invocation.fromJSON(await req.json());
const run = session.createRun(invocation, { signal: req.signal });
Parameters
- invocation (required, Invocation): Invocation carrying run identity and conversation context.
- runtime (optional, RunRuntime)
Returns
A Run<TOutput, TProjection, TMessage> handle for publishing lifecycle events, user messages, and streamed output. See Run interface below.
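Registering a Run for cancel routing at creation time means a cancel that arrives before start() can still abort the Run. One way to picture this is a registry keyed by run ID. The CancelRegistry class below is a hypothetical sketch built on the standard AbortController, not the library's internals.

```typescript
// Hypothetical registry illustrating early cancel routing.
class CancelRegistry {
  private readonly controllers = new Map<string, AbortController>();

  // Called when a Run is created, before any channel activity.
  register(runId: string): AbortSignal {
    const controller = new AbortController();
    this.controllers.set(runId, controller);
    return controller.signal;
  }

  // Called when a cancel event for runId arrives. Returns false when no
  // Run with that ID has been registered.
  cancel(runId: string): boolean {
    const controller = this.controllers.get(runId);
    if (controller === undefined) {
      return false;
    }
    controller.abort();
    return true;
  }
}
```

Work that receives the returned signal (for example an LLM request) can then stop as soon as the cancel is routed.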
Run
The handle returned by createRun.
Properties
- runId (String)
- invocationId (String): createRun call (one per HTTP request). Readable synchronously; the application returns it on the HTTP response. The agent stamps it on every event it publishes for this invocation.
- abortSignal (AbortSignal): AbortSignal scoped to this Run. Fires when a cancel event arrives.
- view (RunView)
- messages (TMessage[])
Start the run
start(): Promise<void>
Publish the ai-run-start event to the channel and wait for the Run's input event to arrive (rewind + live wait). Must be called before pipe, addEvents, suspend, or end.
Rejects with InputEventNotFound (code 104010) if the input event for this invocation does not arrive within inputEventLookupTimeoutMs. Surface the rejection as a non-2xx response so the client's pending send fails.
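A handler might translate a start() rejection into a non-2xx response along these lines. This is a sketch under stated assumptions: the helper name, the result shape, and the choice of 504 and 500 are all hypothetical. Only the error code 104010 comes from the text above.

```typescript
// Code documented above for InputEventNotFound.
const INPUT_EVENT_NOT_FOUND = 104010;

// Hypothetical result shape; a real handler would build an HTTP response.
interface StartFailureResult {
  status: number;
  body: { error: string };
}

function resultForStartFailure(err: unknown): StartFailureResult {
  // Read a numeric or other `code` property if the thrown value has one.
  const code =
    typeof err === 'object' && err !== null && 'code' in err
      ? (err as { code?: unknown }).code
      : undefined;
  if (code === INPUT_EVENT_NOT_FOUND) {
    // Status 504 is an assumption; any non-2xx status fails the pending send.
    return { status: 504, body: { error: 'input event not found' } };
  }
  return { status: 500, body: { error: 'run failed to start' } };
}
```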
Pipe the response stream
pipe(stream: ReadableStream<TOutput>, options?: PipeOptions<TOutput>): Promise<StreamResult>
Pipe a ReadableStream of outputs through the encoder to the channel. Returns when the stream completes, is cancelled, or errors. Does NOT call end(); the caller must call end() after pipe() returns.
Parameters
- stream (required, ReadableStream<TOutput>)
- options (optional, PipeOptions)
Returns
Promise<StreamResult>. Resolves when the stream ends. Pass result.reason to Run.end.
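Since pipe() leaves the Run open, callers may want a small wrapper that always follows it with end(). The sketch below is written against a minimal structural interface rather than the real Run type. PipeableRun and pipeThenEnd are hypothetical names, and it assumes StreamResult.reason is a value end() accepts.

```typescript
type RunEndReason = 'complete' | 'cancelled' | 'error';

// Minimal stand-in for the two Run methods this sketch needs.
interface PipeableRun<TStream> {
  pipe(stream: TStream): Promise<{ reason: RunEndReason }>;
  end(reason: RunEndReason): Promise<void>;
}

async function pipeThenEnd<TStream>(
  run: PipeableRun<TStream>,
  stream: TStream,
): Promise<RunEndReason> {
  // Default to 'error' so a throwing pipe() still ends the Run terminally.
  let reason: RunEndReason = 'error';
  try {
    const result = await run.pipe(stream);
    reason = result.reason;
  } finally {
    // pipe() never calls end(), so always end the Run here.
    await run.end(reason);
  }
  return reason;
}
```

If pipe() throws, the Run is ended with 'error' and the original exception still propagates to the caller.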
Add events to existing messages
addEvents(nodes: EventsNode<TOutput>[]): Promise<void>
Publish events targeting existing messages. Each node specifies a codecMessageId and the events to apply. Used for cross-Run updates such as tool result delivery after approval.
Parameters
- nodes (required, EventsNode)
Load a Run's projection
loadProjection(): Promise<TProjection>
Fetch every channel message bound to this Run and fold them through the codec into a single projection. Use this to reconstruct a single Run's full state (including client-published tool-output amends the agent did not observe live) when resuming a suspended Run before piping fresh output. The companion to loadConversation, which walks ancestors; loadProjection covers only the current Run.
The caller extracts what they need via Codec.getMessages.
Returns
Promise<TProjection>. The codec projection produced by folding every event for this Run in serial order.
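Folding events in serial order is essentially a sort followed by a reduce. The sketch below illustrates that idea with made-up types: FoldingCodec, its init and fold methods, and the lexicographically sortable serial field are all assumptions, not the library's Codec interface.

```typescript
// Hypothetical event envelope: `serial` is assumed to sort lexicographically.
interface SerialEvent<TEvent> {
  serial: string;
  event: TEvent;
}

// Hypothetical codec surface with just enough to express a fold.
interface FoldingCodec<TEvent, TProjection> {
  init(): TProjection;
  fold(projection: TProjection, event: TEvent): TProjection;
}

function foldProjection<TEvent, TProjection>(
  codec: FoldingCodec<TEvent, TProjection>,
  events: ReadonlyArray<SerialEvent<TEvent>>,
): TProjection {
  // Copy before sorting so the caller's array is left untouched.
  const ordered = [...events].sort((a, b) =>
    a.serial < b.serial ? -1 : a.serial > b.serial ? 1 : 0,
  );
  let projection = codec.init();
  for (const item of ordered) {
    projection = codec.fold(projection, item.event);
  }
  return projection;
}
```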
Load the full conversation
loadConversation(options?: LoadConversationOptions): Promise<TMessage[]>
Reconstruct the full multi-turn conversation by walking the ancestor Run chain and concatenating each Run's messages, oldest turn first. After this call, Run.messages returns the complete conversation, ready to pass to the LLM.
Parameters
- options (optional, LoadConversationOptions)
Suspend the run
suspend(): Promise<void>
Publish the ai-run-suspend event to the channel, pausing the Run pending external input (a tool approval, a human-in-the-loop response). The Run is not terminal: RunInfo.status becomes 'suspended', and a continuation Invocation resumes it via ai-run-resume.
Use suspend instead of end when you want the run to come back. Use end only for terminal outcomes.
End the run
end(reason: RunEndReason): Promise<void>
Publish the ai-run-end event to the channel terminally and clean up. reason is one of 'complete', 'cancelled', or 'error'. To pause a Run instead of ending it, use suspend.
Close the session
close(): void
Unsubscribe from the channel and clear handlers. Local-state-only; does not end in-progress Runs. End each Run explicitly before closing the session.
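Because close() does not end in-progress Runs, shutdown code has to end them first. A minimal sketch of that ordering follows, written against small structural interfaces. EndableRun, ClosableSession, and endRunsThenClose are hypothetical names, and tracking which Runs are still open is left to the application.

```typescript
type RunEndReason = 'complete' | 'cancelled' | 'error';

// Minimal stand-ins for the methods this sketch relies on.
interface EndableRun {
  end(reason: RunEndReason): Promise<void>;
}
interface ClosableSession {
  close(): void;
}

async function endRunsThenClose(
  session: ClosableSession,
  openRuns: ReadonlyArray<EndableRun>,
  reason: RunEndReason,
): Promise<void> {
  try {
    // End each Run explicitly; close() will not do this for us.
    for (const run of openRuns) {
      await run.end(reason);
    }
  } finally {
    // Close the session even if ending a Run failed.
    session.close();
  }
}
```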
Invocation
A value object wrapping the JSON body a client sends to the agent's HTTP endpoint to start a Run.
Build from JSON
Invocation.fromJSON(data: InvocationData): Invocation
The entry point used by agent handlers: parse the request body and pass it to Invocation.fromJSON, then hand the result to createRun.
import { Invocation } from '@ably/ai-transport';

const data = await req.json();
const invocation = Invocation.fromJSON(data);
Parameters
- data (required, InvocationData): InvocationData wire shape.
Conversation hydration
Run.messages returns different content depending on lifecycle stage:
- Before start resolves: empty array.
- After start: the user-prompt messages looked up on the channel for this invocation.
- After loadConversation: the full multi-turn conversation, with ancestor Run messages followed by the current Run's messages, oldest turn first.
Use loadConversation before piping to the LLM in any conversation past the first turn.
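The final stage above, ancestor messages followed by the current Run's messages, can be pictured as a walk up a parent chain. The sketch below works on an in-memory map of Runs purely for illustration. StoredRun, its parentRunId field, and collectConversation are hypothetical and say nothing about how the library stores or fetches Runs.

```typescript
// Hypothetical in-memory record of a Run and its link to the previous turn.
interface StoredRun<TMessage> {
  runId: string;
  parentRunId?: string;
  messages: TMessage[];
}

function collectConversation<TMessage>(
  runs: ReadonlyMap<string, StoredRun<TMessage>>,
  currentRunId: string,
): TMessage[] {
  // Walk from the current Run back to the root, guarding against cycles.
  const chain: StoredRun<TMessage>[] = [];
  const seen = new Set<string>();
  let cursor: string | undefined = currentRunId;
  while (cursor !== undefined && !seen.has(cursor)) {
    seen.add(cursor);
    const run: StoredRun<TMessage> | undefined = runs.get(cursor);
    if (run === undefined) {
      break;
    }
    chain.push(run);
    cursor = run.parentRunId;
  }
  // Reverse so the oldest turn comes first, then concatenate messages.
  const conversation: TMessage[] = [];
  for (const run of chain.reverse()) {
    conversation.push(...run.messages);
  }
  return conversation;
}
```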
RunEndReason
'complete' | 'cancelled' | 'error'. Passed to Run.end and reflected on RunInfo.status once the Run terminates.
A Run that pauses for external input (tool approval, human-in-the-loop) uses Run.suspend instead of end, which publishes ai-run-suspend and leaves the Run alive at RunInfo.status === 'suspended'. A continuation Invocation resumes it via ai-run-resume.
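The choice between suspend and end can be made explicit in application code. In this sketch the Outcome values are hypothetical application-level states. Only the three RunEndReason values and the suspend/end split come from the text above.

```typescript
type RunEndReason = 'complete' | 'cancelled' | 'error';

// Hypothetical application-level outcomes of handling one invocation.
type Outcome = 'finished' | 'aborted' | 'failed' | 'awaiting-approval';

type FinishAction =
  | { method: 'suspend' }
  | { method: 'end'; reason: RunEndReason };

function finishActionFor(outcome: Outcome): FinishAction {
  switch (outcome) {
    case 'awaiting-approval':
      // The Run should come back later, so it is paused rather than ended.
      return { method: 'suspend' };
    case 'finished':
      return { method: 'end', reason: 'complete' };
    case 'aborted':
      return { method: 'end', reason: 'cancelled' };
    case 'failed':
      return { method: 'end', reason: 'error' };
  }
}
```

A handler could then call run.suspend() or run.end(action.reason) depending on the returned method.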
Example
An HTTP handler that sets up the session, creates a Run, loads the conversation, pipes the LLM stream, and ends the Run.
import * as Ably from 'ably';
import { createAgentSession, Invocation } from '@ably/ai-transport';
import { UIMessageCodec } from '@ably/ai-transport/vercel';

const ably = new Ably.Realtime({ key: process.env.ABLY_API_KEY });

export async function POST(req: Request) {
  const invocation = Invocation.fromJSON(await req.json());
  const session = createAgentSession({
    client: ably,
    channelName: invocation.sessionName,
    codec: UIMessageCodec,
  });
  await session.connect();
  const run = session.createRun(invocation, { signal: req.signal });
  try {
    await run.start();
    // Hydrate run.messages with the full multi-turn conversation.
    await run.loadConversation();
    const llmStream = await callMyLLM(run.messages);
    // pipe() does not end the Run; pass its reason to end().
    const result = await run.pipe(llmStream);
    await run.end(result.reason);
  } catch (err) {
    await run.end('error');
    throw err;
  } finally {
    // close() leaves the shared Ably client open.
    session.close();
  }
  return Response.json({ runId: run.runId, invocationId: run.invocationId });
}