ClientTransport

Open in

The ClientTransport subscribes to an Ably channel, decodes incoming messages through a codec, and builds a conversation tree. It exposes a default branch-aware View for rendering, plus methods for cancellation, turn tracking, and lifecycle management.

Construct one with createClientTransport from the core entry point, or use the Vercel-pre-bound factory from @ably/ai-transport/vercel when the channel carries Vercel UIMessage content.

JavaScript

1

2

3

4

5

6

7

8

import { createClientTransport } from '@ably/ai-transport';
import { UIMessageCodec } from '@ably/ai-transport/vercel';

const transport = createClientTransport({
  channel,
  codec: UIMessageCodec,
  api: '/api/chat',
});

Properties

The ClientTransport instance has the following properties:

treeTree
The complete conversation tree. Holds every known node, emits events on any change. Use the view in most cases; reach for the tree for low-level inspection.
viewView
The default paginated, branch-aware view for rendering. Events are scoped to the visible messages.

Create a client transport

function createClientTransport<TEvent, TMessage>(options: ClientTransportOptions<TEvent, TMessage>): ClientTransport<TEvent, TMessage>

Construct a ClientTransport bound to an Ably channel. The transport attaches to the channel lazily on first use.

JavaScript

1

2

3

4

5

6

7

8

9

10

11

12

import * as Ably from 'ably';
import { createClientTransport } from '@ably/ai-transport';
import { UIMessageCodec } from '@ably/ai-transport/vercel';

const ably = new Ably.Realtime({ authUrl: '/auth' });
const channel = ably.channels.get('conversation-42');

const transport = createClientTransport({
  channel,
  codec: UIMessageCodec,
  api: '/api/chat',
});

Parameters

optionsrequiredClientTransportOptions
Configuration for the client transport.

Returns

ClientTransport<TEvent, TMessage>

A new client transport. Lifecycle is managed by the caller; call close() to release resources.

Create an additional view

createView(): View<TEvent, TMessage>

Create an additional View over the same conversation tree. Each view has independent branch selections and pagination state. The caller is responsible for closing the returned view when it is no longer needed, or it will be closed when the transport closes.

JavaScript

1

const sideView = transport.createView();

Returns

View<TEvent, TMessage>

A new view bound to the same tree as transport.view.

Cancel turns

cancel(filter?: CancelFilter): Promise<void>

Publish a cancel signal on the channel for every turn that matches filter. Defaults to { own: true } (every turn started by this client).

JavaScript

1

2

3

await transport.cancel();                          // cancel own turns
await transport.cancel({ turnId: 'abc' });         // cancel one specific turn
await transport.cancel({ all: true });             // cancel every turn on the channel

Parameters

filteroptionalCancelFilter
Scope of the cancellation. Defaults to { own: true }.

Returns

Promise<void>

Returns a promise. The promise is fulfilled when the cancel signal has been published, or rejected with an ErrorInfo object.

Wait for turns to complete

waitForTurn(filter?: CancelFilter): Promise<void>

Wait for every active turn matching filter to complete. Resolves immediately if no matching turns are active. Defaults to { own: true }.

JavaScript

1

2

3

await transport.waitForTurn();                         // own turns
await transport.waitForTurn({ turnId: 'abc' });        // one specific turn
await transport.waitForTurn({ all: true });            // every turn on the channel

Parameters

filteroptionalCancelFilter
Scope of the wait. Defaults to { own: true }.

Returns

Promise<void>

Returns a promise. The promise is fulfilled when every matching turn has ended, or rejected with an ErrorInfo object.

Stage events on a message

stageEvents(msgId: string, events: TEvent[]): void

Apply events to an existing tree message locally and queue them for delivery on the next send. The events are applied via the codec's accumulator (the tree's update fires once with the merged message) and are flushed into the next send operation's POST body for the server to republish.

Use for cross-turn updates where the event value is produced on the client (for example, after addToolResult resolves a client-executed tool) and must appear in the tree immediately.

If msgId is not present in the tree, the call is a no-op and a warning is logged.

Parameters

msgIdrequiredString
The x-ably-msg-id of the existing message to amend.
eventsrequiredTEvent[]
Events to apply locally and ship on the next send.

