The ClientTransport subscribes to an Ably channel, decodes incoming messages through a codec, and builds a conversation tree. It exposes a default branch-aware View for rendering, plus methods for cancellation, turn tracking, and lifecycle management.
Construct one with createClientTransport from the core entry point, or use the Vercel-pre-bound factory from @ably/ai-transport/vercel when the channel carries Vercel UIMessage content.
1
2
3
4
5
6
7
8
import { createClientTransport } from '@ably/ai-transport';
import { UIMessageCodec } from '@ably/ai-transport/vercel';
const transport = createClientTransport({
channel,
codec: UIMessageCodec,
api: '/api/chat',
});Properties
The ClientTransport instance has the following properties:
treeTreeviewViewCreate a client transport
function createClientTransport<TEvent, TMessage>(options: ClientTransportOptions<TEvent, TMessage>): ClientTransport<TEvent, TMessage>Construct a ClientTransport bound to an Ably channel. The transport attaches to the channel lazily on first use.
1
2
3
4
5
6
7
8
9
10
11
12
import * as Ably from 'ably';
import { createClientTransport } from '@ably/ai-transport';
import { UIMessageCodec } from '@ably/ai-transport/vercel';
const ably = new Ably.Realtime({ authUrl: '/auth' });
const channel = ably.channels.get('conversation-42');
const transport = createClientTransport({
channel,
codec: UIMessageCodec,
api: '/api/chat',
});Parameters
optionsrequiredClientTransportOptionsReturns
ClientTransport<TEvent, TMessage>
A new client transport. Lifecycle is managed by the caller; call close() to release resources.
Create an additional view
createView(): View<TEvent, TMessage>Create an additional View over the same conversation tree. Each view has independent branch selections and pagination state. The caller is responsible for closing the returned view when it is no longer needed, or it will be closed when the transport closes.
1
const sideView = transport.createView();Returns
View<TEvent, TMessage>
A new view bound to the same tree as transport.view.
Cancel turns
cancel(filter?: CancelFilter): Promise<void>Publish a cancel signal on the channel for every turn that matches filter. Defaults to { own: true } (every turn started by this client).
1
2
3
await transport.cancel(); // cancel own turns
await transport.cancel({ turnId: 'abc' }); // cancel one specific turn
await transport.cancel({ all: true }); // cancel every turn on the channelParameters
filteroptionalCancelFilter{ own: true }.Returns
Promise<void>
Returns a promise. The promise is fulfilled when the cancel signal has been published, or rejected with an ErrorInfo object.
Wait for turns to complete
waitForTurn(filter?: CancelFilter): Promise<void>Wait for every active turn matching filter to complete. Resolves immediately if no matching turns are active. Defaults to { own: true }.
1
2
3
await transport.waitForTurn(); // own turns
await transport.waitForTurn({ turnId: 'abc' }); // one specific turn
await transport.waitForTurn({ all: true }); // every turn on the channelParameters
filteroptionalCancelFilter{ own: true }.Returns
Promise<void>
Returns a promise. The promise is fulfilled when every matching turn has ended, or rejected with an ErrorInfo object.
Stage events on a message
stageEvents(msgId: string, events: TEvent[]): voidApply events to an existing tree message locally and queue them for delivery on the next send. The events are applied via the codec's accumulator (the tree's update fires once with the merged message) and are flushed into the next send operation's POST body for the server to republish.
Use for cross-turn updates where the event value is produced on the client (for example, after addToolResult resolves a client-executed tool) and must appear in the tree immediately.
If msgId is not present in the tree, the call is a no-op and a warning is logged.
Parameters
msgIdrequiredStringx-ably-msg-id of the existing message to amend.eventsrequiredTEvent[]Stage a replacement message
stageMessage(msgId: string, message: TMessage): voidReplace the tree's copy of an existing message with a caller-provided version, preserving headers and serial. Runs synchronously.
Use for useChat-style state transitions the codec cannot express as chunks. The canonical example is addToolApprovalResponse, which sets state: 'approval-responded' on a dynamic-tool part directly on the UIMessage and has no corresponding chunk variant.
Staged messages are not queued for the next send. The tree is authoritative for the POST body's history, so updating it is sufficient.
If msgId is not present in the tree, the call is a no-op and a warning is logged.
Parameters
msgIdrequiredStringx-ably-msg-id of the existing message to replace.messagerequiredTMessageSubscribe to errors
on(event: 'error', handler: (error: Ably.ErrorInfo) => void): () => voidSubscribe to non-fatal transport errors. These indicate something went wrong but the transport is still operational. Returns an unsubscribe function.
1
2
3
4
5
6
const off = transport.on('error', (error) => {
console.error(`Transport error ${error.code}: ${error.message}`);
});
// Later, to stop listening:
off();Parameters
eventrequired'error''error' is supported.handlerrequired(error: Ably.ErrorInfo) => voidErrorInfo.Returns
() => void
A function that unsubscribes the handler.
Close the transport
close(options?: CloseOptions): Promise<void>Tear down the transport: unsubscribe from the channel, close active streams, clear handlers, and prevent further operations. Pass cancel to publish a cancel message before closing; without it, only local state is torn down (the server keeps streaming).
1
2
await transport.close(); // local teardown only
await transport.close({ cancel: { own: true } }); // cancel own turns firstParameters
optionsoptionalCloseOptionsReturns
Promise<void>
Returns a promise. The promise is fulfilled when teardown completes, or rejected with an ErrorInfo object.
View
A View is a paginated, branch-aware projection of the conversation tree. It tracks which branch is selected at each fork point and supports lazy loading of older messages. The transport exposes a default view as transport.view; create additional views with transport.createView().
Get visible messages
getMessages(): TMessage[]The visible domain messages along the selected branch. Shorthand for flattenNodes().map(n => n.message).
Get visible nodes
flattenNodes(): MessageNode<TMessage>[]Visible nodes along the selected branch, filtered by the pagination window. Each node wraps the domain message with tree metadata:
Check for older messages
hasOlder(): booleanWhether there are older messages that can be loaded or revealed.
Load older messages
loadOlder(limit?: number): Promise<void>Reveal older messages. Loads from channel history if the tree does not have enough, then advances the window to show up to limit more messages. Emits 'update' when the visible list changes. Returns a promise; rejection surfaces an ErrorInfo.
Select a sibling at a fork point
select(msgId: string, index: number): voidSelect a sibling at a fork point by index. Updates this view's branch selection. Index is clamped to [0, siblings.length - 1]. Emits 'update' when the visible output changes.
Get the selected sibling index
getSelectedIndex(msgId: string): numberThe index of the currently selected sibling at a fork point.
Get sibling messages
getSiblings(msgId: string): TMessage[]Every sibling message at a fork point, ordered chronologically by serial.
Check for siblings
hasSiblings(msgId: string): booleanWhether a message has sibling alternatives. Use this to decide whether to render branch-navigation arrows.
Get a node by ID
getNode(msgId: string): MessageNode<TMessage> | undefinedThe node for a given message ID, or undefined if the tree has no such node.
Send one or more messages
send(messages: TMessage | TMessage[], options?: SendOptions): Promise<ActiveTurn<TEvent>>Send one or more user messages and start a new turn. The parent is auto-computed from this view's selected branch unless overridden in options. Messages are inserted optimistically into the tree. The HTTP POST is fire-and-forget; the returned stream is available immediately.
1
2
3
4
5
6
7
const turn = await transport.view.send([
{ id: crypto.randomUUID(), role: 'user', parts: [{ type: 'text', text: 'Hello' }] },
]);
for await (const chunk of turn.stream) {
// chunk is one decoded event from the agent.
}Parameters
messagesrequiredTMessage or TMessage[]optionsoptionalSendOptionsReturns
Promise<ActiveTurn<TEvent>>
Returns a promise. The promise is fulfilled with an ActiveTurn carrying the decoded event stream, turnId, and a cancel() method.
Regenerate an assistant message
regenerate(messageId: string, options?: SendOptions): Promise<ActiveTurn<TEvent>>Regenerate an assistant message. Creates a new turn that forks the target message with no new user messages. forkOf, parent, and the truncated history are computed automatically from this view's branch.
Parameters
messageIdrequiredStringoptionsoptionalSendOptionsReturns
Promise<ActiveTurn<TEvent>>
Returns a promise. The promise is fulfilled with an ActiveTurn, or rejected with an ErrorInfo object.
Edit a user message
edit(messageId: string, newMessages: TMessage | TMessage[], options?: SendOptions): Promise<ActiveTurn<TEvent>>Edit a user message and start a new turn from that point. The original message and its descendants remain in the tree as a separate branch. forkOf, parent, and history are computed automatically.
Parameters
messageIdrequiredStringnewMessagesrequiredTMessage or TMessage[]optionsoptionalSendOptionsReturns
Promise<ActiveTurn<TEvent>>
Returns a promise. The promise is fulfilled with an ActiveTurn, or rejected with an ErrorInfo object.
Update an existing message
update(msgId: string, events: TEvent[], options?: SendOptions): Promise<ActiveTurn<TEvent>>Update an existing message in place and start a continuation turn. The local tree is updated optimistically, then the events are sent to the server in the POST body. The server publishes them to the channel and streams a continuation response. Commonly used for delivering client-executed tool results.
Parameters
msgIdrequiredStringx-ably-msg-id of the existing message to amend.eventsrequiredTEvent[]optionsoptionalSendOptionsReturns
Promise<ActiveTurn<TEvent>>
Returns a promise. The promise is fulfilled with an ActiveTurn carrying the continuation response stream, or rejected with an ErrorInfo object.
ActiveTurn
An ActiveTurn is the handle returned by view.send(), view.regenerate(), view.edit(), and view.update(). It exposes the decoded event stream, the turn's identity, and a cancel handle.
streamReadableStream<TEvent>turnIdStringcancelFunction() => Promise<void>. Cancel this specific turn. Publishes a cancel message and closes the local stream.optimisticMsgIdsstring[]send, edit); empty for regenerate.Example
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import * as Ably from 'ably';
import { createClientTransport } from '@ably/ai-transport';
import { UIMessageCodec } from '@ably/ai-transport/vercel';
const ably = new Ably.Realtime({ authUrl: '/auth' });
const channel = ably.channels.get('conversation-42');
const transport = createClientTransport({
channel,
codec: UIMessageCodec,
api: '/api/chat',
});
transport.on('error', (error) => {
console.error(`Transport error ${error.code}: ${error.message}`);
});
// Send a message and consume the response stream.
const turn = await transport.view.send([
{ id: crypto.randomUUID(), role: 'user', parts: [{ type: 'text', text: 'Summarise this contract.' }] },
]);
for await (const chunk of turn.stream) {
// chunk is one decoded event for this turn.
}
// Browse history and regenerate the last response.
await transport.view.loadOlder(50);
const last = transport.view.getMessages().at(-1);
if (last?.role === 'assistant') {
await transport.view.regenerate(last.id);
}
// Cancel an in-flight turn from anywhere.
await transport.cancel({ own: true });
// Wait for the current set of own turns to finish before tearing down.
await transport.waitForTurn();
await transport.close();