The ServerTransport publishes to an Ably channel on behalf of the agent, manages the turn lifecycle, and routes cancel signals from clients to active turns. It is the server-side counterpart to ClientTransport.
Construct one with createServerTransport from the core entry point, or use the Vercel-pre-bound factory from @ably/ai-transport/vercel when the channel carries Vercel UIMessage content.
import { createServerTransport } from '@ably/ai-transport';
import { UIMessageCodec } from '@ably/ai-transport/vercel';

const transport = createServerTransport({
  channel,
  codec: UIMessageCodec,
});
const turn = transport.newTurn({ turnId, clientId });

Create a server transport
function createServerTransport<TEvent, TMessage>(options: ServerTransportOptions<TEvent, TMessage>): ServerTransport<TEvent, TMessage>
Construct a ServerTransport bound to an Ably channel. The transport attaches to the channel lazily on first use.
import Ably from 'ably';
import { createServerTransport } from '@ably/ai-transport';
import { UIMessageCodec } from '@ably/ai-transport/vercel';

const ably = new Ably.Realtime({ key: process.env.ABLY_API_KEY });
const channel = ably.channels.get('conversation-42');
const transport = createServerTransport({
  channel,
  codec: UIMessageCodec,
  onError(error) {
    console.error(`Transport error ${error.code}: ${error.message}`);
  },
});

Parameters
options (required): ServerTransportOptions
Returns
ServerTransport<TEvent, TMessage>
A new server transport. Lifecycle is managed by the caller; call close() to release resources.
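Because the caller owns the transport's lifecycle, it can help to pair every construction with a guaranteed close(). The following is a minimal sketch rather than part of the library: withTransport and Closable are hypothetical names, and the only thing assumed about the transport is the synchronous close() method described on this page.

```typescript
// Hypothetical structural type: anything with the synchronous close()
// described on this page satisfies it.
interface Closable {
  close(): void;
}

// Hypothetical helper (not part of @ably/ai-transport): runs `work` with the
// transport and calls close() afterwards, whether `work` resolves or rejects.
async function withTransport<T extends Closable, R>(
  transport: T,
  work: (transport: T) => Promise<R>,
): Promise<R> {
  try {
    return await work(transport);
  } finally {
    transport.close();
  }
}
```

In a request handler, the callback passed to withTransport would contain the code that creates and drives turns, so an exception anywhere in that code still releases the transport.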
Create a turn
newTurn(options: NewTurnOptions<TEvent>): Turn<TEvent, TMessage>
Create a new turn. Synchronous; no channel activity until start() is called on the returned turn. The turn is registered for cancel routing immediately so that early cancels fire the abort signal.
const turn = transport.newTurn({
  turnId,
  clientId,
  parent,
  forkOf,
  // Returns true only when the requesting client owns the targeted turn.
  onCancel: async (request) => {
    const owner = request.turnOwners.get(request.filter.turnId);
    return owner === request.message.clientId;
  },
});

Parameters
options (required): NewTurnOptions
onError fires in two scenarios. First, stream failures in streamResponse (the underlying error is also returned on StreamResult.error, but this callback delivers it wrapped as an ErrorInfo with code StreamError for standardised observability). Second, failures in the onCancel handler. Publish failures in start, addMessages, addEvents, and end reject the returned promise instead and are not delivered to onError. Channel-wide events (continuity loss) are delivered via the transport-level onError on ServerTransportOptions.
Returns
Turn<TEvent, TMessage>
A new turn. Call its lifecycle methods (start, addMessages, streamResponse, addEvents, end) to publish to the channel.
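The onCancel handler in the example above compares the turn's owner with the client that sent the cancel. The sketch below pulls that check into a standalone function so it can be tested without a channel. CancelRequestLike and isCancelFromOwner are hypothetical names, the type mirrors only the fields the example reads, and treating filter.turnId and message.clientId as optional is an assumption made here, not something this page states.

```typescript
// Hypothetical local type: mirrors only the fields the onCancel example on
// this page reads. The library's real request type may contain more.
interface CancelRequestLike {
  message: { clientId?: string };
  filter: { turnId?: string };
  turnOwners: Map<string, string>;
}

// Returns true only when the client that sent the cancel owns the turn.
function isCancelFromOwner(request: CancelRequestLike): boolean {
  const turnId = request.filter.turnId;
  if (turnId === undefined) return false; // assumption: no specific turn targeted
  const owner = request.turnOwners.get(turnId);
  // Require a known owner so two undefined values never compare as equal.
  return owner !== undefined && owner === request.message.clientId;
}

// Same async shape as the onCancel handler passed to newTurn() above.
const onCancel = async (request: CancelRequestLike): Promise<boolean> =>
  isCancelFromOwner(request);
```

Compared with the inline comparison, this version refuses the request when the turn has no recorded owner, which avoids a match between an unknown owner and a missing clientId.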
Close the transport
close(): void
Unsubscribe from cancel messages, abort all active turns, and clean up. Synchronous.
transport.close();

Turn
A Turn is the server-side handle for a single turn's lifecycle. Create one with transport.newTurn() and call its methods in order: start → optionally addMessages → streamResponse (and/or addEvents) → end.
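The call order can be sketched as a small helper driven against a recording fake, which makes the sequence visible without a channel. TurnLike, runTurn, and createRecordingTurn are hypothetical names; TurnLike covers only the methods used here and simplifies their parameter types (the real addMessages takes MessageNode values and the real streamResponse takes a ReadableStream).

```typescript
type TurnEndReason = 'complete' | 'cancelled' | 'error';

// Hypothetical structural type covering only the methods used below.
interface TurnLike<TMessage, TStream> {
  start(): Promise<void>;
  addMessages(messages: TMessage[]): Promise<unknown>;
  streamResponse(stream: TStream): Promise<{ reason: TurnEndReason }>;
  end(reason: TurnEndReason): Promise<void>;
}

// Hypothetical helper: drives one turn through the documented order.
async function runTurn<TMessage, TStream>(
  turn: TurnLike<TMessage, TStream>,
  userMessages: TMessage[],
  stream: TStream,
): Promise<TurnEndReason> {
  await turn.start(); // 1. publish turn-start first
  if (userMessages.length > 0) {
    await turn.addMessages(userMessages); // 2. optional user messages
  }
  const { reason } = await turn.streamResponse(stream); // 3. pipe the response
  await turn.end(reason); // 4. streamResponse does not end the turn itself
  return reason;
}

// A recording fake that appends each call to `calls`.
function createRecordingTurn(calls: string[]): TurnLike<string, string> {
  return {
    async start() { calls.push('start'); },
    async addMessages() { calls.push('addMessages'); },
    async streamResponse() {
      calls.push('streamResponse');
      return { reason: 'complete' as const };
    },
    async end(reason) { calls.push(`end:${reason}`); },
  };
}
```

Passing an empty userMessages array skips step 2, matching the "optionally addMessages" wording above.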
Properties
turnId: String
abortSignal: AbortSignal. Pass to the LLM call so generation stops on cancel.

Start the turn
start(): Promise<void>
Publish the turn-start event to the channel. Must be called before addMessages or streamResponse.
Returns
Promise<void>
Returns a promise. The promise is fulfilled when the turn-start event has been published, or rejected with an ErrorInfo object.
Add user messages
addMessages(messages: MessageNode<TMessage>[], options?: AddMessageOptions): Promise<AddMessagesResult>
Publish user messages to the channel, scoped to this turn. Each node's msgId, parentId, and forkOf are used for message identity and branching. The node's headers override transport-generated defaults (used for optimistic reconciliation with the client's inserts).
Parameters
messages (required): MessageNode
options (optional): AddMessageOptions
Returns
Promise<AddMessagesResult>
Returns a promise. The promise is fulfilled with an AddMessagesResult containing the x-ably-msg-id of each published message in order, or rejected with an ErrorInfo object.
Stream a response
streamResponse(stream: ReadableStream<TEvent>, options?: StreamResponseOptions<TEvent>): Promise<StreamResult>
Pipe a ReadableStream through the encoder to the channel. Returns when the stream completes, is cancelled, or errors. Does not call end(); the caller must call end() after streamResponse returns.
const result = streamText({
  model: anthropic('claude-sonnet-4-20250514'),
  messages: history,
  abortSignal: turn.abortSignal,
});
const { reason } = await turn.streamResponse(result.toUIMessageStream());
await turn.end(reason);

Parameters
stream (required): ReadableStream<TEvent>
options (optional): StreamResponseOptions
Returns
Promise<StreamResult>
Returns a promise. The promise is fulfilled with a StreamResult, or rejected with an ErrorInfo object.
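Since streamResponse never ends the turn, the caller has to publish turn-end on every path, including when the promise rejects. A minimal sketch of one way to do that follows. StreamResultLike, StreamingTurn, and streamAndEnd are hypothetical names; reason and error are the only StreamResult fields assumed, because they are the ones this page mentions, and the stream type is left generic so the block does not depend on a particular runtime.

```typescript
type TurnEndReason = 'complete' | 'cancelled' | 'error';

// Hypothetical local types: `reason` and `error` are the StreamResult fields
// this page mentions; the real type may carry more.
interface StreamResultLike {
  reason: TurnEndReason;
  error?: unknown;
}

interface StreamingTurn<TStream> {
  streamResponse(stream: TStream): Promise<StreamResultLike>;
  end(reason: TurnEndReason): Promise<void>;
}

// Hypothetical helper: always publishes turn-end, falling back to 'error'
// when streamResponse itself rejects.
async function streamAndEnd<TStream>(
  turn: StreamingTurn<TStream>,
  stream: TStream,
  log: (message: string, detail: unknown) => void = console.error,
): Promise<TurnEndReason> {
  let reason: TurnEndReason = 'error';
  try {
    const result = await turn.streamResponse(stream);
    reason = result.reason;
    if (result.error !== undefined) {
      log('Stream ended with an error:', result.error);
    }
  } catch (err) {
    log('streamResponse rejected:', err);
  }
  // A rejection from end() is deliberately left to propagate to the caller.
  await turn.end(reason);
  return reason;
}
```

The reason reported by the stream is forwarded unchanged, so a cancelled stream ends the turn as 'cancelled' rather than 'complete'.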
Add events to existing messages
addEvents(nodes: EventsNode<TEvent>[]): Promise<void>
Publish events targeting existing messages in the tree. Each node specifies a target message (by msgId) and the events to apply. Events are encoded and published with the target's x-ably-msg-id, so receiving clients apply them to the existing node rather than creating a new one.
Used for cross-turn updates such as tool result delivery after approval or client-side tool execution.
Parameters
nodes (required): EventsNode
Returns
Promise<void>
Returns a promise. The promise is fulfilled when every targeted message has received its events, or rejected with an ErrorInfo object.
End the turn
end(reason: TurnEndReason): Promise<void>
Publish the turn-end event to the channel and clean up. reason is one of 'complete', 'cancelled', or 'error'.
await turn.end('complete');

Parameters
reason (required): 'complete' or 'cancelled' or 'error'
Returns
Promise<void>
Returns a promise. The promise is fulfilled when the turn-end event has been published, or rejected with an ErrorInfo object.
Example
import { after } from 'next/server';
import { streamText, convertToModelMessages } from 'ai';
import { anthropic } from '@ai-sdk/anthropic';
import Ably from 'ably';
import { createServerTransport } from '@ably/ai-transport/vercel';

const ably = new Ably.Realtime({ key: process.env.ABLY_API_KEY });

export async function POST(req) {
  const { messages, id, turnId, clientId, parent, forkOf } = await req.json();
  const channel = ably.channels.get(id);

  // Pre-bound factory from the Vercel entry point, so no codec is passed.
  const transport = createServerTransport({
    channel,
    onError(error) {
      console.error('Transport error:', error);
    },
  });

  const turn = transport.newTurn({
    turnId,
    clientId,
    parent,
    forkOf,
    signal: req.signal,
    // Returns true only when the requesting client owns the targeted turn.
    onCancel: async (request) => {
      const owner = request.turnOwners.get(request.filter.turnId);
      return owner === request.message.clientId;
    },
  });

  // Publish turn-start, then any user messages scoped to this turn.
  await turn.start();
  if (messages.length > 0) {
    await turn.addMessages(messages.map((m) => ({ kind: 'message', ...m })), { clientId });
  }

  // Pass the turn's abort signal so generation stops on cancel.
  const result = streamText({
    model: anthropic('claude-sonnet-4-20250514'),
    messages: await convertToModelMessages(messages),
    abortSignal: turn.abortSignal,
  });

  // Stream after the HTTP response is sent; always end the turn and close the transport.
  after(async () => {
    try {
      const { reason } = await turn.streamResponse(result.toUIMessageStream());
      await turn.end(reason);
    } catch {
      await turn.end('error');
    } finally {
      transport.close();
    }
  });

  return new Response(null, { status: 200 });
}