Transport patterns

Internal transport components: the stream router on the client, the input-event lookup on the agent, and the cancel-routing pipeline. How the SDK coordinates between codec, channel, and application code.

The transport layer contains several internal components that coordinate between the codec, the Ably channel, and application code. This page describes the key patterns and what each component owns.

Inbound data flow on the client:

Ably channel
    │
    ▼
Channel subscription (ai-input, ai-output, ai-run-start, ai-run-suspend,
                      ai-run-resume, ai-run-end, ai-cancel)
    │
    ▼
Decoder ── DecodedMessage<TInput, TOutput>
    │
    ▼
Tree ── per-node Codec.fold (TProjection on InputNode or RunNode)
    │
    ▼
Tree emits:
    'update'   structural change (node insert / sort / lifecycle)
    'run'      Run lifecycle event (start / suspend / resume / end)
    'output'   decoded TOutput events with routing metadata
               (runId, inputCodecMessageId, codecMessageId, serial)
    │
    ▼
View ── walks the selected branch, emits 'update' on visible changes
ActiveRun ── runId Promise resolves on the matching ai-run-start

Outbound data flow on the agent:

Application code (streamText, your model handler)
    │
    ▼
Run.pipe(stream) ── reads TOutput from ReadableStream
    │
    ▼
Encoder.publishOutput ── stream or discrete via codec rules
    │
    ▼
ChannelWriter ── channel.publish / appendMessage / updateMessage
    │
    ▼
Ably channel (ai-output)

Run.abortSignal ◄── cancel routing fires on matching ai-cancel

Stream router

The stream router lives on the client. It routes inbound channel events to the correct per-Run state, turning a flat channel subscription into per-Run projections that the View exposes as visible messages.

The router keys on the triggering input's codec-message-id. The client owns this handle synchronously from send time, before the agent has minted the runId. When the client calls a write method on the View (send, regenerate, edit), the View mints the input's codec-message-id, publishes the input on the channel, and registers an optimistic input node in the Tree. The returned ActiveRun carries the synchronous inputCodecMessageId, the inputEventId, and a runId: Promise<string> that resolves when the agent's ai-run-start for this send lands.

When inbound ai-output messages decode to TOutput events, the router folds them into the matching Run's per-Run projection by reading the output's input-codec-message-id header (the agent echoes the triggering input's id on every event it publishes). When ai-run-end arrives for the Run, the router marks the Run terminal. The View's update event fires; consumers re-read getMessages() / runs().

If the channel loses continuity (ChannelContinuityLost), the router surfaces the error via the session's on('error', ...) and the streams of active Runs are errored with the same ErrorInfo (see Channel continuity).
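The routing described above can be sketched as a map keyed on the input's codec-message-id. This is a minimal illustration, not the SDK's implementation: StreamRouterSketch, Inbound, RunState, and the fold callback signature are hypothetical names introduced here, and the real router works through the Tree and View rather than a bare map.

```typescript
// Hypothetical message shapes; the real SDK decodes these from channel headers.
type Inbound<TOutput> =
  | { kind: "run-start"; inputCodecMessageId: string; runId: string }
  | { kind: "output"; inputCodecMessageId: string; event: TOutput }
  | { kind: "run-end"; inputCodecMessageId: string };

interface RunState<TProjection> {
  projection: TProjection;
  terminal: boolean;
  runId: Promise<string>; // resolves on the matching run-start
  resolveRunId: (id: string) => void;
}

class StreamRouterSketch<TOutput, TProjection> {
  private runs = new Map<string, RunState<TProjection>>();
  private init: () => TProjection;
  private fold: (acc: TProjection, event: TOutput) => TProjection;

  constructor(
    init: () => TProjection,
    fold: (acc: TProjection, event: TOutput) => TProjection,
  ) {
    this.init = init;
    this.fold = fold;
  }

  // Called at send time, before the agent has minted a runId.
  register(inputCodecMessageId: string): RunState<TProjection> {
    let resolveRunId: (id: string) => void = () => {};
    const runId = new Promise<string>((resolve) => {
      resolveRunId = resolve;
    });
    const state: RunState<TProjection> = {
      projection: this.init(),
      terminal: false,
      runId,
      resolveRunId,
    };
    this.runs.set(inputCodecMessageId, state);
    return state;
  }

  // Returns true when the message matched a registered Run.
  route(msg: Inbound<TOutput>): boolean {
    const state = this.runs.get(msg.inputCodecMessageId);
    if (!state) return false;
    if (msg.kind === "run-start") {
      state.resolveRunId(msg.runId);
    } else if (msg.kind === "output") {
      // Outputs after run-end are ignored in this sketch.
      if (!state.terminal) state.projection = this.fold(state.projection, msg.event);
    } else {
      state.terminal = true;
    }
    return true;
  }
}
```

Keying on the input's id rather than the runId is what lets the client hold a usable handle before any agent response has arrived.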

Input-event lookup

The input-event lookup lives on the agent. When Run.start() is called, the agent attaches the channel with rewind and waits for the triggering event-id carried in the invocation body to arrive on the channel.

The sequence:

  1. Client publishes ai-input with event-id=E1 on the channel.
     Fresh send: no run-id on the input.
     Continuation: existing run-id stamped on the input from ActiveRun.runId.
  2. Client POSTs InvocationData ({ inputEventId=E1, sessionName }) to the agent.
  3. Agent route receives the POST.
  4. Agent calls session.createRun(invocation, runtime). createRun mints the invocationId and registers the run for cancel routing immediately so early cancels fire.
  5. Agent calls run.start(). The session attaches the channel with the configured rewindWindow and waits for E1 to arrive.
  6. Either:
     - E1 is already on the channel (rewind) ──► resolves immediately.
     - E1 arrives live within inputEventLookupTimeoutMs ──► resolves.
     - inputEventLookupTimeoutMs lapses ──► rejects with InputEventNotFound.
  7. The input event tells the agent which run-id to use: it mints a fresh run-id when the input event omits one, or reuses the run-id stamped on the input event for a continuation.
  8. run.start() resolves, the agent fetches conversation history and pipes the LLM stream. The agent route returns { runId: run.runId, invocationId: run.invocationId } on the HTTP response.

The buffer that holds pre-start events is sized by inputEventBufferLimit (default 200 input events). When the buffer is full, the oldest input event is FIFO-evicted; the client whose input was dropped fails its lookup with InputEventNotFound.

This is the mechanism that lets a serverless agent publish a Run reliably even when the channel publish and the HTTP POST race: the agent waits on the channel until the input event lands, then proceeds. Without it, an agent that attaches after the client published would never see the triggering event.
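The lookup, buffer, and timeout behaviour can be sketched as follows. This is an illustration under assumptions: InputEventLookupSketch, InputEvent, onInput, waitFor, and runIdFor are hypothetical names, and only InputEventNotFound, inputEventBufferLimit, and inputEventLookupTimeoutMs come from the text above. The error class here is a local stand-in, not the SDK's own.

```typescript
// Local stand-in for the SDK error of the same name.
class InputEventNotFound extends Error {
  constructor(eventId: string) {
    super(`input event ${eventId} not found`);
    this.name = "InputEventNotFound";
  }
}

interface InputEvent {
  eventId: string;
  runId?: string; // present only on continuations
}

class InputEventLookupSketch {
  private buffer: InputEvent[] = [];
  private waiters = new Map<string, (event: InputEvent) => void>();
  private limit: number;

  constructor(inputEventBufferLimit = 200) {
    this.limit = inputEventBufferLimit;
  }

  // Feed every ai-input seen on the channel, whether rewound or live.
  onInput(event: InputEvent): void {
    const waiter = this.waiters.get(event.eventId);
    if (waiter) {
      this.waiters.delete(event.eventId);
      waiter(event);
      return;
    }
    this.buffer.push(event);
    if (this.buffer.length > this.limit) this.buffer.shift(); // FIFO eviction
  }

  // Resolves at once if the event is buffered, otherwise waits up to timeoutMs.
  waitFor(eventId: string, timeoutMs: number): Promise<InputEvent> {
    const found = this.buffer.find((e) => e.eventId === eventId);
    if (found) {
      this.buffer = this.buffer.filter((e) => e !== found);
      return Promise.resolve(found);
    }
    return new Promise<InputEvent>((resolve, reject) => {
      const timer = setTimeout(() => {
        this.waiters.delete(eventId);
        reject(new InputEventNotFound(eventId));
      }, timeoutMs);
      this.waiters.set(eventId, (event) => {
        clearTimeout(timer);
        resolve(event);
      });
    });
  }
}

// Step 7: reuse the stamped run-id on a continuation, otherwise mint one.
function runIdFor(event: InputEvent, mint: () => string): string {
  return event.runId ?? mint();
}
```

An evicted event behaves exactly like one that never arrived: the waiter times out and rejects, which matches the failure mode described for a full buffer.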

Cancel routing

Cancel routing connects client cancel signals to the correct agent-side Runs. The flow:

  1. The client calls session.cancel(runId) (when it has the resolved run id from ActiveRun.runId) or activeRun.cancel() (which keys on the triggering input's codec-message-id, the synchronous handle). The session publishes an ai-cancel message under extras.ai.transport, carrying whichever key the caller used.
  2. The agent session receives the cancel message through its channel subscription.
  3. The agent matches the cancel against its registered Runs. If the cancel arrives before run-start (the client cancelled before the agent minted the run id), the agent buffers it and fires once its input-event lookup resolves the input to the run.
  4. The matched Run's onCancel hook (if configured) is invoked. If the hook returns false, the cancel is rejected silently and the Run continues.
  5. If the cancel is accepted, the matched Run's AbortController fires. run.abortSignal flips to aborted; the LLM stream (if it was passed the same signal) stops; Run.pipe() resolves with reason: 'cancelled'; the encoder closes any in-flight streams with status: 'cancelled'.
  6. The agent code, in its finally block, calls run.end({ reason: 'cancelled' }). The agent publishes ai-run-end with run-reason: cancelled.

Cancel routing is per-Run. A cancel for one Run does not affect other active Runs on the same session; each Run has its own AbortController.
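The matching, buffering, and hook steps above can be sketched with one AbortController per Run. This is a simplified model, not the SDK's code: CancelRouterSketch, CancelKey, RegisteredRun, and bind are hypothetical names, and the sketch assumes a Run's keys only become known once the input-event lookup resolves.

```typescript
// A cancel carries whichever key the caller used.
type CancelKey = { runId: string } | { inputCodecMessageId: string };

interface RegisteredRun {
  inputCodecMessageId?: string; // unknown until the lookup resolves (assumption)
  runId?: string;
  controller: AbortController;
  onCancel?: () => boolean; // returning false rejects the cancel
}

function matches(run: RegisteredRun, key: CancelKey): boolean {
  return "runId" in key
    ? run.runId === key.runId
    : run.inputCodecMessageId === key.inputCodecMessageId;
}

class CancelRouterSketch {
  private runs: RegisteredRun[] = [];
  private early: CancelKey[] = []; // cancels that arrived before their Run was bound

  // Called at createRun time so early cancels have somewhere to land.
  register(onCancel?: () => boolean): RegisteredRun {
    const run: RegisteredRun = { controller: new AbortController(), onCancel };
    this.runs.push(run);
    return run;
  }

  // Called once the input-event lookup resolves; replays buffered cancels.
  bind(run: RegisteredRun, keys: { inputCodecMessageId: string; runId: string }): void {
    run.inputCodecMessageId = keys.inputCodecMessageId;
    run.runId = keys.runId;
    const hadEarlyCancel = this.early.some((k) => matches(run, k));
    this.early = this.early.filter((k) => !matches(run, k));
    if (hadEarlyCancel) this.fire(run);
  }

  // Returns true when a Run was aborted by this cancel.
  onCancelMessage(key: CancelKey): boolean {
    const run = this.runs.find((r) => matches(r, key));
    if (!run) {
      this.early.push(key);
      return false;
    }
    return this.fire(run);
  }

  private fire(run: RegisteredRun): boolean {
    if (run.onCancel && run.onCancel() === false) return false; // rejected silently
    run.controller.abort();
    return true;
  }
}
```

Because each Run owns its controller, aborting one leaves the signals of the others untouched.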

Pipe stream

Run.pipe(stream, options) connects a ReadableStream<TOutput> (from the LLM) to the encoder. It reads from the stream and writes each chunk through Encoder.publishOutput.

Responsibilities:

  • Reading: pulls chunks from the ReadableStream as they become available.
  • Encoding: passes each chunk to the encoder, which writes it to the channel as stream or discrete based on codec rules.
  • Abort handling: if run.abortSignal fires, pipe cancels the reader and calls encoder.cancel(). This stops the LLM stream and publishes a final updateMessage with status: 'cancelled'.
  • Completion: when the stream ends naturally, pipe calls encoder.close() to flush any buffered appends and finalise the last stream.

pipe returns a StreamResult with a reason field ('complete', 'cancelled', or 'error') and an optional error. The caller passes the reason to Run.end(reason). For runs that should pause awaiting input rather than end, use Run.suspend() instead of Run.end(...).
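The four responsibilities can be sketched as a single read loop over the standard ReadableStream and AbortSignal APIs. This is an illustration, not the SDK's Run.pipe: pipeSketch and EncoderLike are hypothetical names, and the encoder is reduced to the three calls the text mentions.

```typescript
// Hypothetical encoder surface, limited to what the text above names.
interface EncoderLike<T> {
  publishOutput(chunk: T): Promise<void>;
  close(): Promise<void>;
  cancel(): Promise<void>;
}

interface StreamResult {
  reason: "complete" | "cancelled" | "error";
  error?: unknown;
}

async function pipeSketch<T>(
  stream: ReadableStream<T>,
  encoder: EncoderLike<T>,
  signal: AbortSignal,
): Promise<StreamResult> {
  const reader = stream.getReader();
  // Cancelling the reader settles any pending read() with done: true.
  const onAbort = () => {
    void reader.cancel().catch(() => {});
  };
  signal.addEventListener("abort", onAbort, { once: true });
  try {
    while (!signal.aborted) {
      const result = await reader.read();
      if (signal.aborted) break;
      if (result.done) {
        await encoder.close(); // flush buffered appends, finalise the stream
        return { reason: "complete" };
      }
      await encoder.publishOutput(result.value);
    }
    await reader.cancel().catch(() => {});
    await encoder.cancel(); // close in-flight streams as cancelled
    return { reason: "cancelled" };
  } catch (error) {
    return { reason: "error", error };
  } finally {
    signal.removeEventListener("abort", onAbort);
    reader.releaseLock();
  }
}
```

The abort check after each read matters: a cancelled reader reports done: true, so without it a cancel would be indistinguishable from natural completion.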

Channel continuity

The session monitors the Ably channel state. If the channel transitions to FAILED, SUSPENDED, or DETACHED, or re-attaches with resumed: false, the session emits a ChannelContinuityLost error through the session's on('error', ...) callback. Active streams on the client are errored with the same ErrorInfo; the agent does not see this directly, but its onError runtime hook fires for stream failures.

A continuity-lost session is still operational for new work after the channel re-attaches; the application decides whether to resume from history or to start a fresh session.
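The detection rule can be sketched as a small state observer. This is an assumption-laden illustration: ContinuityMonitorSketch and ChannelStateChangeLike are hypothetical names, the lowercase state strings and the resumed flag mirror the shape of Ably channel state changes, and the sketch assumes the very first attach (which is not a resume) should not count as a loss.

```typescript
// Minimal stand-in for a channel state change event.
interface ChannelStateChangeLike {
  current: string;
  resumed?: boolean;
}

class ContinuityMonitorSketch {
  private attachedOnce = false;

  // Returns true when this change should surface ChannelContinuityLost.
  observe(change: ChannelStateChangeLike): boolean {
    if (
      change.current === "failed" ||
      change.current === "suspended" ||
      change.current === "detached"
    ) {
      return true;
    }
    if (change.current === "attached") {
      if (!this.attachedOnce) {
        // Assumption: the initial attach is not a re-attach.
        this.attachedOnce = true;
        return false;
      }
      return change.resumed === false;
    }
    return false;
  }
}
```

An application would typically call observe from its channel state listener and, on true, decide between reloading history and starting a fresh session.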

Build outbound headers

The transport layer composes the outbound headers under extras.ai.transport for every encoder publish. The encoder reads the per-Run state (runId, clientId, role defaults) and merges per-event options (codecMessageId, parent, forkOf, target). Stream-related headers (stream, stream-id, status) are stamped by the codec-core encoder under extras.ai.codec, not by the transport layer. The two tiers never mix; see Wire protocol for the tier contract.
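The merge of per-Run state and per-event options can be sketched as below. Treat it as a rough illustration only: buildTransportHeaders and buildExtras are hypothetical functions, and header keys such as client-id, fork-of, parent, and target are guesses at the wire names (only run-id and codec-message-id style names appear elsewhere on this page).

```typescript
interface RunHeaderState {
  runId: string;
  clientId: string;
  role: string;
}

interface PerEventOptions {
  codecMessageId?: string;
  parent?: string;
  forkOf?: string;
  target?: string;
}

// Transport tier: per-Run defaults first, then per-event options.
function buildTransportHeaders(
  run: RunHeaderState,
  options: PerEventOptions = {},
): Record<string, string> {
  const headers: Record<string, string> = {
    "run-id": run.runId,
    "client-id": run.clientId, // assumed key name
    role: run.role,
  };
  if (options.codecMessageId !== undefined) headers["codec-message-id"] = options.codecMessageId;
  if (options.parent !== undefined) headers["parent"] = options.parent; // assumed key name
  if (options.forkOf !== undefined) headers["fork-of"] = options.forkOf; // assumed key name
  if (options.target !== undefined) headers["target"] = options.target; // assumed key name
  return headers;
}

// The codec tier is passed through untouched under its own key.
function buildExtras(
  run: RunHeaderState,
  options: PerEventOptions,
  codecHeaders: Record<string, string>,
) {
  return {
    ai: {
      transport: buildTransportHeaders(run, options),
      codec: { ...codecHeaders },
    },
  };
}
```

Keeping the two tiers as sibling objects makes the separation structural: nothing the codec stamps can overwrite a transport header, and vice versa.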