Stage a replacement message

stageMessage(msgId: string, message: TMessage): void

Replace the tree's copy of an existing message with a caller-provided version, preserving headers and serial. Runs synchronously.

Use for useChat-style state transitions the codec cannot express as chunks. The canonical example is addToolApprovalResponse, which sets state: 'approval-responded' on a dynamic-tool part directly on the UIMessage and has no corresponding chunk variant.

Staged messages are not queued for the next send. The tree is authoritative for the POST body's history, so updating it is sufficient.

If msgId is not present in the tree, the call is a no-op and a warning is logged.

Parameters

msgIdrequiredString
The x-ably-msg-id of the existing message to replace.
messagerequiredTMessage
The patched message to store.

Subscribe to errors

on(event: 'error', handler: (error: Ably.ErrorInfo) => void): () => void

Subscribe to non-fatal transport errors. These indicate something went wrong but the transport is still operational. Returns an unsubscribe function.

JavaScript

1

2

3

4

5

6

const off = transport.on('error', (error) => {
  console.error(`Transport error ${error.code}: ${error.message}`);
});

// Later, to stop listening:
off();

Parameters

eventrequired'error'
The event name. Currently only 'error' is supported.
handlerrequired(error: Ably.ErrorInfo) => void
Called with each emitted ErrorInfo.

Returns

() => void

A function that unsubscribes the handler.

Close the transport

close(options?: CloseOptions): Promise<void>

Tear down the transport: unsubscribe from the channel, close active streams, clear handlers, and prevent further operations. Pass cancel to publish a cancel message before closing; without it, only local state is torn down (the server keeps streaming).

JavaScript

1

2

await transport.close();                                 // local teardown only
await transport.close({ cancel: { own: true } });        // cancel own turns first

Parameters

optionsoptionalCloseOptions
Close options.

Returns

Promise<void>

Returns a promise. The promise is fulfilled when teardown completes, or rejected with an ErrorInfo object.

View

A View is a paginated, branch-aware projection of the conversation tree. It tracks which branch is selected at each fork point and supports lazy loading of older messages. The transport exposes a default view as transport.view; create additional views with transport.createView().

Get visible messages

getMessages(): TMessage[]

The visible domain messages along the selected branch. Shorthand for flattenNodes().map(n => n.message).

Get visible nodes

flattenNodes(): MessageNode<TMessage>[]

Visible nodes along the selected branch, filtered by the pagination window. Each node wraps the domain message with tree metadata:

Check for older messages

hasOlder(): boolean

Whether there are older messages that can be loaded or revealed.

Load older messages

loadOlder(limit?: number): Promise<void>

Reveal older messages. Loads from channel history if the tree does not have enough, then advances the window to show up to limit more messages. Emits 'update' when the visible list changes. Returns a promise; rejection surfaces an ErrorInfo.

Select a sibling at a fork point

select(msgId: string, index: number): void

Select a sibling at a fork point by index. Updates this view's branch selection. Index is clamped to [0, siblings.length - 1]. Emits 'update' when the visible output changes.

Get the selected sibling index

getSelectedIndex(msgId: string): number

The index of the currently selected sibling at a fork point.

Get sibling messages

getSiblings(msgId: string): TMessage[]

Every sibling message at a fork point, ordered chronologically by serial.

Check for siblings

hasSiblings(msgId: string): boolean

Whether a message has sibling alternatives. Use this to decide whether to render branch-navigation arrows.

Get a node by ID

getNode(msgId: string): MessageNode<TMessage> | undefined

The node for a given message ID, or undefined if the tree has no such node.

Send one or more messages

send(messages: TMessage | TMessage[], options?: SendOptions): Promise<ActiveTurn<TEvent>>

Send one or more user messages and start a new turn. The parent is auto-computed from this view's selected branch unless overridden in options. Messages are inserted optimistically into the tree. The HTTP POST is fire-and-forget; the returned stream is available immediately.

JavaScript

1

2

3

4

5

6

7

const turn = await transport.view.send([
  { id: crypto.randomUUID(), role: 'user', parts: [{ type: 'text', text: 'Hello' }] },
]);

for await (const chunk of turn.stream) {
  // chunk is one decoded event from the agent.
}

