# ServerTransport

The `ServerTransport` publishes to an Ably channel on behalf of the agent, manages the turn lifecycle, and routes cancel signals from clients to active turns. It is the server-side counterpart to [`ClientTransport`](https://ably.com/docs/ai-transport/api/javascript/client-transport.md).

Construct one with `createServerTransport` from the core entry point, or use the Vercel-pre-bound factory from `@ably/ai-transport/vercel` when the channel carries Vercel `UIMessage` content.

#### Javascript

```
import { createServerTransport } from '@ably/ai-transport';
import { UIMessageCodec } from '@ably/ai-transport/vercel';

const transport = createServerTransport({
  channel,
  codec: UIMessageCodec,
});

const turn = transport.newTurn({ turnId, clientId });
```

## Create a server transport

`function createServerTransport(options: ServerTransportOptions): ServerTransport`

Construct a `ServerTransport` bound to an Ably channel. The transport attaches to the channel lazily on first use.

### Javascript

```
import Ably from 'ably';
import { createServerTransport } from '@ably/ai-transport';
import { UIMessageCodec } from '@ably/ai-transport/vercel';

const ably = new Ably.Realtime({ key: process.env.ABLY_API_KEY });
const channel = ably.channels.get('conversation-42');

const transport = createServerTransport({
  channel,
  codec: UIMessageCodec,
  onError(error) {
    console.error(`Transport error ${error.code}: ${error.message}`);
  },
});
```

### Parameters

| Parameter | Required | Description | Type |
| --- | --- | --- | --- |
| options | Required | Configuration for the server transport. | `ServerTransportOptions` |

`ServerTransportOptions` properties:

| Property | Description | Type |
| --- | --- | --- |
| channel | The Ably channel to publish to. Must match the client's channel. | `Ably.RealtimeChannel` |
| codec | The codec to use for encoding events and messages. See [Codec](https://ably.com/docs/ai-transport/api/javascript/codec.md). | `Codec` |
| logger | Logger instance for diagnostic output. | `Logger` or Undefined |
| onError | Called with non-fatal transport-level errors not scoped to any turn. Examples: cancel listener subscription failure, channel attach errors, channel continuity loss. | `(error: Ably.ErrorInfo) => void` or Undefined |


### Returns

`ServerTransport`

A new server transport. Lifecycle is managed by the caller; call `close()` to release resources.

## Create a turn

`newTurn(options: NewTurnOptions): Turn`

Create a new turn. This call is synchronous and causes no channel activity until `start()` is called on the returned turn. The turn is registered for cancel routing immediately, so a cancel that arrives before `start()` still fires the abort signal.

### Javascript

```
const turn = transport.newTurn({
  turnId,
  clientId,
  parent,
  forkOf,
  onCancel: async (request) => {
    const owner = request.turnOwners.get(request.filter.turnId);
    return owner === request.message.clientId;
  },
});
```

### Parameters

| Parameter | Required | Description | Type |
| --- | --- | --- | --- |
| options | Required | Turn configuration. | `NewTurnOptions` |

`NewTurnOptions` properties:

| Property | Description | Type |
| --- | --- | --- |
| turnId | The turn identifier (generated by the client transport or the server). | String |
| clientId | The user's `clientId` for attribution. | String or Undefined |
| parent | The msg-id of the immediately preceding message in this branch. Used as the default parent for `addMessages` and `streamResponse` when not overridden per-operation. | String or Undefined |
| forkOf | The msg-id of the message this turn replaces. Stamped on user messages (for edits) or assistant messages (for regeneration). | String or Undefined |
| onMessage | Called before each Ably message is published in this turn. Mutate the Ably message in place to add custom `extras.headers`. | `(message: Ably.Message) => void` or Undefined |
| onAbort | Called when the turn's stream is aborted (by cancel or server). Receives a `write` function to publish final events before the abort finalises. | `(write: (event: TEvent) => Promise<void>) => void \| Promise<void>` or Undefined |
| onCancel | Called when a cancel message arrives matching this turn. Return `true` to allow cancellation (fires the abort signal, stream aborts); return `false` to reject (cancel ignored, stream continues). If not provided, all cancels are accepted. The handler receives a `CancelRequest`. | `(request: CancelRequest) => Promise<boolean>` or Undefined |
| onError | Called with non-fatal turn-scoped errors that have no other delivery path. See description below. | `(error: Ably.ErrorInfo) => void` or Undefined |
| signal | External abort signal (typically the HTTP request's `req.signal`) that, when fired, aborts this turn. Use to wire platform-level cancellation (request cancellation, serverless function timeout). | `AbortSignal` or Undefined |


`CancelRequest` properties:

| Property | Description | Type |
| --- | --- | --- |
| message | The raw Ably message that carried the cancel signal. | `Ably.InboundMessage` |
| filter | The parsed cancel scope from the message headers. | Object (properties below) |
| matchedTurnIds | Which active turnIds would be cancelled if allowed. | `string[]` |
| turnOwners | Map of `turnId` to the owner `clientId` for the matched turns. | `Map<string, string>` |

`filter` properties:

| Property | Description | Type |
| --- | --- | --- |
| turnId | A specific turn ID to cancel. | String or Undefined |
| own | Cancel every turn belonging to the sender's `clientId`. | Boolean or Undefined |
| clientId | Cancel every turn belonging to a specific `clientId`. | String or Undefined |
| all | Cancel every turn on the channel. | Boolean or Undefined |

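
The cancel request shape above suggests one way to write an `onCancel` policy that works for every filter kind (`turnId`, `own`, `clientId`, `all`). A handler that looks up `request.filter.turnId` only covers cancels scoped to a single turn, so the sketch below closes over the turn's own ID instead. `makeOwnerOnlyCancel` is a hypothetical helper, not part of the library, and it assumes `turnOwners` contains an entry for the turn whenever the handler is invoked for it.

```javascript
// Hypothetical helper (not a library export): builds an onCancel handler
// that accepts a cancel only when the sender owns this specific turn.
function makeOwnerOnlyCancel(turnId) {
  return async function onCancel(request) {
    const sender = request.message.clientId;
    // Reject cancels from connections without a clientId.
    if (!sender) return false;
    // Assumption: turnOwners holds an entry for every matched turn,
    // including the one this handler was registered for.
    return request.turnOwners.get(turnId) === sender;
  };
}
```

Under these assumptions it would be passed as `onCancel: makeOwnerOnlyCancel(turnId)` in the `newTurn` options.
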

`onError` fires in two scenarios. First, stream failures in `streamResponse`: the underlying error is also returned on `StreamResult.error`, but this callback delivers it wrapped as an [`ErrorInfo`](https://ably.com/docs/ai-transport/api/errors.md#errorinfo) with code `StreamError` for standardised observability. Second, failures in the `onCancel` handler.

Publish failures in `start`, `addMessages`, `addEvents`, and `end` reject the returned promise instead and are not delivered to `onError`. Channel-wide events (continuity loss) are delivered via the transport-level `onError` on `ServerTransportOptions`.

### Returns

`Turn`

A new turn. Call its lifecycle methods (`start`, `addMessages`, `streamResponse`, `addEvents`, `end`) to publish to the channel.

## Close the transport

`close(): void`

Unsubscribe from cancel messages, abort all active turns, and clean up. Synchronous.

### Javascript

```
transport.close();
```

## Turn

A `Turn` is the server-side handle for a single turn's lifecycle. Create one with `transport.newTurn()` and call its methods in order: `start` → optionally `addMessages` → `streamResponse` (and/or `addEvents`) → `end`.

### Properties

| Property | Description | Type |
| --- | --- | --- |
| turnId | The turn's unique identifier. | String |
| abortSignal | Abort signal scoped to this turn. Fires when a cancel event arrives for this `turnId`. Pass to the LLM call so generation stops on cancel. | `AbortSignal` |

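
The method order described above can be wrapped in a small helper so that `end` is always called once `start` has succeeded. This is a minimal sketch: `runTurn` is a hypothetical name, and the only assumption about `turn` is that it exposes `start`, `streamResponse`, and `end` as documented on this page.

```javascript
// Hypothetical helper (not a library export): runs one turn end to end.
async function runTurn(turn, stream) {
  await turn.start();
  // Default to 'error' so a rejected streamResponse still ends the turn.
  let reason = 'error';
  try {
    ({ reason } = await turn.streamResponse(stream));
  } finally {
    // streamResponse does not call end(), so the caller does.
    await turn.end(reason);
  }
  return reason;
}
```

If `streamResponse` rejects, the turn is ended with `'error'` and the rejection still propagates to the caller.
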

### Start the turn

`start(): Promise<void>`

Publish the turn-start event to the channel. Must be called before `addMessages` or `streamResponse`.

#### Returns

`Promise<void>`

Returns a promise. The promise is fulfilled when the turn-start event has been published, or rejected with an [`ErrorInfo`](https://ably.com/docs/ai-transport/api/errors.md#errorinfo) object.

### Add user messages

`addMessages(messages: MessageNode[], options?: AddMessageOptions): Promise`

Publish user messages to the channel, scoped to this turn. Each node's `msgId`, `parentId`, and `forkOf` are used for message identity and branching. The node's `headers` override transport-generated defaults (used for optimistic reconciliation with the client's inserts).

#### Parameters

| Parameter | Required | Description | Type |
| --- | --- | --- | --- |
| messages | Required | User messages to publish. Each node carries identity, headers, and the domain message. | `MessageNode[]` |
| options | Optional | Per-operation overrides for attribution. | `AddMessageOptions` |

`AddMessageOptions` properties:

| Property | Description | Type |
| --- | --- | --- |
| clientId | The user's `clientId` for attribution. Overrides the turn's `clientId`. | String or Undefined |

`MessageNode` properties:

| Property | Description | Type |
| --- | --- | --- |
| kind | `'message'`. Distinguishes a message node from an events node. | String |
| msgId | The Ably message ID to publish under. Primary key in the tree. | String |
| message | The domain message to encode. | `TMessage` |
| parentId | The parent node's `msgId`, or `undefined` for root messages. | String or Undefined |
| forkOf | The `msgId` this node forks from (regeneration or edit), or `undefined` if it is the first version. | String or Undefined |
| headers | Transport headers overriding transport-generated defaults. | `Record` |
| serial | The Ably serial for tree ordering. Set on tree-side; not used by the encoder. | String or Undefined |

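
As an illustration of the node shape in the table above, the following sketch builds `MessageNode`-shaped objects from an ordered list of messages. `toMessageNodes` and the `{ id, message }` input shape are hypothetical. Chaining each node to the previous one through `parentId` is an assumption about how a linear branch is represented, and `headers` is left empty on the assumption that the transport-generated defaults are wanted.

```javascript
// Hypothetical helper (not a library export): converts ordered
// { id, message } pairs into MessageNode-shaped objects.
function toMessageNodes(items, parentId) {
  let previous = parentId; // undefined for a root message
  return items.map(({ id, message }) => {
    const node = {
      kind: 'message',
      msgId: id,
      message,
      parentId: previous,
      headers: {}, // no overrides: rely on transport-generated defaults
    };
    previous = id; // the next node hangs off this one
    return node;
  });
}
```

The result could then be passed as the first argument to `turn.addMessages`.
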

#### Returns

`Promise`

Returns a promise. The promise is fulfilled with an object containing the `x-ably-msg-id` of each published message in order, or rejected with an [`ErrorInfo`](https://ably.com/docs/ai-transport/api/errors.md#errorinfo) object.

| Property | Description | Type |
| --- | --- | --- |
| msgIds | The `x-ably-msg-id` of each published message, in order. | `string[]` |


### Stream a response

`streamResponse(stream: ReadableStream, options?: StreamResponseOptions): Promise<StreamResult>`

Pipe a `ReadableStream` through the encoder to the channel. The returned promise settles when the stream completes, is cancelled, or errors. `streamResponse` does not call `end()`; the caller must call `end()` after it returns.

#### Javascript

```
import { streamText } from 'ai';
import { anthropic } from '@ai-sdk/anthropic';

// Pass the turn's abort signal so generation stops when the turn is cancelled.
const result = streamText({
  model: anthropic('claude-sonnet-4-20250514'),
  messages: history,
  abortSignal: turn.abortSignal,
});

const { reason } = await turn.streamResponse(result.toUIMessageStream());
await turn.end(reason);
```

#### Parameters

| Parameter | Required | Description | Type |
| --- | --- | --- | --- |
| stream | Required | The event stream to encode and publish. | `ReadableStream` |
| options | Optional | Per-operation overrides for the assistant message. | `StreamResponseOptions` |

`StreamResponseOptions` properties:

| Property | Description | Type |
| --- | --- | --- |
| parent | The msg-id of the immediately preceding message in this branch. Overrides the turn's `parent`. | String or Undefined |
| forkOf | The msg-id of the message this response replaces (for regeneration). | String or Undefined |
| resolveWriteOptions | Per-event hook invoked before each event is encoded. The returned `WriteOptions` (if any) override the stream's default headers and `msgId` for that one encode call only; return `undefined` to use the stream defaults. Used to carry a subset of events within the stream to a different message (for example, `tool-output-available` chunks that belong on a prior assistant message). Must not be used for events that participate in the encoder's stream-append pipeline. | `(event: TEvent) => WriteOptions \| undefined` or Undefined |

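
The `resolveWriteOptions` hook described above could be built along the following lines. This is a sketch under several assumptions: `makeResolveWriteOptions` and `toolCallOwners` are hypothetical, events are assumed to carry `type` and `toolCallId` fields as Vercel UI message chunks do, and the `{ msgId }` return value is an assumed `WriteOptions` shape, since this page only states that it can override headers and `msgId`.

```javascript
// Hypothetical factory (not a library export). toolCallOwners is an assumed
// Map from toolCallId to the msgId of the assistant message that issued the call.
function makeResolveWriteOptions(toolCallOwners) {
  return function resolveWriteOptions(event) {
    // Leave every other event on the stream's default message.
    if (event.type !== 'tool-output-available') return undefined;
    const msgId = toolCallOwners.get(event.toolCallId);
    // Assumed WriteOptions shape: only msgId is overridden here.
    return msgId ? { msgId } : undefined;
  };
}
```

The returned function would be passed as `resolveWriteOptions` in the second argument to `streamResponse`.
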

#### Returns

`Promise<StreamResult>`

Returns a promise. The promise is fulfilled with a `StreamResult` describing why the stream ended, or rejected with an [`ErrorInfo`](https://ably.com/docs/ai-transport/api/errors.md#errorinfo) object.

`StreamResult` properties:

| Property | Description | Type |
| --- | --- | --- |
| reason | Why the stream ended. One of `'complete'`, `'cancelled'`, `'error'`. | `TurnEndReason` |
| error | The error that caused the stream to fail, present when `reason` is `'error'`. The original error from the LLM provider, preserved so the caller can inspect provider-specific fields. | `Error` or Undefined |

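
A caller will usually branch on `reason` before ending the turn. The sketch below turns a `StreamResult` into a log entry; `describeStreamResult` and the `{ level, text }` output shape are hypothetical and only illustrate reading the two documented fields.

```javascript
// Hypothetical helper (not a library export): summarises a StreamResult for logging.
function describeStreamResult(result) {
  switch (result.reason) {
    case 'complete':
      return { level: 'info', text: 'stream completed' };
    case 'cancelled':
      return { level: 'info', text: 'stream cancelled' };
    case 'error':
      // error is only expected when reason is 'error', but guard anyway.
      return { level: 'error', text: `stream failed: ${result.error?.message ?? 'unknown error'}` };
    default:
      return { level: 'warn', text: `unexpected reason: ${String(result.reason)}` };
  }
}
```
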

### Add events to existing messages

`addEvents(nodes: EventsNode[]): Promise<void>`

Publish events targeting existing messages in the tree. Each node specifies a target message (by `msgId`) and the events to apply. Events are encoded and published with the target's `x-ably-msg-id`, so receiving clients apply them to the existing node rather than creating a new one. Used for cross-turn updates such as tool result delivery after approval or client-side tool execution.

#### Parameters

| Parameter | Required | Description | Type |
| --- | --- | --- | --- |
| nodes | Required | Events nodes targeting existing messages. | `EventsNode[]` |

`EventsNode` properties:

| Property | Description | Type |
| --- | --- | --- |
| kind | `'event'`. Distinguishes an events node from a message node. | String |
| msgId | The `x-ably-msg-id` of the existing message to update. | String |
| events | Events to apply to the target message. | `TEvent[]` |

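
When several events target the same message, they can be grouped into one node per target before calling `addEvents`. The following is a minimal sketch; `toEventsNodes` and the `{ msgId, event }` input pairs are hypothetical, and the event payloads themselves are whatever the codec in use expects.

```javascript
// Hypothetical helper (not a library export): groups (msgId, event) pairs
// into one EventsNode-shaped object per target message, preserving order.
function toEventsNodes(pairs) {
  const byTarget = new Map();
  for (const { msgId, event } of pairs) {
    if (!byTarget.has(msgId)) byTarget.set(msgId, []);
    byTarget.get(msgId).push(event);
  }
  // Map iteration follows insertion order, so targets keep first-seen order.
  return [...byTarget].map(([msgId, events]) => ({ kind: 'event', msgId, events }));
}
```
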

#### Returns

`Promise<void>`

Returns a promise. The promise is fulfilled when every targeted message has received its events, or rejected with an [`ErrorInfo`](https://ably.com/docs/ai-transport/api/errors.md#errorinfo) object.

### End the turn

`end(reason: TurnEndReason): Promise<void>`

Publish the turn-end event to the channel and clean up. `reason` is one of `'complete'`, `'cancelled'`, or `'error'`.

#### Javascript

```
await turn.end('complete');
```

#### Parameters

| Parameter | Required | Description | Type |
| --- | --- | --- | --- |
| reason | Required | Why the turn ended. | `'complete'` or `'cancelled'` or `'error'` |


#### Returns

`Promise<void>`

Returns a promise. The promise is fulfilled when the turn-end event has been published, or rejected with an [`ErrorInfo`](https://ably.com/docs/ai-transport/api/errors.md#errorinfo) object.

## Example

### Javascript

```
import { after } from 'next/server';
import { streamText, convertToModelMessages } from 'ai';
import { anthropic } from '@ai-sdk/anthropic';
import Ably from 'ably';
import { createServerTransport } from '@ably/ai-transport/vercel';

const ably = new Ably.Realtime({ key: process.env.ABLY_API_KEY });

export async function POST(req) {
  const { messages, id, turnId, clientId, parent, forkOf } = await req.json();
  const channel = ably.channels.get(id);

  // Pre-bound Vercel factory, so no codec option is passed here.
  const transport = createServerTransport({
    channel,
    onError(error) {
      console.error('Transport error:', error);
    },
  });

  const turn = transport.newTurn({
    turnId,
    clientId,
    parent,
    forkOf,
    signal: req.signal,
    // Only the owner of the targeted turn may cancel it.
    onCancel: async (request) => {
      const owner = request.turnOwners.get(request.filter.turnId);
      return owner === request.message.clientId;
    },
  });

  await turn.start();

  if (messages.length > 0) {
    await turn.addMessages(
      messages.map((m) => ({ kind: 'message', ...m })),
      { clientId },
    );
  }

  const result = streamText({
    model: anthropic('claude-sonnet-4-20250514'),
    messages: await convertToModelMessages(messages),
    abortSignal: turn.abortSignal,
  });

  // Stream to the channel after the HTTP response has been sent.
  after(async () => {
    try {
      const { reason } = await turn.streamResponse(result.toUIMessageStream());
      await turn.end(reason);
    } catch {
      await turn.end('error');
    } finally {
      transport.close();
    }
  });

  return new Response(null, { status: 200 });
}
```

## Related Topics

- [Client transport](https://ably.com/docs/ai-transport/api/javascript/client-transport.md): API reference for the AI Transport client transport: options, properties, methods, the View interface, and the ActiveTurn type.
- [Vercel integration](https://ably.com/docs/ai-transport/api/javascript/vercel.md): API reference for the AI Transport Vercel AI SDK integration.
  UIMessageCodec, ChatTransport, and pre-bound transport factories.
- [Codec](https://ably.com/docs/ai-transport/api/javascript/codec.md): API reference for the AI Transport codec interface. Build custom codecs to integrate any AI framework with Ably channels.