Codec architecture
Internal architecture of the codec: the input/output type split, encoder and decoder pipelines, and how the codec composes with the transport layer.
The codec translates between your AI framework's event types and Ably channel messages. The interface is generic over four type parameters: TInput (client-published events), TOutput (agent-published events), TProjection (the per-Run reduced state), and TMessage (the rendered domain message). The two-direction split is type-system-enforced: an encoder can publish a TInput (publishInput) or a TOutput (publishOutput), and a decoder returns { inputs, outputs } so the consumer can branch on direction without re-checking each event.
The two pipelines mirror each other:
    Publish side (agent):

      TOutput events from streamText / your model
        │
        ▼
      Encoder.publishOutput
        │
        ▼
      ChannelWriter ── channel.publish / appendMessage / updateMessage
        │
        ▼
      Ably channel (ai-output)

    Publish side (client):

      TInput events from view.send / regenerate / edit
        │
        ▼
      Encoder.publishInput
        │
        ▼
      ChannelWriter ── channel.publish (discrete)
        │
        ▼
      Ably channel (ai-input)

    Subscribe side (both):

      Ably channel
        │
        ▼
      Decoder.decode ── DecodedMessage<TInput, TOutput>
        │
        ▼
      Tree ── Codec.fold per node (TProjection on InputNode or RunNode)
        │
        ▼
      Codec.getMessages(projection) ── CodecMessage<TMessage>[]
        │
        ▼
      View
Two-layer split
AI Transport separates concerns into two layers:
- Transport layer (generic): manages Run identity, lifecycle events, cancellation, input-event lookup, history pagination, and multi-client sync. It works with any TInput and TOutput.
- Codec layer (domain): maps framework-specific events to Ably publish operations and back. It knows the shape of your events; it does not manage Run lifecycle.
The transport layer calls into the codec but never inspects the domain payload. The codec calls into the channel writer but never manages Runs or lifecycle. This separation is what makes AI Transport framework-agnostic; the Vercel UIMessageCodec is one implementation, but any framework's event shape can be implemented against the same Codec interface.
TInput and TOutput
The codec is bidirectional with separate type bases:
- CodecInputEvent is the base for everything a client publishes. The Vercel codec defines UserMessage, Regenerate, ToolResult, ToolResultError, and ToolApprovalResponse as well-known input variants on top of this base.
- CodecOutputEvent is the base for everything an agent publishes: text deltas, tool calls, reasoning, file or source parts, data chunks.
Both bases carry a kind discriminator the codec reads to dispatch. Inputs additionally carry routing fields (codecMessageId, parent, target) that the encoder stamps onto the wire as transport headers.
Decoder.decode(message) returns a DecodedMessage<TInput, TOutput> tagged with which direction the message rode on. Consumers branch on the tag rather than re-classifying.
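As a minimal sketch of branching on direction, the snippet below assumes the { inputs, outputs } shape described above. The MyInput and MyOutput unions, their fields, and the summarise function are hypothetical names invented for illustration; they are not SDK types.

```typescript
// Hypothetical event unions for illustration; a real codec defines its own.
type MyInput =
  | { kind: 'user-message'; text: string }
  | { kind: 'regenerate'; target: string };

type MyOutput =
  | { kind: 'text-delta'; delta: string }
  | { kind: 'finish' };

// Assumed shape, following the { inputs, outputs } description in the text.
interface DecodedMessage<TInput, TOutput> {
  inputs: TInput[];
  outputs: TOutput[];
}

// The consumer branches on direction first, then narrows on the kind discriminator.
function summarise(decoded: DecodedMessage<MyInput, MyOutput>): string[] {
  const lines: string[] = [];
  for (const input of decoded.inputs) {
    lines.push(
      input.kind === 'user-message'
        ? `client said: ${input.text}`
        : `client asked to regenerate ${input.target}`,
    );
  }
  for (const output of decoded.outputs) {
    lines.push(output.kind === 'text-delta' ? `agent delta: ${output.delta}` : 'agent finished');
  }
  return lines;
}
```

Because the two arrays are typed separately, the compiler rejects code that treats an agent event as a client event, which is the point of the split.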
Encoder
The encoder converts outbound events into Ably publish operations. It exposes four methods on the Encoder interface:
| Method | Purpose |
|---|---|
| publishInput(input, options?) | Publish a single TInput event on ai-input. Stamps the codec-message-id and merges per-write headers under extras.ai.transport. |
| publishOutput(output, options?) | Publish a single TOutput event on ai-output. Streams when the codec marks the event as appendable; otherwise publishes discretely. |
| cancel(reason?) | Close any in-flight streams with status: 'cancelled'. |
| close() | Flush pending appends, close active streams, release resources. |
Streamed mode
For events the codec marks as appendable (text deltas, reasoning deltas), the encoder runs a three-step pipeline per stream, exposed through the encoder core as startStream, appendStream, and closeStream:
- startStream(streamId, payload) calls channel.publish with stream: 'true', status: 'streaming', and the supplied stream-id. The publish returns a serial; the encoder core retains it for subsequent appends.
- Each subsequent appendable delta calls appendStream(streamId, data), which fire-and-forgets channel.appendMessage with the captured serial.
- The terminal event calls closeStream(streamId, payload), which writes a final channel.appendMessage with status: 'complete' (or cancelStream(streamId) writes status: 'cancelled').
Per-token append calls do not block the encoder. The encoder collects every append promise and awaits them as a batch when the stream closes. If any append rejected, the encoder writes a recovery channel.updateMessage containing the full accumulated payload, so subscribers see the intended final state even when intermediate appends were lost.
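The streamed-mode flow above can be sketched against an in-memory stand-in for the channel. Everything here is an assumption for illustration: FakeChannel, StreamingEncoderSketch, the method signatures, and the choice to decide between the final append and the recovery update only after the batch settles. It is not the SDK's encoder core.

```typescript
type HeaderMap = Record<string, string>;

// In-memory stand-in for a channel; method names follow the text, signatures are assumed.
class FakeChannel {
  log: string[] = [];
  failAppends = false; // when true, non-final appends are rejected
  private nextSerial = 1;

  publish(data: string, headers: HeaderMap): Promise<string> {
    const serial = `serial-${this.nextSerial++}`;
    this.log.push(`publish ${serial} status=${headers['status']} data="${data}"`);
    return Promise.resolve(serial);
  }

  appendMessage(serial: string, data: string, headers: HeaderMap = {}): Promise<void> {
    const isFinal = 'status' in headers;
    if (this.failAppends && !isFinal) return Promise.reject(new Error('append lost'));
    this.log.push(`append ${serial} status=${isFinal ? headers['status'] : '-'} data="${data}"`);
    return Promise.resolve();
  }

  updateMessage(serial: string, data: string, headers: HeaderMap): Promise<void> {
    this.log.push(`update ${serial} status=${headers['status']} data="${data}"`);
    return Promise.resolve();
  }
}

interface StreamState {
  serial: string;
  accumulated: string;
  pending: Promise<boolean>[]; // one entry per append: true on success, false on rejection
}

class StreamingEncoderSketch {
  private channel: FakeChannel;
  private streams: Record<string, StreamState> = {};

  constructor(channel: FakeChannel) {
    this.channel = channel;
  }

  // Step 1: publish the first message and keep the returned serial.
  async startStream(streamId: string, data: string): Promise<void> {
    const serial = await this.channel.publish(data, {
      stream: 'true',
      status: 'streaming',
      'stream-id': streamId,
    });
    this.streams[streamId] = { serial, accumulated: data, pending: [] };
  }

  // Step 2: fire-and-forget append; the outcome is recorded, not awaited.
  appendStream(streamId: string, delta: string): void {
    const state = this.streams[streamId];
    if (!state) throw new Error(`unknown stream ${streamId}`);
    state.accumulated += delta;
    state.pending.push(
      this.channel.appendMessage(state.serial, delta).then(() => true, () => false),
    );
  }

  // Step 3: await the batch, then close normally or write the recovery update.
  async closeStream(streamId: string): Promise<void> {
    const state = this.streams[streamId];
    if (!state) throw new Error(`unknown stream ${streamId}`);
    delete this.streams[streamId];
    const results = await Promise.all(state.pending);
    if (results.every((ok) => ok)) {
      await this.channel.appendMessage(state.serial, '', { status: 'complete' });
    } else {
      await this.channel.updateMessage(state.serial, state.accumulated, { status: 'complete' });
    }
  }
}
```

In the failure path the recovery update carries the whole accumulated payload, so a subscriber that missed individual appends still converges on the intended text.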
Discrete mode
Client publishes (publishInput) and lifecycle events use discrete mode: one channel.publish per event, no appends. Discrete events still carry the codec payload and any per-event headers. Codecs that need to publish several discrete messages atomically use publishDiscreteBatch from the encoder core to send them in a single channel publish.
Decoder
The decoder converts inbound Ably messages back into TInput or TOutput events. It dispatches on the inbound message's Ably action.
| Action | Meaning | Decoder behaviour |
|---|---|---|
| message.create | A new message arrived | Begin a new stream (when stream: 'true') or hand the payload to the codec's discrete decode hook. |
| message.append | A token was appended | Emit a delta for the matching stream-id; emit end events when the append carries status: 'complete'. |
| message.update | A message was replaced | Prefix-match against the tracked stream. If the new data extends the accumulated text, emit the delta; otherwise treat as first-contact and accumulate. |
| message.delete | A message was removed | Clear the stream's tracker state. |
The decoder maintains a stream tracker that maps each Ably channel serial to the current state of its stream. A subscriber that joins mid-stream receives a message.update or message.append for a serial whose message.create it never saw; the decoder treats the incoming state as the current full state and accumulates from there. The end state is the same as for a subscriber that received every operation.
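The dispatch table and mid-stream join behaviour can be illustrated with a small tracker. This is a sketch under stated assumptions: InboundMessage, StreamEvent, and StreamTrackerSketch are hypothetical names, the message shape is simplified to a string payload, and emitting the full incoming data as one delta on first contact is my reading of "treat as first-contact and accumulate", not confirmed SDK behaviour.

```typescript
type Action = 'message.create' | 'message.append' | 'message.update' | 'message.delete';

// Simplified inbound message: string payload plus the fields the tracker needs.
interface InboundMessage {
  action: Action;
  serial: string;
  data: string;
  status?: string;
}

type StreamEvent =
  | { type: 'start'; serial: string }
  | { type: 'delta'; serial: string; delta: string }
  | { type: 'end'; serial: string };

class StreamTrackerSketch {
  // serial -> text accumulated so far
  private accumulated: Record<string, string> = {};

  text(serial: string): string | undefined {
    return this.accumulated[serial];
  }

  decode(msg: InboundMessage): StreamEvent[] {
    const events: StreamEvent[] = [];
    const known = Object.prototype.hasOwnProperty.call(this.accumulated, msg.serial);
    const previous = known ? this.accumulated[msg.serial] : '';
    let delta = '';

    switch (msg.action) {
      case 'message.create':
        events.push({ type: 'start', serial: msg.serial });
        this.accumulated[msg.serial] = msg.data;
        delta = msg.data;
        break;
      case 'message.append':
        // Unknown serial means we joined mid-stream: start tracking from here.
        if (!known) events.push({ type: 'start', serial: msg.serial });
        this.accumulated[msg.serial] = previous + msg.data;
        delta = msg.data;
        break;
      case 'message.update':
        if (!known) events.push({ type: 'start', serial: msg.serial });
        if (known && msg.data.slice(0, previous.length) === previous) {
          // The new data extends what we have: emit only the suffix.
          delta = msg.data.slice(previous.length);
        } else {
          // First contact (assumption: the whole payload is emitted as one delta).
          delta = msg.data;
        }
        this.accumulated[msg.serial] = msg.data;
        break;
      case 'message.delete':
        delete this.accumulated[msg.serial];
        return events;
    }

    if (delta !== '') events.push({ type: 'delta', serial: msg.serial, delta });
    if (msg.status === 'complete') events.push({ type: 'end', serial: msg.serial });
    return events;
  }
}
```

Feeding one tracker a create followed by an append, and another only an update followed by the same append, leaves both holding the same accumulated text, which mirrors the convergence claim above.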
decode() returns a DecodedMessage<TInput, TOutput>. The Tree feeds each decoded event through Codec.fold(projection, event) to update the owning Run's TProjection. Codec.getMessages(projection) materialises the projection into CodecMessage<TMessage>[]: each entry pairs the domain message with the SDK's client-minted codecMessageId. The View concatenates these across the visible Run chain and exposes the pair list as getMessages(). There is no second naked-message accessor; callers that just need the domain objects map .message.
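To make the fold and getMessages steps concrete, here is a minimal reducer sketch. The ChatEvent union, TextMessage, and the Projection layout are hypothetical, and placing codecMessageId on every event is an assumption made to keep the example small; only the CodecMessage pairing of codecMessageId with message comes from the description above.

```typescript
// Pair shape described in the text: the domain message plus its codecMessageId.
interface CodecMessage<TMessage> {
  codecMessageId: string;
  message: TMessage;
}

// Hypothetical domain message and events for illustration only.
interface TextMessage {
  role: 'user' | 'assistant';
  text: string;
}

type ChatEvent =
  | { kind: 'user-message'; codecMessageId: string; text: string }
  | { kind: 'text-delta'; codecMessageId: string; delta: string };

// Hypothetical projection: insertion order plus a lookup by message id.
interface Projection {
  order: string[];
  byId: Record<string, TextMessage>;
}

function init(): Projection {
  return { order: [], byId: {} };
}

// Pure reducer: returns a new projection and never mutates the old one.
function fold(projection: Projection, event: ChatEvent): Projection {
  const existing: TextMessage | undefined = projection.byId[event.codecMessageId];
  const order = existing ? projection.order : [...projection.order, event.codecMessageId];
  const message: TextMessage =
    event.kind === 'user-message'
      ? { role: 'user', text: event.text }
      : { role: 'assistant', text: (existing ? existing.text : '') + event.delta };
  return { order, byId: { ...projection.byId, [event.codecMessageId]: message } };
}

// Materialise the projection into renderable pairs, in arrival order.
function getMessages(projection: Projection): CodecMessage<TextMessage>[] {
  return projection.order.map((id) => ({ codecMessageId: id, message: projection.byId[id] }));
}

const events: ChatEvent[] = [
  { kind: 'user-message', codecMessageId: 'm1', text: 'Hi' },
  { kind: 'text-delta', codecMessageId: 'm2', delta: 'Hel' },
  { kind: 'text-delta', codecMessageId: 'm2', delta: 'lo' },
];
const projection = events.reduce(fold, init());

// Callers that only need the domain objects map .message.
const domainMessages = getMessages(projection).map((pair) => pair.message);
```

Keeping fold pure makes replay straightforward: folding the same events from init always yields the same projection.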
Header tier story
Codec headers (stream, stream-id, status, discrete) live under extras.ai.codec. Transport headers (run-id, codec-message-id, role, parent, fork-of, and others) live under extras.ai.transport. The codec encoder reads and writes the codec tier; the transport layer reads and writes the transport tier. Neither layer reaches across the boundary, which keeps a custom codec from accidentally writing a run-id or a transport router from accidentally truncating a stream.
mergeHeaders, getTransportHeaders, and getCodecHeaders from @ably/ai-transport are utility helpers for working with the tier structure.
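The two-tier layout can be pictured as nested objects under extras.ai. The helpers below are hypothetical stand-ins written for this sketch; they are not the SDK's mergeHeaders, getTransportHeaders, or getCodecHeaders, whose signatures are not shown here.

```typescript
type HeaderMap = Record<string, string>;

interface AiExtras {
  codec?: HeaderMap; // stream, stream-id, status, discrete
  transport?: HeaderMap; // run-id, codec-message-id, role, parent, fork-of, ...
}

interface Extras {
  ai?: AiExtras;
}

// Hypothetical helper: merge headers into the codec tier only.
function withCodecHeaders(extras: Extras, headers: HeaderMap): Extras {
  const ai: AiExtras = extras.ai ?? {};
  return { ...extras, ai: { ...ai, codec: { ...(ai.codec ?? {}), ...headers } } };
}

// Hypothetical helper: merge headers into the transport tier only.
function withTransportHeaders(extras: Extras, headers: HeaderMap): Extras {
  const ai: AiExtras = extras.ai ?? {};
  return { ...extras, ai: { ...ai, transport: { ...(ai.transport ?? {}), ...headers } } };
}

function readCodecHeaders(extras: Extras): HeaderMap {
  return extras.ai?.codec ?? {};
}

function readTransportHeaders(extras: Extras): HeaderMap {
  return extras.ai?.transport ?? {};
}

// The codec writes its tier, the transport writes its own; neither touches the other.
const fromCodec = withCodecHeaders({}, { stream: 'true', status: 'streaming', 'stream-id': 's1' });
const onTheWire = withTransportHeaders(fromCodec, { 'run-id': 'run-1', role: 'assistant' });
```

Because each helper rebuilds only its own tier, a codec using this pattern cannot overwrite a run-id and the transport cannot clobber stream state.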
Write a custom codec
To support a framework not covered by the SDK:
- Define your TInput and TOutput event shapes as subtypes of CodecInputEvent and CodecOutputEvent.
- Implement the Codec<TInput, TOutput, TProjection, TMessage> interface. The required methods are init (initial per-Run projection), fold (reduce one event into the projection), createEncoder, createDecoder, getMessages (projection to messages), createUserMessage (wrap a TMessage as a UserMessage input), and createRegenerate (build a Regenerate input). The tool-resolution factories (createToolResult, createToolResultError, createToolApprovalResponse) are optional; implement them only if your TInput union includes those variants.
- Use createEncoderCore and createDecoderCore from @ably/ai-transport for the base publish and dispatch machinery. The encoder core exposes publishDiscrete, publishDiscreteBatch, startStream, appendStream, closeStream, cancelStream, and cancelAllStreams. The decoder core takes a DecoderCoreHooks<TEvent> object with buildStartEvents, buildDeltaEvents, buildEndEvents, and decodeDiscrete hooks.
    import { createEncoderCore, createDecoderCore } from '@ably/ai-transport';

    // isStreamStart, isStreamDelta, and isStreamEnd are type guards you write
    // for your own TOutput union; they are not imported from the SDK.

    const myCodec = {
      // Initial per-Run projection.
      init() {
        return { messages: [], openStreams: new Map() };
      },

      fold(projection, event, meta) {
        // reduce one event into the per-Run projection, using meta.serial for idempotency
        return projection;
      },

      createEncoder(channel, options) {
        const core = createEncoderCore(channel, options);
        return {
          // Client events are always published discretely on ai-input.
          publishInput: async (input, opts) =>
            core.publishDiscrete(
              { name: 'ai-input', data: input, codecHeaders: { 'codec-type': input.kind } },
              opts,
            ),

          // Agent events stream when appendable, otherwise publish discretely.
          publishOutput: async (output, opts) => {
            if (isStreamStart(output)) {
              return core.startStream(output.id, { name: 'ai-output', data: output.delta ?? '' }, opts);
            }
            if (isStreamDelta(output)) {
              // Fire-and-forget: appends are awaited as a batch when the stream closes.
              core.appendStream(output.id, output.delta);
              return;
            }
            if (isStreamEnd(output)) {
              return core.closeStream(output.id, { name: 'ai-output', data: '' });
            }
            return core.publishDiscrete({ name: 'ai-output', data: output }, opts);
          },

          cancel: () => core.cancelAllStreams(),
          close: () => core.close(),
        };
      },

      createDecoder() {
        const core = createDecoderCore({
          buildStartEvents: (tracker) => [{ type: 'stream-start', id: tracker.streamId }],
          buildDeltaEvents: (tracker, delta) => [{ type: 'stream-delta', id: tracker.streamId, delta }],
          buildEndEvents: (tracker) => [{ type: 'stream-end', id: tracker.streamId }],
          decodeDiscrete: (payload) => [payload.data],
        });
        return {
          decode: (message) => {
            const events = core.decode(message);
            // Tag each event as TInput or TOutput based on the wire name.
            return message.name === 'ai-input'
              ? { inputs: events, outputs: [] }
              : { inputs: [], outputs: events };
          },
        };
      },

      getMessages(projection) {
        // return CodecMessage<TMessage>[] for rendering
        return projection.messages;
      },

      createUserMessage(message) {
        return { kind: 'user-message', message };
      },

      createRegenerate(target, parent) {
        return { kind: 'regenerate', target, parent };
      },
    };

See the Codec API reference for the full interface and signatures.
Related pages
- Wire protocol: the channel format the codec reads and writes.
- Conversation tree: how decoded events fold into per-Run projections and build the tree.
- Transport patterns: how the transport layer drives the codec.
- Codec API reference: the public codec interface.