Parameters

messagesrequiredTMessage or TMessage[]
One message or an array of messages to send.
optionsoptionalSendOptions
Per-send overrides for headers, body, and branching metadata.

Returns

Promise<ActiveTurn<TEvent>>

Returns a promise. The promise is fulfilled with an ActiveTurn carrying the decoded event stream, turnId, and a cancel() method.

Regenerate an assistant message

regenerate(messageId: string, options?: SendOptions): Promise<ActiveTurn<TEvent>>

Regenerate an assistant message. Creates a new turn that forks the target message with no new user messages. forkOf, parent, and the truncated history are computed automatically from this view's branch.

Parameters

messageIdrequiredString
The msg-id of the assistant message to regenerate.
optionsoptionalSendOptions
Per-send overrides.

Returns

Promise<ActiveTurn<TEvent>>

Returns a promise. The promise is fulfilled with an ActiveTurn, or rejected with an ErrorInfo object.

Edit a user message

edit(messageId: string, newMessages: TMessage | TMessage[], options?: SendOptions): Promise<ActiveTurn<TEvent>>

Edit a user message and start a new turn from that point. The original message and its descendants remain in the tree as a separate branch. forkOf, parent, and history are computed automatically.

Parameters

messageIdrequiredString
The msg-id of the user message to fork.
newMessagesrequiredTMessage or TMessage[]
The replacement message or messages.
optionsoptionalSendOptions
Per-send overrides.

Returns

Promise<ActiveTurn<TEvent>>

Returns a promise. The promise is fulfilled with an ActiveTurn, or rejected with an ErrorInfo object.

Update an existing message

update(msgId: string, events: TEvent[], options?: SendOptions): Promise<ActiveTurn<TEvent>>

Update an existing message in place and start a continuation turn. The local tree is updated optimistically, then the events are sent to the server in the POST body. The server publishes them to the channel and streams a continuation response. Commonly used for delivering client-executed tool results.

Parameters

msgIdrequiredString
The x-ably-msg-id of the existing message to amend.
eventsrequiredTEvent[]
Events to apply to the target message (for example, tool output).
optionsoptionalSendOptions
Per-send overrides.

Returns

Promise<ActiveTurn<TEvent>>

Returns a promise. The promise is fulfilled with an ActiveTurn carrying the continuation response stream, or rejected with an ErrorInfo object.

ActiveTurn

An ActiveTurn is the handle returned by view.send(), view.regenerate(), view.edit(), and view.update(). It exposes the decoded event stream, the turn's identity, and a cancel handle.

streamReadableStream<TEvent>
The decoded event stream for this turn. May error if the delivery guarantee is broken (POST failure, channel continuity loss).
turnIdString
The turn's unique identifier.
cancelFunction
() => Promise<void>. Cancel this specific turn. Publishes a cancel message and closes the local stream.
optimisticMsgIdsstring[]
The msg-ids of optimistically inserted user messages, in order. Present when the send included user messages (send, edit); empty for regenerate.

Example

JavaScript

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

import * as Ably from 'ably';
import { createClientTransport } from '@ably/ai-transport';
import { UIMessageCodec } from '@ably/ai-transport/vercel';

const ably = new Ably.Realtime({ authUrl: '/auth' });
const channel = ably.channels.get('conversation-42');

const transport = createClientTransport({
  channel,
  codec: UIMessageCodec,
  api: '/api/chat',
});

transport.on('error', (error) => {
  console.error(`Transport error ${error.code}: ${error.message}`);
});

// Send a message and consume the response stream.
const turn = await transport.view.send([
  { id: crypto.randomUUID(), role: 'user', parts: [{ type: 'text', text: 'Summarise this contract.' }] },
]);

for await (const chunk of turn.stream) {
  // chunk is one decoded event for this turn.
}

// Browse history and regenerate the last response.
await transport.view.loadOlder(50);
const last = transport.view.getMessages().at(-1);
if (last?.role === 'assistant') {
  await transport.view.regenerate(last.id);
}

// Cancel an in-flight turn from anywhere.
await transport.cancel({ own: true });

// Wait for the current set of own turns to finish before tearing down.
await transport.waitForTurn();
await transport.close();