ServerTransport

The ServerTransport publishes to an Ably channel on behalf of the agent, manages the turn lifecycle, and routes cancel signals from clients to active turns. It is the server-side counterpart to ClientTransport.

Construct one with createServerTransport from the core entry point, or use the Vercel-pre-bound factory from @ably/ai-transport/vercel when the channel carries Vercel UIMessage content.

JavaScript

import { createServerTransport } from '@ably/ai-transport';
import { UIMessageCodec } from '@ably/ai-transport/vercel';

const transport = createServerTransport({
  channel,
  codec: UIMessageCodec,
});

const turn = transport.newTurn({ turnId, clientId });

Create a server transport

function createServerTransport<TEvent, TMessage>(options: ServerTransportOptions<TEvent, TMessage>): ServerTransport<TEvent, TMessage>

Construct a ServerTransport bound to an Ably channel. The transport attaches to the channel lazily on first use.

JavaScript

import Ably from 'ably';
import { createServerTransport } from '@ably/ai-transport';
import { UIMessageCodec } from '@ably/ai-transport/vercel';

const ably = new Ably.Realtime({ key: process.env.ABLY_API_KEY });
const channel = ably.channels.get('conversation-42');

const transport = createServerTransport({
  channel,
  codec: UIMessageCodec,
  onError(error) {
    console.error(`Transport error ${error.code}: ${error.message}`);
  },
});

Parameters

options (required): ServerTransportOptions
Configuration for the server transport.

Returns

ServerTransport<TEvent, TMessage>

A new server transport. Lifecycle is managed by the caller; call close() to release resources.

Create a turn

newTurn(options: NewTurnOptions<TEvent>): Turn<TEvent, TMessage>

Create a new turn. Synchronous; no channel activity until start() is called on the returned turn. The turn is registered for cancel routing immediately so that early cancels fire the abort signal.

JavaScript

const turn = transport.newTurn({
  turnId,
  clientId,
  parent,
  forkOf,
  onCancel: async (request) => {
    const owner = request.turnOwners.get(request.filter.turnId);
    return owner === request.message.clientId;
  },
});
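The early-cancel behaviour described above can be pictured with a small model. The sketch below is illustrative only and is not the library's implementation: `CancelRegistry` and its methods are hypothetical names. It creates an `AbortController` synchronously when a turn is registered, so a cancel that arrives before `start()` still aborts the turn's signal. It also assumes, as the `onCancel` example suggests, that a falsy return from the handler vetoes the cancel.

```javascript
// Illustrative model of cancel routing; every name here is hypothetical.
class CancelRegistry {
  constructor() {
    this.turns = new Map(); // turnId -> { controller, onCancel }
  }

  // Runs synchronously when a turn is created, before anything is published.
  register(turnId, onCancel = async () => true) {
    const controller = new AbortController();
    this.turns.set(turnId, { controller, onCancel });
    return controller.signal;
  }

  // Runs when a cancel request arrives. Resolves to true if the turn was aborted.
  async cancel(turnId, request) {
    const entry = this.turns.get(turnId);
    if (!entry) return false; // unknown or already-ended turn
    const allowed = await entry.onCancel(request);
    if (!allowed) return false; // the handler vetoed the cancel
    entry.controller.abort();
    return true;
  }

  // Runs when a turn ends, so later cancels for it are ignored.
  unregister(turnId) {
    this.turns.delete(turnId);
  }
}
```

In this model the signal returned by `register` plays the role of `turn.abortSignal`, and the handler passed to `register` plays the role of `onCancel`.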

Parameters

options (required): NewTurnOptions
Turn configuration.

The turn-level onError callback fires in two scenarios. The first is a stream failure in streamResponse: the underlying error is also returned on StreamResult.error, but the callback receives it wrapped as an ErrorInfo with code StreamError, which standardises observability. The second is a failure in the onCancel handler. Publish failures in start, addMessages, addEvents, and end are not delivered to onError; they reject the returned promise instead. Channel-wide events, such as continuity loss, are delivered via the transport-level onError on ServerTransportOptions.
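The split between the two error paths suggests a shape for calling code. The following is a minimal sketch rather than a prescribed pattern: `runTurn` is a hypothetical helper, `turn` is any object exposing the Turn methods described on this page, and `log` is an assumed logging callback.

```javascript
// Hypothetical helper showing where each kind of failure surfaces.
async function runTurn(turn, stream, log = console.error) {
  try {
    await turn.start(); // a publish failure rejects here
    // A stream failure is returned on the result as StreamResult.error
    // (and is also reported to the turn-level onError callback).
    const { reason, error } = await turn.streamResponse(stream);
    if (error) log('stream failed:', error);
    await turn.end(reason); // a publish failure rejects here
    return reason;
  } catch (err) {
    // Reached for rejected promises, which are not delivered to onError.
    log('publish failed:', err);
    try {
      await turn.end('error');
    } catch {
      // The channel may be unusable; there is nothing more to publish.
    }
    return 'error';
  }
}
```

The helper resolves to the reason the turn ended with, so a caller can record it without inspecting the channel.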

Returns

Turn<TEvent, TMessage>

A new turn. Call its lifecycle methods (start, addMessages, streamResponse, addEvents, end) to publish to the channel.

Close the transport

close(): void

Unsubscribe from cancel messages, abort all active turns, and clean up. Synchronous.

JavaScript

transport.close();
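Because the caller owns the transport's lifecycle, a long-lived server process may want to close it on shutdown. A minimal sketch, assuming a Node.js process: `closeOnShutdown` is a hypothetical helper, and its `emitter` parameter exists only so the wiring can be exercised without sending real signals.

```javascript
// Hypothetical helper: close the transport once when the process is asked to stop.
function closeOnShutdown(transport, emitter = process, signals = ['SIGTERM', 'SIGINT']) {
  let closed = false;
  const handler = () => {
    if (closed) return; // close at most once, even if several signals arrive
    closed = true;
    transport.close(); // synchronous, so no await is needed
  };
  for (const name of signals) emitter.once(name, handler);
  return handler;
}
```

Registering a listener for SIGTERM or SIGINT replaces Node's default exit behaviour, so real code would typically call `process.exit()` after its own cleanup has finished.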

Turn

A Turn is the server-side handle for a single turn's lifecycle. Create one with transport.newTurn() and call its methods in order: start → optionally addMessages → streamResponse (and/or addEvents) → end.
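The call order above can be made explicit during development. The sketch below is a hypothetical aid, not part of the library: `withOrderChecks` wraps any Turn-like object and rejects `addMessages` or `streamResponse` calls made before `start()`. Treating a second `end()` as a mistake is an assumption of this sketch, and `addEvents` is passed through unchecked because this page states no ordering requirement for it.

```javascript
// Hypothetical development aid: wraps a Turn-like object and rejects calls
// made outside the documented order.
function withOrderChecks(turn) {
  let state = 'new'; // 'new' -> 'started' -> 'ended'

  const check = (method, allowed) => {
    if (!allowed.includes(state)) {
      throw new Error(`${method}() is not valid while the turn is ${state}`);
    }
  };

  return {
    turnId: turn.turnId,
    abortSignal: turn.abortSignal,
    async start() {
      check('start', ['new']);
      await turn.start();
      state = 'started';
    },
    async addMessages(...args) {
      check('addMessages', ['started']);
      return turn.addMessages(...args);
    },
    async streamResponse(...args) {
      check('streamResponse', ['started']);
      return turn.streamResponse(...args);
    },
    // No ordering is stated for addEvents, so it is passed through as-is.
    addEvents: (...args) => turn.addEvents(...args),
    async end(reason) {
      check('end', ['new', 'started']); // ending twice is treated as a mistake
      await turn.end(reason);
      state = 'ended';
    },
  };
}
```

Because the checked methods are async, an out-of-order call surfaces as a rejected promise rather than a synchronous throw.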

Properties

turnId: String
The turn's unique identifier.
abortSignal: AbortSignal
Abort signal scoped to this turn. Fires when a cancel event arrives for this turnId. Pass to the LLM call so generation stops on cancel.
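When the event source is not an SDK call that accepts an abort signal, the same signal can be honoured by hand. A minimal sketch, assuming a runtime with the standard `ReadableStream` and `AbortController` globals (Node.js 18 or later, or a browser): `streamFrom` is a hypothetical helper that wraps any sync or async iterable and stops producing once the signal has aborted.

```javascript
// Hypothetical helper: wrap a sync or async iterable in a ReadableStream that
// closes early once `signal` is aborted.
function streamFrom(iterable, signal) {
  const iterator =
    iterable[Symbol.asyncIterator]?.() ?? iterable[Symbol.iterator]();

  return new ReadableStream({
    async pull(controller) {
      if (signal.aborted) {
        await iterator.return?.(); // let the source release its resources
        controller.close();
        return;
      }
      const { value, done } = await iterator.next();
      if (done) controller.close();
      else controller.enqueue(value);
    },
    async cancel() {
      // The consumer stopped reading; release the source as well.
      await iterator.return?.();
    },
  });
}
```

The result could then be handed to the turn as `turn.streamResponse(streamFrom(source, turn.abortSignal))`, where `source` is whatever iterable produces the events.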

Start the turn

start(): Promise<void>

Publish the turn-start event to the channel. Must be called before addMessages or streamResponse.

Returns

Promise<void>

Returns a promise. The promise is fulfilled when the turn-start event has been published, or rejected with an ErrorInfo object.

Add user messages

addMessages(messages: MessageNode<TMessage>[], options?: AddMessageOptions): Promise<AddMessagesResult>

Publish user messages to the channel, scoped to this turn. Each node's msgId, parentId, and forkOf are used for message identity and branching. The node's headers override transport-generated defaults (used for optimistic reconciliation with the client's inserts).

Parameters

messages (required): MessageNode<TMessage>[]
User messages to publish. Each node carries identity, headers, and the domain message.
options (optional): AddMessageOptions
Per-operation overrides for attribution.

Returns

Promise<AddMessagesResult>

Returns a promise. The promise is fulfilled with an AddMessagesResult containing the x-ably-msg-id of each published message in order, or rejected with an ErrorInfo object.

Stream a response

streamResponse(stream: ReadableStream<TEvent>, options?: StreamResponseOptions<TEvent>): Promise<StreamResult>

Pipe a ReadableStream through the encoder to the channel. Returns when the stream completes, is cancelled, or errors. Does not call end(); the caller must call end() after streamResponse returns.

JavaScript

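import { streamText } from 'ai';
import { anthropic } from '@ai-sdk/anthropic';

// `history` (the model messages) and `turn` are assumed to exist already.
const result = streamText({
  model: anthropic('claude-sonnet-4-20250514'),
  messages: history,
  // Stop generation as soon as a cancel arrives for this turn.
  abortSignal: turn.abortSignal,
});

// streamResponse does not end the turn, so pass its reason on to end().
const { reason } = await turn.streamResponse(result.toUIMessageStream());
await turn.end(reason);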

Parameters

stream (required): ReadableStream<TEvent>
The event stream to encode and publish.
options (optional): StreamResponseOptions
Per-operation overrides for the assistant message.

Returns

Promise<StreamResult>

Returns a promise. The promise is fulfilled with a StreamResult describing why the stream ended, or rejected with an ErrorInfo object.

Add events to existing messages

addEvents(nodes: EventsNode<TEvent>[]): Promise<void>

Publish events targeting existing messages in the tree. Each node specifies a target message (by msgId) and the events to apply. Events are encoded and published with the target's x-ably-msg-id, so receiving clients apply them to the existing node rather than creating a new one.

Used for cross-turn updates such as tool result delivery after approval or client-side tool execution.

Parameters

nodes (required): EventsNode<TEvent>[]
Events nodes targeting existing messages.

Returns

Promise<void>

Returns a promise. The promise is fulfilled when every targeted message has received its events, or rejected with an ErrorInfo object.

End the turn

end(reason: TurnEndReason): Promise<void>

Publish the turn-end event to the channel and clean up. reason is one of 'complete', 'cancelled', or 'error'.

JavaScript

await turn.end('complete');

Parameters

reason (required): 'complete' | 'cancelled' | 'error'
Why the turn ended.

Returns

Promise<void>

Returns a promise. The promise is fulfilled when the turn-end event has been published, or rejected with an ErrorInfo object.

Example

JavaScript

import { after } from 'next/server';
import { streamText, convertToModelMessages } from 'ai';
import { anthropic } from '@ai-sdk/anthropic';
import Ably from 'ably';
import { createServerTransport } from '@ably/ai-transport/vercel';

// One Realtime client is created at module scope and shared across requests.
const ably = new Ably.Realtime({ key: process.env.ABLY_API_KEY });

export async function POST(req) {
  const { messages, id, turnId, clientId, parent, forkOf } = await req.json();
  const channel = ably.channels.get(id);

  // The Vercel entry point's factory is pre-bound, so no codec option is passed here.
  const transport = createServerTransport({
    channel,
    onError(error) {
      console.error('Transport error:', error);
    },
  });

  const turn = transport.newTurn({
    turnId,
    clientId,
    parent,
    forkOf,
    signal: req.signal,
    // Only the client that owns the turn may cancel it.
    onCancel: async (request) => {
      const owner = request.turnOwners.get(request.filter.turnId);
      return owner === request.message.clientId;
    },
  });

  // Publish turn-start before any messages or streamed output.
  await turn.start();

  if (messages.length > 0) {
    await turn.addMessages(messages.map((m) => ({ kind: 'message', ...m })), { clientId });
  }

  const result = streamText({
    model: anthropic('claude-sonnet-4-20250514'),
    messages: await convertToModelMessages(messages),
    abortSignal: turn.abortSignal,
  });

  // Stream in the background once the HTTP response has been sent.
  after(async () => {
    try {
      const { reason } = await turn.streamResponse(result.toUIMessageStream());
      await turn.end(reason);
    } catch {
      await turn.end('error');
    } finally {
      // Release the transport's resources whether or not the turn succeeded.
      transport.close();
    }
  });

  return new Response(null, { status: 200 });
}