The transport layer contains several internal components that coordinate between the codec, the Ably channel, and your application code. This page describes the key patterns and what each component is responsible for.
StreamRouter
The StreamRouter lives on the client side. It routes inbound Ably channel events to the correct stream, turning a flat subscription into per-turn ReadableStream instances.
When the client transport subscribes to a channel, all messages arrive through a single callback. The StreamRouter dispatches each message to the appropriate turn based on the x-ably-turn-id header:
- When the client calls
send()(during_internalSend), the router creates a newReadableStreamand its controller for that turn. - Content messages are enqueued into the matching stream's controller.
- When a terminal event arrives (
turn-end,abort, orerror), the router closes the stream's controller.
This gives each turn an independent stream that can be consumed, cancelled, or abandoned without affecting other turns.
TurnManager
The TurnManager lives on the server side. It tracks active turns and manages their lifecycle events.
When the server creates a new turn with transport.newTurn(), the TurnManager:
- Registers the turn with its ID, client ID, and optional fork metadata.
- Publishes a
turn-startlifecycle event to the channel. - Creates an
AbortControllerwired to cancel signals for this turn. - Returns a turn handle with
streamResponse(),addMessages(),end(), andabortSignal.
When the turn ends, the TurnManager publishes a turn-end event with the appropriate reason and cleans up internal state. If the turn is cancelled, the abort controller fires before cleanup.
pipeStream
The pipeStream function connects a ReadableStream (from the LLM) to the encoder. It reads from the stream and writes each chunk through the encoder's appendEvent method.
The key responsibilities are:
- Reading: pulls chunks from the
ReadableStreamas they become available. - Encoding: passes each chunk to the encoder, which publishes it to the channel.
- Abort handling: if the turn's abort signal fires,
pipeStreamcancels the reader and callsencoder.abort(). This stops the LLM stream and publishes an abort marker to the channel. - Completion: when the stream ends naturally,
pipeStreamcallsencoder.close()to finalize the message.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Simplified pipeStream flow
async function pipeStream(stream, encoder, abortSignal) {
const reader = stream.getReader()
abortSignal.addEventListener('abort', () => {
reader.cancel()
})
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
await encoder.appendEvent(value)
}
await encoder.close()
return 'complete'
} catch (error) {
if (abortSignal.aborted) {
await encoder.abort()
return 'cancelled'
}
throw error
}
}The actual implementation includes retry logic for transient errors and coordinates with the encoder's flush-and-recovery mechanism.
Cancel routing
Cancel routing connects client cancel signals to the correct server-side turns. The flow is:
- The client publishes a
cancelmessage to the channel with a filter (for example,{ own: true }or{ turnId: 'abc' }). - The server transport receives the cancel message through its channel subscription.
- The transport evaluates the filter against all registered turns. For
{ own: true }, it matches turns where the owner client ID matches the cancel message's client ID. - Each matched turn's
onCancelhook is called if one is registered. If the hook returnsfalse, that turn is excluded. - For authorized turns, the transport fires their
AbortController, which triggerspipeStreamto abort.
Handler isolation ensures that a cancel for one turn does not affect other active turns. Each turn has its own AbortController, so aborting one turn leaves others running.
buildTransportHeaders
The buildTransportHeaders function constructs the x-ably-* headers for outbound messages. It is called by the encoder before every publish operation.
The function takes the current turn state (turn ID, client ID, role, fork metadata) and produces a headers object. The codec can add domain headers, and the encoder merges them using the priority rules described in codec architecture.
1
2
3
4
5
6
7
8
9
10
11
12
// Simplified buildTransportHeaders
function buildTransportHeaders(turn, messageOptions) {
return {
'x-ably-role': messageOptions.role,
'x-ably-turn-id': turn.turnId,
'x-ably-msg-id': messageOptions.msgId,
...(turn.clientId && { 'x-ably-turn-client-id': turn.clientId }),
...(messageOptions.parent && { 'x-ably-parent': messageOptions.parent }),
...(messageOptions.forkOf && { 'x-ably-fork-of': messageOptions.forkOf }),
...(messageOptions.amend && { 'x-ably-amend': messageOptions.amend }),
}
}Related pages
- Wire protocol - the headers and message format these components produce and consume.
- Codec architecture - the encoder and decoder that
pipeStreamandStreamRouterdrive. - Cancellation - the user-facing cancel feature.
- Token streaming - the feature that
pipeStreamenables.