ClientSession
The ClientSession subscribes to an Ably channel, decodes incoming messages through a codec, and builds a conversation tree. It owns the channel attach and the cancel-publish path, exposes a default branch-aware View for rendering, and lets you derive additional views over the same tree.
Construct one with createClientSession from the core entry point. For Vercel UIMessage channels, use the pre-bound factory from @ably/ai-transport/vercel instead.
1
2
3
4
5
6
7
8
9
10
11
12
13
import * as Ably from 'ably';
import { createClientSession } from '@ably/ai-transport';
import { UIMessageCodec } from '@ably/ai-transport/vercel';
const ably = new Ably.Realtime({ authUrl: '/api/auth/token' });
const session = createClientSession({
client: ably,
channelName: 'conversation-42',
codec: UIMessageCodec,
});
await session.connect();Properties
treeTreeview in most cases; reach for tree for low-level inspection.viewViewCreate a client session
function createClientSession<TInput, TOutput, TProjection, TMessage>(options: ClientSessionOptions<TInput, TOutput, TProjection, TMessage>): ClientSession<TInput, TOutput, TProjection, TMessage>Construct a ClientSession bound to an Ably channel. The session does not attach to the channel until connect() resolves.
1
2
3
4
5
6
7
8
9
10
11
12
import * as Ably from 'ably';
import { createClientSession } from '@ably/ai-transport';
import { UIMessageCodec } from '@ably/ai-transport/vercel';
const ably = new Ably.Realtime({ authUrl: '/api/auth/token' });
const session = createClientSession({
client: ably,
channelName: 'conversation-42',
codec: UIMessageCodec,
clientId: 'user-abc',
});Parameters
clientrequiredAbly.Realtimesession.close() does not close the client.channelNamerequiredStringcodecrequiredCodec<TInput, TOutput, TProjection, TMessage>clientIdoptionalStringclientId on everything this session publishes. Surfaces on the wire as the run-owner / input-owner id so other clients can attribute messages.messagesoptionalTMessage[]loggeroptionalLoggerReturns
ClientSession<TInput, TOutput, TProjection, TMessage>. The session instance. Call connect() to attach before sending or cancelling.
Connect the session
connect(): Promise<void>Subscribe to the channel and implicitly attach. Idempotent: subsequent calls return the same promise.
All write methods on view and cancel throw InvalidArgument until connect() resolves.
1
await session.connect();Returns
Promise<void>. Resolves when the channel is attached and the session is ready for writes.
Create an additional view
createView(): View<TInput, TMessage>Create an additional view over the same conversation tree. Each view has independent branch selections and pagination state.
The caller owns the returned view's lifecycle: call its close() when it is no longer needed, or session.close() closes it.
1
2
3
4
5
const secondaryView = session.createView();
secondaryView.selectSibling(messageId, 1);
// the default view is unaffected
session.view.branchSelection(messageId).index; // 0Returns
View<TInput, TMessage>. A new view with its own pagination window and branch selection state.
Cancel a run
cancel(runId: string): Promise<void>Publish a cancel signal for the specified Run. The agent receives the cancel through its own channel subscription and ends the Run with reason 'cancelled'.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
const activeRun = await session.view.send({
kind: 'user-message',
message: {
id: crypto.randomUUID(),
role: 'user',
parts: [{ type: 'text', text: 'Tell me a story' }],
},
});
// later, cancel from the UI.
// activeRun.cancel() works immediately, even before the runId
// promise on activeRun has resolved.
await activeRun.cancel();
// Or, when you already have a resolved runId (for example from a
// RunInfo in view.runs()):
// await session.cancel(someRunInfo.runId);Parameters
runIdrequiredStringActiveRun.runId returned by a send.Returns
Promise<void>. Resolves once the cancel message has been published. The cancel is best-effort: if the agent has already ended the Run, the cancel is a no-op.
Subscribe to session errors
on(event: 'error', handler: (error: Ably.ErrorInfo) => void): () => voidSubscribe to non-fatal session errors. These indicate something went wrong but the session is still operational; examples are subscription callback failures and channel continuity loss.
1
2
3
4
5
6
const unsubscribe = session.on('error', (error) => {
console.error('Session error:', error.code, error.message);
});
// later, when the listener is no longer needed
unsubscribe();Parameters
eventrequired'error''error'.handlerrequiredFunctionErrorInfo for every non-fatal error.Returns
() => void. An unsubscribe function. Call it to remove the listener.
Close the session
close(): Promise<void>Tear down the session. Unsubscribe from the channel, close active streams, clear handlers, and prevent further operations.
close() is local-state-only. The server keeps streaming until its Runs end on their own. To stop in-progress Runs, call cancel for each before close().
1
2
3
4
5
6
const runIds = session.view.runs()
.filter((run) => run.status === 'active')
.map((run) => run.runId);
await Promise.all(runIds.map((runId) => session.cancel(runId)));
await session.close();Returns
Promise<void>. Resolves once the channel has been released.
Example
End-to-end usage covering construction, connect, send, and teardown.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import * as Ably from 'ably';
import { createClientSession } from '@ably/ai-transport';
import { UIMessageCodec } from '@ably/ai-transport/vercel';
const ably = new Ably.Realtime({ authUrl: '/api/auth/token' });
const session = createClientSession({
client: ably,
channelName: 'conversation-42',
codec: UIMessageCodec,
clientId: 'user-abc',
});
await session.connect();
session.view.on('update', () => {
render(session.view.getMessages().map(({ message }) => message));
});
const activeRun = await session.view.send(UIMessageCodec.createUserMessage({
id: crypto.randomUUID(),
role: 'user',
parts: [{ type: 'text', text: 'Plan a 3-day trip to Lisbon.' }],
}));
// The SDK doesn't POST. The application wakes the agent itself.
await fetch('/api/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(activeRun.toInvocation().toJSON()),
});
// The agent assigns the runId on the server, so activeRun.runId is a
// Promise. Await it to read the id.
const runId = await activeRun.runId;
await session.close();
ably.close();