The codec is the layer that translates between your AI framework's event types and Ably channel messages. This page describes how each part of the codec works internally and how the pieces fit together.
Two-layer split
AI Transport separates concerns into two layers:
- Transport layer (generic): manages turns, lifecycle events, cancellation, history, and multi-client sync. It works with any message type.
- Domain layer (codec): maps framework-specific events to Ably publish operations and back. It knows the shape of your events and messages.
The transport layer calls into the codec but never inspects the domain data. The codec calls into the channel writer but never manages turns or lifecycle. This separation is what makes AI Transport framework-agnostic.
Encoder core
The encoder converts outbound domain events into Ably publish operations. It supports two modes.
Discrete mode
Discrete mode publishes a complete message in a single operation. The encoder calls channel.publish() once with the full message content. User messages and tool results typically use discrete mode.
Streamed mode
Streamed mode publishes a message incrementally. The encoder creates a message, appends tokens to it, and then closes it. This is the mode used for LLM response streaming.
The append pipeline is fire-and-forget: each appendMessage call is dispatched without waiting for acknowledgment. This keeps the stream flowing at the rate the LLM produces tokens, without back-pressure from the network.
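The fire-and-forget behavior can be illustrated with a minimal sketch. The StreamWriter interface, the StreamedEncoderSketch class, and the in-memory slowWriter below are hypothetical stand-ins written for this example; they are not the SDK's actual types. Only the idea comes from the text above: each append is dispatched without awaiting its acknowledgment.

```typescript
// Hypothetical writer interface for illustration; not the SDK's API.
interface StreamWriter {
  appendMessage(messageId: string, delta: string): Promise<void>;
}

class StreamedEncoderSketch {
  private readonly writer: StreamWriter;
  private readonly messageId: string;
  private buffer = "";
  readonly failedDeltas: string[] = [];

  constructor(writer: StreamWriter, messageId: string) {
    this.writer = writer;
    this.messageId = messageId;
  }

  // Dispatch the append without awaiting it, so token production is never
  // blocked on a network round trip. Failures are recorded for later handling.
  append(delta: string): void {
    this.buffer += delta;
    this.writer.appendMessage(this.messageId, delta).catch(() => {
      this.failedDeltas.push(delta);
    });
  }

  // Everything appended so far, kept locally regardless of acknowledgments.
  get accumulated(): string {
    return this.buffer;
  }
}

// In-memory writer whose acknowledgments arrive only after a short delay.
const dispatched: string[] = [];
const slowWriter: StreamWriter = {
  appendMessage(_messageId, delta) {
    dispatched.push(delta);
    return new Promise<void>((resolve) => setTimeout(resolve, 20));
  },
};

const streamed = new StreamedEncoderSketch(slowWriter, "msg-1");
for (const token of ["Hel", "lo", "!"]) streamed.append(token);
```

In this sketch all three appends are dispatched before any acknowledgment resolves, which is the property that keeps the stream moving at the rate tokens are produced.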
Flush and recovery
If an append fails (for example, due to a transient network error), the encoder falls back to an updateMessage call that sends the full accumulated content. This is the recovery path. The encoder maintains a buffer of all appended content so it can always reconstruct the complete message.
After recovery, the encoder continues appending from the recovered state. Subscribers see a seamless stream regardless of whether individual appends succeeded.
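A rough sketch of the recovery decision, with I/O left out so the logic is easy to follow. The PublishOp type and RecoveringBuffer class are hypothetical names introduced for this example; the only behavior taken from the text is that a failed append is followed by an update carrying the full accumulated content, after which appending resumes.

```typescript
// Hypothetical description of the operation the encoder would send next.
type PublishOp =
  | { kind: "append"; messageId: string; data: string }
  | { kind: "update"; messageId: string; data: string };

class RecoveringBuffer {
  private readonly messageId: string;
  private content = "";

  constructor(messageId: string) {
    this.messageId = messageId;
  }

  // Record the delta locally and describe the append to send.
  append(delta: string): PublishOp {
    this.content += delta;
    return { kind: "append", messageId: this.messageId, data: delta };
  }

  // Called when an append was rejected: resend everything accumulated so far.
  recover(): PublishOp {
    return { kind: "update", messageId: this.messageId, data: this.content };
  }
}

const buf = new RecoveringBuffer("msg-1");
buf.append("Hel");
buf.append("lo"); // suppose this append fails in transit
const recovery = buf.recover(); // full content, including the failed delta
const next = buf.append("!"); // appending continues from the recovered state
```

Because the buffer always holds the complete content, the update in this sketch never depends on which individual appends reached the channel.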
Header merging
Every outbound message carries both transport and domain headers. The encoder merges them with this priority order:
- Transport headers set by the transport layer (turn ID, role, stream status).
- Domain headers set by the codec (framework-specific metadata).
- Per-message headers set by the caller (overrides for specific messages).
Higher-priority headers overwrite lower-priority ones. This ensures transport-critical headers like x-ably-turn-id cannot be accidentally overridden by the codec.
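As an illustration, a priority merge can be written as a left-to-right object merge in which later layers win. The mergeHeaders helper and the x-my-framework and x-trace header names are hypothetical. The example relies only on the guarantee stated above, that transport headers win over domain headers; where per-message headers sit relative to the other two layers is an assumption made for this sketch.

```typescript
type HeaderMap = Record<string, string>;

// Merge header layers given in ascending priority: later layers overwrite earlier ones.
function mergeHeaders(...layersLowToHigh: HeaderMap[]): HeaderMap {
  return Object.assign({}, ...layersLowToHigh);
}

// A codec that (mistakenly) tries to set a transport-critical header.
const domainHeaders: HeaderMap = { "x-ably-turn-id": "wrong", "x-my-framework": "v1" };
// Assumption: per-message headers are applied after domain headers.
const perMessageHeaders: HeaderMap = { "x-trace": "abc" };
const transportHeaders: HeaderMap = { "x-ably-turn-id": "turn-42", "x-ably-stream": "true" };

// Transport headers are passed last, so they cannot be overridden.
const merged = mergeHeaders(domainHeaders, perMessageHeaders, transportHeaders);
```

Here merged keeps the transport layer's x-ably-turn-id value, while headers that only the codec or the caller set pass through unchanged.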
Decoder core
The decoder converts inbound Ably messages back into domain events. It dispatches on the message action to determine how to process each message.
Action dispatch
The decoder handles four Ably message actions:
| Action | Meaning | Decoder behavior |
|---|---|---|
| create | A new message arrived | Start a new message or stream. Emit the initial event. |
| append | A token was appended to a message | Emit a delta event for the matching stream. |
| update | A message was updated in place | Update the message state. If the status is terminal, close the stream. |
| delete | A message was removed | Remove the message from state. |
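The table can be read as a switch over the action. The sketch below assumes simplified InboundMessage and DomainEvent shapes invented for this example, including a terminal flag standing in for however a terminal status is signalled on the wire; a real decoder would produce your framework's own event types.

```typescript
type Action = "create" | "append" | "update" | "delete";

// Hypothetical, simplified inbound message shape.
interface InboundMessage {
  action: Action;
  id: string;
  data: string;
  terminal?: boolean; // stand-in for a terminal status
}

// Hypothetical domain events emitted by the decoder.
type DomainEvent =
  | { type: "start"; id: string; text: string }
  | { type: "delta"; id: string; text: string }
  | { type: "replace"; id: string; text: string; closed: boolean }
  | { type: "remove"; id: string };

function dispatch(msg: InboundMessage): DomainEvent {
  switch (msg.action) {
    case "create": // new message or stream: emit the initial event
      return { type: "start", id: msg.id, text: msg.data };
    case "append": // token appended: emit a delta for the matching stream
      return { type: "delta", id: msg.id, text: msg.data };
    case "update": // updated in place: close the stream if the status is terminal
      return { type: "replace", id: msg.id, text: msg.data, closed: msg.terminal === true };
    case "delete": // removed: drop the message from state
      return { type: "remove", id: msg.id };
  }
}

const deltaEvent = dispatch({ action: "append", id: "m1", data: "lo" });
const closingEvent = dispatch({ action: "update", id: "m1", data: "Hello", terminal: true });
```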
Stream tracker
The decoder maintains a stream tracker that maps stream IDs to their current state. When a create action arrives with x-ably-stream: true, the tracker opens a new stream. Subsequent append actions for the same stream ID are routed to that stream. An update action with a terminal status closes the stream.
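One way to picture the tracker is as a map from stream ID to state. The StreamTrackerSketch class and StreamState shape below are hypothetical and much simpler than a production tracker; they only show the open, route, and close steps described above.

```typescript
interface StreamState {
  text: string;
  open: boolean;
}

class StreamTrackerSketch {
  private readonly streams = new Map<string, StreamState>();

  // A create action marked as a stream opens a new entry.
  open(streamId: string, initial = ""): void {
    this.streams.set(streamId, { text: initial, open: true });
  }

  // Route an append to its stream. Returns false if the stream is unknown or closed.
  append(streamId: string, delta: string): boolean {
    const state = this.streams.get(streamId);
    if (!state || !state.open) return false;
    state.text += delta;
    return true;
  }

  // An update with a terminal status closes the stream.
  close(streamId: string): void {
    const state = this.streams.get(streamId);
    if (state) state.open = false;
  }

  get(streamId: string): StreamState | undefined {
    return this.streams.get(streamId);
  }
}

const tracker = new StreamTrackerSketch();
tracker.open("s1");
tracker.append("s1", "Hel");
tracker.append("s1", "lo");
tracker.close("s1");
const lateAppendAccepted = tracker.append("s1", "!"); // rejected: stream is closed
```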
First-contact vs prefix-match
When a client joins a channel, the first message it receives for a given stream may not be the create action, because the stream may already be in progress. The decoder handles this with two strategies:
- First-contact: if the first action for a stream is create, the decoder processes it normally.
- Prefix-match: if the first action for a stream is append or update, the decoder recognizes this as a mid-stream join. It initializes the stream from the accumulated message state on the channel rather than waiting for a create that already happened.
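The choice between the two strategies can be sketched as a single function of the first action seen for a stream. The initStream function and JoinResult type are hypothetical, and how the accumulated message state is obtained from the channel is left as an assumption; the sketch simply takes it as a parameter.

```typescript
type FirstAction = "create" | "append" | "update";

interface JoinResult {
  strategy: "first-contact" | "prefix-match";
  text: string; // initial content of the locally tracked stream
}

// `data` is the payload of the action itself; `accumulated` is the full
// message state already on the channel (assumed to be available here).
function initStream(action: FirstAction, data: string, accumulated: string): JoinResult {
  if (action === "create") {
    // Joined before the stream started: process the create normally.
    return { strategy: "first-contact", text: data };
  }
  // Joined mid-stream: start from what the channel has accumulated so far.
  return { strategy: "prefix-match", text: accumulated };
}

const fromStart = initStream("create", "", "");
const midStream = initStream("append", "lo", "Hello");
```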
Accumulator
The accumulator builds complete messages from a sequence of domain events. It is used in two contexts:
Live stream
During live streaming, the accumulator processes events as they arrive from the decoder. It maintains a running list of messages, appending tokens to the in-progress message. The messages property returns all messages including the one currently being streamed. The completedMessages property returns only messages where the stream has ended.
History
When loading history, the accumulator replays stored events to reconstruct message state. The processOutputs method accepts a batch of events and updates the internal state. This is the same method used during live streaming: the accumulator does not distinguish between live and historical events.
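To make the difference between messages and completedMessages concrete, here is a toy accumulator. The AccEvent and AccMessage types and the AccumulatorSketch class are hypothetical; only the processOutputs, messages, completedMessages, and hasActiveStream names mirror the text, and a real accumulator would work with your framework's event types.

```typescript
// Hypothetical domain events for illustration.
type AccEvent =
  | { type: "start"; id: string }
  | { type: "delta"; id: string; text: string }
  | { type: "finish"; id: string };

interface AccMessage {
  id: string;
  text: string;
  done: boolean;
}

class AccumulatorSketch {
  private readonly list: AccMessage[] = [];

  // The same method serves live batches and replayed history.
  processOutputs(events: AccEvent[]): void {
    for (const event of events) {
      if (event.type === "start") {
        this.list.push({ id: event.id, text: "", done: false });
        continue;
      }
      const msg = this.list.find((m) => m.id === event.id);
      if (!msg) continue; // ignore events for unknown messages in this sketch
      if (event.type === "delta") msg.text += event.text;
      else msg.done = true;
    }
  }

  get messages(): AccMessage[] {
    return this.list;
  }

  get completedMessages(): AccMessage[] {
    return this.list.filter((m) => m.done);
  }

  get hasActiveStream(): boolean {
    return this.list.some((m) => !m.done);
  }
}

const acc = new AccumulatorSketch();
acc.processOutputs([
  { type: "start", id: "m1" },
  { type: "delta", id: "m1", text: "Hi" },
  { type: "finish", id: "m1" },
  { type: "start", id: "m2" },
  { type: "delta", id: "m2", text: "The" },
]);
```

After this batch the sketch holds two messages, one completed and one still streaming, so hasActiveStream is true.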
```typescript
const accumulator = codec.createAccumulator()

// Process events from history or live stream
accumulator.processOutputs(events)

// Read current state
accumulator.messages // all messages (including in-progress)
accumulator.completedMessages // only completed messages
accumulator.hasActiveStream // true if a stream is in progress
```
Lifecycle tracker
The lifecycle tracker ensures clients receive a consistent sequence of lifecycle events, even when joining mid-stream. If a client subscribes to a channel where a turn is already in progress, the tracker synthesizes the missing lifecycle events (like turn-start and the stream create) so the client can process the in-progress turn correctly.
The tracker maintains configurable phases. Each phase defines a set of events that must be observed before the next phase begins. If the tracker detects a gap (for example, the client received an append without seeing a stream create), it synthesizes the missing events and delivers them in order before the real event.
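Gap detection can be sketched with an ordered list of phases and a set of events already observed. The LifecycleTrackerSketch class, the PHASES constant, and the stream-create event name are hypothetical choices for this example; turn-start is the only event name taken from the text.

```typescript
type LifecycleEvent = "turn-start" | "stream-create" | "append";

// Ordered phases: each must be observed before anything that follows it.
const PHASES: LifecycleEvent[] = ["turn-start", "stream-create"];

class LifecycleTrackerSketch {
  private readonly seen = new Set<LifecycleEvent>();

  // Returns the events to deliver: synthesized prerequisites first, then the real event.
  observe(event: LifecycleEvent): LifecycleEvent[] {
    const out: LifecycleEvent[] = [];
    const index = PHASES.indexOf(event);
    // A phase event requires all earlier phases; any other event requires all phases.
    const required = index === -1 ? PHASES : PHASES.slice(0, index);
    for (const phase of required) {
      if (!this.seen.has(phase)) {
        this.seen.add(phase);
        out.push(phase); // synthesized to fill the gap
      }
    }
    this.seen.add(event);
    out.push(event);
    return out;
  }
}

const lifecycle = new LifecycleTrackerSketch();
const firstDelivery = lifecycle.observe("append"); // joined mid-stream
const secondDelivery = lifecycle.observe("append"); // no gap left to fill
```

In this sketch the first append is preceded by a synthesized turn-start and stream-create, and later appends are delivered on their own.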
Write a custom codec
To build a codec for a framework not covered by the SDK:
- Implement the Codec interface with your event and message types.
- Use createEncoderCore and createDecoderCore from @ably/ai-transport to get the base publish and dispatch logic.
- Provide hooks that map your framework's events to the core operations (create, append, update, close).
- Implement isTerminal to identify events that end a stream.
```typescript
// Assumption: the Codec type is exported from the same package as the core helpers.
import { createEncoderCore, createDecoderCore, type Codec } from '@ably/ai-transport'

// MyEvent, MyMessage, serialize, and deserialize are placeholders for your
// framework's types and (de)serialization helpers.
const myCodec: Codec<MyEvent, MyMessage> = {
  createEncoder(channel, options) {
    const core = createEncoderCore(channel, options)
    return {
      // Streamed mode: append one event to its in-progress message
      async appendEvent(event) {
        await core.appendStream(event.messageId, serialize(event))
      },
      // Discrete mode: publish a batch of complete messages
      async writeMessages(messages, opts) {
        await core.publishDiscreteBatch(messages.map((msg) => serialize(msg)), opts)
      },
      // Discrete mode: publish a single complete event
      async writeEvent(event) {
        await core.publishDiscrete(serialize(event))
      },
      async abort() { await core.abortAllStreams() },
      async close() { await core.close() },
    }
  },
  createDecoder() {
    // Each hook maps one inbound action to a domain event
    const core = createDecoderCore({
      onCreate: (msg) => deserialize(msg.data),
      onAppend: (msg) => deserialize(msg.data),
      onUpdate: (msg) => deserialize(msg.data),
    })
    return {
      decode(message) {
        return core.decode(message)
      },
    }
  },
  createAccumulator() {
    // Build and return a MessageAccumulator for your types
  },
  // Events that end a stream
  isTerminal(event) {
    return event.type === 'finish' || event.type === 'error'
  },
}
```
See the Codec API reference for the full interface specification.
Related pages
- Wire protocol - the message format the codec reads and writes.
- Conversation tree - how decoded messages build the tree.
- Transport patterns - how the transport layer drives the codec.
- Codec API reference - the public codec interface.