AgentSession

The AgentSession is the server-side counterpart to ClientSession. It subscribes to the channel for cancel signals and creates Run instances that publish lifecycle events, user messages, and streamed assistant output.

Construct one with createAgentSession from the core entry point. For Vercel UIMessage channels, use the pre-bound factory from @ably/ai-transport/vercel instead.

JavaScript

import * as Ably from 'ably';
import { createAgentSession, Invocation } from '@ably/ai-transport';
import { UIMessageCodec } from '@ably/ai-transport/vercel';

const ably = new Ably.Realtime({ key: process.env.ABLY_API_KEY });

// Inside an HTTP handler, where req is the incoming Request:
const invocation = Invocation.fromJSON(await req.json());

const session = createAgentSession({
  client: ably,
  channelName: invocation.sessionName,
  codec: UIMessageCodec,
});

await session.connect();
const run = session.createRun(invocation, { signal: req.signal });
await run.start();

Create an agent session

function createAgentSession<TInput, TOutput, TProjection, TMessage>(options: AgentSessionOptions<TInput, TOutput, TProjection, TMessage>): AgentSession<TOutput, TProjection, TMessage>

Construct an AgentSession bound to an Ably channel. Creating the session does not attach the channel; call connect() to subscribe and attach.

JavaScript

import * as Ably from 'ably';
import { createAgentSession } from '@ably/ai-transport';
import { UIMessageCodec } from '@ably/ai-transport/vercel';

const ably = new Ably.Realtime({ key: process.env.ABLY_API_KEY });

const session = createAgentSession({
  client: ably,
  channelName: 'conversation-42',
  codec: UIMessageCodec,
});

Parameters

client (Ably.Realtime, required)
The Ably Realtime client. The caller owns its lifecycle: session.close() does not close the client.
channelName (String, required)
The channel the session publishes to and listens on for cancel events. The session owns this channel; do not also resolve it elsewhere with conflicting options.
codec (Codec<TInput, TOutput, TProjection, TMessage>, required)
The codec used to encode events and messages.
logger (Logger, optional)
Logger instance for diagnostic output.
onError ((error: Ably.ErrorInfo) => void, optional)
Called with non-fatal, session-level errors that are not scoped to any run, such as cancel listener failures or loss of channel continuity.
inputEventLookupTimeoutMs (Number, optional)
How long, in milliseconds, Run.start() waits for the run's input event(s) to arrive on the channel before rejecting with InputEventNotFound. Defaults to 30000.
inputEventBufferLimit (Number, optional)
Maximum number of distinct input events that may be buffered while waiting for Run.start() to register a lookup listener. When the limit is exceeded, the oldest event is evicted first (FIFO). Defaults to 200.
rewindWindow (String, optional)
Channel rewind applied when the agent attaches, passed verbatim to Ably's params.rewind. Accepts duration strings ("2m", "30s") or message counts as strings ("50"). Defaults to "2m".

Returns

AgentSession<TOutput, TProjection, TMessage>. The session instance. Call connect() before createRun.

Connect the session

connect(): Promise<void>

Subscribe to the channel for cancel events, which implicitly attaches it. Idempotent: subsequent calls return the same promise. Until connect() resolves, all Run methods (start, addEvents, pipe, suspend, end) throw InvalidArgument.

JavaScript

await session.connect();

Returns

Promise<void>. Resolves when the channel is attached and the session is ready to create runs.
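Idempotency of this kind is usually achieved by caching the first promise. The following is a minimal, hypothetical sketch of that pattern; SessionLike and its attach callback are stand-ins and say nothing about how AgentSession is implemented internally.

```javascript
// Hypothetical sketch of an idempotent connect(): the first call starts the work
// and caches the promise; every later call returns that same promise.
class SessionLike {
  constructor(attach) {
    this.attach = attach; // stand-in for "subscribe to cancel events and attach"
    this.connectPromise = undefined;
  }

  connect() {
    if (!this.connectPromise) {
      this.connectPromise = this.attach();
    }
    return this.connectPromise;
  }
}

let attachCalls = 0;
const session = new SessionLike(() => {
  attachCalls += 1;
  return Promise.resolve();
});

const first = session.connect();
const second = session.connect();
console.log(first === second, attachCalls); // true 1
```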

Create a run

createRun(invocation: Invocation, runtime?: RunRuntime<TOutput>): Run<TOutput, TProjection, TMessage>

Create a new Run from an Invocation. The call is synchronous: nothing is published to the channel until Run.start is called. The Run is registered for cancel routing immediately, so a cancel that arrives before start() still fires its abortSignal.

JavaScript

const invocation = Invocation.fromJSON(await req.json());
const run = session.createRun(invocation, { signal: req.signal });

Parameters

invocation (Invocation, required)
The Invocation carrying run identity and conversation context.
runtime (RunRuntime<TOutput>, optional)
Per-run hooks and an external abort signal.

Returns

A Run<TOutput, TProjection, TMessage> handle for publishing lifecycle events, user messages, and streamed output. See Run interface below.
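Registering a Run for cancel routing at creation time can be modelled with one AbortController per run ID. This is a hypothetical sketch: CancelRouter, register, onCancel, and unregister are illustrative names, and the real session receives cancel events from the channel rather than through a direct method call.

```javascript
// Hypothetical model of cancel routing. Each Run gets an AbortController when it
// is created, so a cancel that arrives before start() still aborts it.
class CancelRouter {
  constructor() {
    this.controllers = new Map(); // runId -> AbortController
  }

  // Called when the Run is created (synchronously, before start()).
  register(runId) {
    const controller = new AbortController();
    this.controllers.set(runId, controller);
    return controller.signal;
  }

  // Called for each cancel event; unknown run IDs are ignored.
  onCancel(runId) {
    this.controllers.get(runId)?.abort();
  }

  // Called once the Run has ended.
  unregister(runId) {
    this.controllers.delete(runId);
  }
}

const router = new CancelRouter();
const signal = router.register('run-1');
signal.addEventListener('abort', () => console.log('run-1 aborted'));
router.onCancel('run-1'); // logs "run-1 aborted"
console.log(signal.aborted); // true
```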

Run

The handle returned by createRun.

Properties

runId (String)
The Run's unique identifier.
invocationId (String)
The invocation ID minted by the agent for this createRun call (one per HTTP request). It is readable synchronously, so the application can return it in the HTTP response. The agent stamps it on every event it publishes for this invocation.
abortSignal (AbortSignal)
AbortSignal scoped to this Run. It fires when a cancel event for this Run arrives.
view (RunView)
Read-only view of the conversation messages associated with this Run.
messages (TMessage[])
The conversation messages this Run should feed to the model. See Conversation hydration.

Start the run

start(): Promise<void>

Publish the ai-run-start event to the channel and wait for the Run's input event to arrive, first among messages replayed by the channel rewind and then among live messages. Call start() before pipe, addEvents, suspend, or end.

Rejects with InputEventNotFound (code 104010) if the input event for this invocation does not arrive within inputEventLookupTimeoutMs. Surface the rejection as a non-2xx response so the client's pending send fails.
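The timeout behavior can be sketched as a race between the lookup and a timer. In this hypothetical model, waitForInputEvent and its arguments are illustrative; the real start() also publishes ai-run-start and scans rewound messages, and it rejects with the library's own InputEventNotFound error rather than a plain Error carrying a code property.

```javascript
// Hypothetical sketch of start()'s lookup timeout: race the input-event lookup
// against a timer and reject with an error carrying code 104010 if the timer wins.
function waitForInputEvent(lookup, timeoutMs = 30000) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => {
      const err = new Error(`Input event not found within ${timeoutMs} ms`);
      err.code = 104010; // mirrors the documented InputEventNotFound code
      reject(err);
    }, timeoutMs);
  });
  // Clear the timer whichever promise settles first so the process can exit.
  return Promise.race([lookup, timeout]).finally(() => clearTimeout(timer));
}

// A lookup that never resolves stands in for an input event that never arrives.
waitForInputEvent(new Promise(() => {}), 50).catch((err) => {
  console.log(err.code); // 104010
});
```

In an HTTP handler, a rejection like this is what should become the non-2xx response.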

Pipe the response stream

pipe(stream: ReadableStream<TOutput>, options?: PipeOptions<TOutput>): Promise<StreamResult>

Pipe a ReadableStream of outputs through the codec's encoder to the channel. Resolves when the stream completes, is cancelled, or errors. pipe() does NOT call end(); call end() yourself after pipe() resolves.

Parameters

stream (ReadableStream<TOutput>, required)
The output stream from your LLM call.
options (PipeOptions<TOutput>, optional)
Branching and per-output hooks.

Returns

Promise<StreamResult>. Resolves when the stream ends. Pass result.reason to Run.end.
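The contract of pipe() (consume the stream, report a reason, leave end() to the caller) can be modelled as follows. pipeOutputs, its publish callback, and the optional signal argument are hypothetical. To stay short, the sketch checks the signal only between chunks, whereas a production implementation would likely also react to an abort while a read is pending.

```javascript
// Hypothetical model of pipe(): read each output from the stream, pass it to a
// publish callback, and report why the stream stopped. Like pipe(), it never
// ends the Run; the caller decides what to do with the returned reason.
async function pipeOutputs(stream, publish, signal) {
  const reader = stream.getReader();
  try {
    while (true) {
      // This sketch only checks for cancellation between chunks.
      if (signal?.aborted) {
        await reader.cancel();
        return { reason: 'cancelled' };
      }
      const { done, value } = await reader.read();
      if (done) return { reason: 'complete' };
      await publish(value);
    }
  } catch {
    return { reason: 'error' };
  } finally {
    reader.releaseLock();
  }
}

const published = [];
const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('Hello');
    controller.enqueue(', world');
    controller.close();
  },
});

pipeOutputs(stream, async (chunk) => {
  published.push(chunk);
}).then((result) => {
  console.log(result.reason, published.join('')); // complete Hello, world
});
```

The returned reason uses the same three values as RunEndReason, which is why it can be passed straight to end().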

Add events to existing messages

addEvents(nodes: EventsNode<TOutput>[]): Promise<void>

Publish events targeting existing messages. Each node specifies a codecMessageId and the events to apply to that message. Use this for cross-Run updates, such as delivering a tool result after approval.

Parameters

nodes (EventsNode<TOutput>[], required)
Events to apply, each tagged with the codec-message-id of the message to update.
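When several updates target the same message, they can be grouped into one node per message before calling addEvents. This hypothetical helper assumes each node has the shape { codecMessageId, events }, which follows the description above; the event objects are invented placeholders, since their real shape depends on the codec's TOutput type.

```javascript
// Hypothetical helper: group pending updates into EventsNode-shaped objects, one
// per target message. The { codecMessageId, events } shape follows the prose on
// this page; check the EventsNode type for the exact field names.
function toEventsNodes(updates) {
  const byMessage = new Map(); // codecMessageId -> events[] (insertion-ordered)
  for (const { codecMessageId, event } of updates) {
    if (!byMessage.has(codecMessageId)) byMessage.set(codecMessageId, []);
    byMessage.get(codecMessageId).push(event);
  }
  return [...byMessage].map(([codecMessageId, events]) => ({ codecMessageId, events }));
}

// Placeholder events: their real shape depends on the codec's TOutput type.
const nodes = toEventsNodes([
  { codecMessageId: 'msg-7', event: { kind: 'tool-result', callId: 'call-1', approved: true } },
  { codecMessageId: 'msg-7', event: { kind: 'tool-result', callId: 'call-2', approved: false } },
]);
console.log(nodes.length, nodes[0].events.length); // 1 2

// With a started Run, the nodes would then be published with:
// await run.addEvents(nodes);
```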

Load a Run's projection

loadProjection(): Promise<TProjection>

Fetch every channel message bound to this Run and fold them through the codec into a single projection. Use this to reconstruct a Run's full state when resuming a suspended Run, before piping fresh output; the projection includes tool-output amendments published by the client that the agent did not observe live. loadProjection complements loadConversation: loadConversation walks the ancestor Run chain, while loadProjection covers only the current Run.

Extract the messages you need from the projection with Codec.getMessages.

Returns

Promise<TProjection>. The codec projection produced by folding every event for this Run in serial order.
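Folding a Run's events into a projection is a left fold (reduce) over the events in serial order. The sketch below uses a toy codec with hypothetical init and fold functions and invented event types; it is not the library's Codec interface, and it assumes the input array is already in serial order.

```javascript
// Hypothetical model of loadProjection(): fold every event for one Run through a
// codec-like reducer. The toy codec's init/fold functions and event types are
// invented for illustration; events are assumed to be in serial order already.
const toyCodec = {
  init: () => new Map(), // codecMessageId -> { text, toolOutputs }
  fold(projection, event) {
    const message = projection.get(event.codecMessageId) ?? { text: '', toolOutputs: [] };
    if (event.type === 'text-delta') message.text += event.delta;
    if (event.type === 'tool-output') message.toolOutputs.push(event.output);
    projection.set(event.codecMessageId, message);
    return projection;
  },
};

function foldRun(codec, events) {
  // A left fold: each event updates the projection produced by the ones before it.
  return events.reduce((projection, event) => codec.fold(projection, event), codec.init());
}

const projection = foldRun(toyCodec, [
  { codecMessageId: 'msg-1', type: 'text-delta', delta: 'Let me ' },
  { codecMessageId: 'msg-1', type: 'text-delta', delta: 'check.' },
  // e.g. a tool output the client published while the agent was suspended
  { codecMessageId: 'msg-1', type: 'tool-output', output: { approved: true } },
]);
const message = projection.get('msg-1');
console.log(message.text); // Let me check.
console.log(message.toolOutputs.length); // 1
```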

Load the full conversation

loadConversation(options?: LoadConversationOptions): Promise<TMessage[]>

Reconstruct the full multi-turn conversation by walking the ancestor Run chain and concatenating each Run's messages, oldest turn first. After this call, Run.messages returns the complete conversation, ready to pass to the LLM.

Parameters

options (LoadConversationOptions, optional)
Options for tuning history pagination.
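The ancestor walk amounts to following parent links from the current Run back to the first turn, then reversing the result. In this hypothetical model the runs Map, the parentRunId field, and the string messages are illustrative; the real method reads ancestor Runs from channel history.

```javascript
// Hypothetical model of loadConversation(): follow parent links from the current
// Run back to the first turn, then concatenate each Run's messages oldest first.
// The runs Map and parentRunId field are illustrative, not library types.
function buildConversation(runs, currentRunId) {
  const chain = [];
  const seen = new Set();
  let id = currentRunId;
  while (id !== undefined) {
    if (seen.has(id)) throw new Error(`Cycle detected at run ${id}`);
    seen.add(id);
    const run = runs.get(id);
    if (!run) throw new Error(`Unknown run ${id}`);
    chain.push(run);
    id = run.parentRunId;
  }
  // chain is newest-first; reverse it so the oldest turn comes first.
  return chain.reverse().flatMap((run) => run.messages);
}

const runs = new Map([
  ['run-1', { parentRunId: undefined, messages: ['user: hi', 'assistant: hello'] }],
  ['run-2', { parentRunId: 'run-1', messages: ['user: weather?', 'assistant: sunny'] }],
  ['run-3', { parentRunId: 'run-2', messages: ['user: tomorrow?'] }],
]);
const conversation = buildConversation(runs, 'run-3');
console.log(conversation.length, conversation[0]); // 5 user: hi
```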

Suspend the run

suspend(): Promise<void>

Publish the ai-run-suspend event to the channel, pausing the Run pending external input (a tool approval, a human-in-the-loop response). The Run is not terminal: RunInfo.status becomes 'suspended', and a continuation Invocation resumes it via ai-run-resume.

Use suspend instead of end when the Run should resume later; reserve end for terminal outcomes.

End the run

end(reason: RunEndReason): Promise<void>

Publish the terminal ai-run-end event to the channel and clean up. reason is one of 'complete', 'cancelled', or 'error'. To pause a Run instead of ending it, use suspend.

Close the session

close(): void

Unsubscribe from the channel and clear handlers. This affects local state only: it does not end in-progress Runs, so end each Run explicitly before closing the session.
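Because close() does not end in-progress Runs, a shutdown path typically ends each Run before closing. The helper below is a hypothetical sketch using duck-typed stand-ins for the session and Runs; the 'cancelled' default reason is an assumption to adjust to your own semantics.

```javascript
// Hypothetical shutdown helper: end every in-progress Run first, then close the
// session, since close() alone leaves Runs open on the channel.
async function shutdown(session, runs, reason = 'cancelled') {
  // allSettled so one failing end() does not stop the others from ending.
  const results = await Promise.allSettled(runs.map((run) => run.end(reason)));
  session.close();
  // Return the errors from any end() calls that failed.
  return results.filter((r) => r.status === 'rejected').map((r) => r.reason);
}

const log = [];
const fakeRun = (id) => ({ end: async (reason) => log.push(`${id}:${reason}`) });
const fakeSession = { close: () => log.push('close') };

shutdown(fakeSession, [fakeRun('a'), fakeRun('b')]).then((failures) => {
  console.log(log.join(' '), failures.length); // a:cancelled b:cancelled close 0
});
```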

Invocation

A value object wrapping the JSON body a client sends to the agent's HTTP endpoint to start a Run.

Build from JSON

Invocation.fromJSON(data: InvocationData): Invocation

The entry point used by agent handlers: parse the request body and pass it to Invocation.fromJSON, then hand the result to createRun.

JavaScript

import { Invocation } from '@ably/ai-transport';

const data = await req.json();
const invocation = Invocation.fromJSON(data);

Parameters

data (InvocationData, required)
The parsed JSON request body matching the InvocationData wire shape.

Conversation hydration

Run.messages returns different content depending on lifecycle stage:

  • Before start resolves: empty array.
  • After start: the user-prompt messages looked up on the channel for this invocation.
  • After loadConversation: the full multi-turn conversation, with ancestor Run messages followed by the current Run's messages, oldest turn first.

Call loadConversation before passing run.messages to the LLM in any conversation past the first turn.

RunEndReason

'complete' | 'cancelled' | 'error'. Passed to Run.end and reflected on RunInfo.status once the Run terminates.

A Run that pauses for external input (tool approval, human-in-the-loop) calls Run.suspend instead of end. This publishes ai-run-suspend and leaves the Run open with RunInfo.status === 'suspended'. A continuation Invocation resumes it via ai-run-resume.
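The lifecycle above can be summarized as a small state machine. This sketch is a hypothetical model: the 'running' status name is an assumption, as is allowing a suspended Run to end directly; only 'suspended' and the three RunEndReason values come from this page.

```javascript
// Hypothetical model of the Run lifecycle. 'running' is an assumed status name;
// 'suspended' and the RunEndReason values are taken from this page.
const TERMINAL = new Set(['complete', 'cancelled', 'error']);

function transition(status, action) {
  if (TERMINAL.has(status)) {
    throw new Error(`Run already ended with status '${status}'`);
  }
  switch (action.type) {
    case 'suspend': // ai-run-suspend
      if (status !== 'running') throw new Error(`Cannot suspend from '${status}'`);
      return 'suspended';
    case 'resume': // ai-run-resume, driven by a continuation Invocation
      if (status !== 'suspended') throw new Error(`Cannot resume from '${status}'`);
      return 'running';
    case 'end': // ai-run-end; the reason becomes the terminal status
      if (!TERMINAL.has(action.reason)) throw new Error(`Invalid RunEndReason '${action.reason}'`);
      return action.reason;
    default:
      throw new Error(`Unknown action '${action.type}'`);
  }
}

let status = 'running';
status = transition(status, { type: 'suspend' }); // 'suspended'
status = transition(status, { type: 'resume' }); // 'running'
status = transition(status, { type: 'end', reason: 'complete' });
console.log(status); // complete
```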

Example

An HTTP handler that sets up the session, creates a Run, loads the conversation, pipes the LLM stream, and ends the Run.

TypeScript

import * as Ably from 'ably';
import { createAgentSession, Invocation } from '@ably/ai-transport';
import { UIMessageCodec } from '@ably/ai-transport/vercel';

const ably = new Ably.Realtime({ key: process.env.ABLY_API_KEY });

export async function POST(req: Request) {
  const invocation = Invocation.fromJSON(await req.json());

  const session = createAgentSession({
    client: ably,
    channelName: invocation.sessionName,
    codec: UIMessageCodec,
  });

  await session.connect();

  const run = session.createRun(invocation, { signal: req.signal });

  try {
    await run.start();
    await run.loadConversation();

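    // callMyLLM is a placeholder for your own model call. It must return a
    // ReadableStream of outputs in the codec's TOutput format.
    const llmStream = await callMyLLM(run.messages);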
    const result = await run.pipe(llmStream);
    await run.end(result.reason);
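  } catch (err) {
    // end() may itself fail (for example, if start() never succeeded); swallow
    // that failure so the original error is the one that propagates.
    await run.end('error').catch(() => {});
    throw err;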
  } finally {
    session.close();
  }

  return Response.json({ runId: run.runId, invocationId: run.invocationId });
}