The Codec interface is the bridge between an AI framework and Ably channel messages. It defines how domain events (LLM token deltas, tool calls, finish signals) are encoded into Ably publish operations and decoded back into domain events on the client.
Implement Codec<TEvent, TMessage> to integrate any AI framework with AI Transport. The SDK ships UIMessageCodec for the Vercel AI SDK; for other frameworks, implement the four methods below.
1
2
3
4
5
6
7
8
import type { Codec } from '@ably/ai-transport';
const myCodec: Codec<MyEvent, MyMessage> = {
createEncoder(channel, options) { /* ... */ },
createDecoder() { /* ... */ },
createAccumulator() { /* ... */ },
isTerminal(event) { /* ... */ },
};Create an encoder
createEncoder(channel: ChannelWriter, options?: EncoderOptions): StreamEncoder<TEvent, TMessage>Create a streaming encoder bound to a channel. Called by the server transport once per turn. The returned encoder owns the message-append lifecycle for the turn (start a stream, append tokens, close).
Parameters
channelrequiredChannelWriteroptionsoptionalEncoderOptionsReturns
StreamEncoder<TEvent, TMessage>
A streaming encoder. See StreamEncoder for the full method surface.
Create a decoder
createDecoder(): StreamDecoder<TEvent, TMessage>Create a decoder for converting inbound Ably messages back into domain outputs. Called by the client transport on attach.
Returns
StreamDecoder<TEvent, TMessage>
A decoder. See StreamDecoder.
Create an accumulator
createAccumulator(): MessageAccumulator<TEvent, TMessage>Create an accumulator for building complete domain messages from a sequence of decoder outputs. Used internally by the client transport to reconstruct in-progress messages.
Returns
MessageAccumulator<TEvent, TMessage>
An accumulator. See MessageAccumulator.
Check whether an event is terminal
isTerminal(event: TEvent): booleanReturn true if the event signals stream completion (finish, error, abort). The transport uses this to close the stream at the right time.
Parameters
eventrequiredTEventReturns
boolean
true if the event ends a stream, otherwise false.
StreamEncoder
A streaming encoder bound to a single turn. Extends DiscreteEncoder (the stateless subset, documented below) with stateful streaming operations: appendEvent for content streams, abort for cancellation, close to flush. Every write method accepts a per-write WriteOptions:
Append a streaming event
appendEvent(event: TEvent, options?: WriteOptions): Promise<void>Encode and append a streaming domain event to an in-progress stream. Delta semantics: subsequent calls accumulate text under the same Ably message lifecycle.
Publish complete messages
writeMessages(messages: TMessage[], options?: WriteOptions): Promise<Ably.PublishResult>Encode and publish one or more domain messages atomically. All messages share the encoder's transport headers (including x-ably-msg-id), so they form one logical unit in the conversation tree.
Publish a discrete event
writeEvent(event: TEvent, options?: WriteOptions): Promise<Ably.PublishResult>Encode and publish a single domain event as a standalone discrete message. Use to publish events outside the streaming flow. Implementations should throw for event types that are only meaningful within a stream (such as text deltas).
Abort the encoder
abort(reason?: string): Promise<void>Abort all in-progress streams and publish a codec-specific abort signal. Called by the transport when a turn is cancelled. Idempotent — calling abort after all streams have already aborted is a no-op.
Close the encoder
close(): Promise<void>Flush all pending appends and close the encoder.
DiscreteEncoder
The stateless subset of StreamEncoder. Has only writeMessages and writeEvent and is safe for long-lived reuse across turns. The server transport uses a DiscreteEncoder to publish user messages (via addMessages); writeEvent is also a public surface for consumers publishing standalone discrete events.
Publish complete messages
writeMessages(messages: TMessage[], options?: WriteOptions): Promise<Ably.PublishResult>Same as StreamEncoder.writeMessages above.
Publish a discrete event
writeEvent(event: TEvent, options?: WriteOptions): Promise<Ably.PublishResult>Same as StreamEncoder.writeEvent above.
StreamDecoder
Decodes Ably messages into domain events and messages.
Decode a message
decode(message: Ably.InboundMessage): DecoderOutput<TEvent, TMessage>[]Decode a single inbound Ably message into zero or more domain outputs. The accumulator routes the outputs to the correct in-progress message using the messageId field on event outputs. Each output is a
MessageAccumulator
Accumulates decoder outputs into a list of domain messages, tracking which streams are still active. The client transport owns one accumulator per view.
Properties
messagesTMessage[]completedMessagesTMessage[]hasActiveStreamBooleanProcess decoder outputs
processOutputs(outputs: DecoderOutput<TEvent, TMessage>[]): voidProcess a batch of decoder outputs, updating internal message state.
Apply an external message update
updateMessage(message: TMessage): voidApply an external update to a message, for example from an update callback that bypasses the chunk stream.
Initialise tracking for a message
initMessage(messageId: string, message: TMessage): voidEnsure the accumulator is ready to process events for the given message. If not already active, creates internal tracking state from the message. If already active, syncs internal state with the provided message (picking up external changes like cross-turn amendments). Idempotent — safe to call before every processOutputs.
Complete a message
completeMessage(messageId: string): voidMark a message as completed. Removes it from active tracking so it appears in completedMessages. No-op if not active.
ChannelWriter
The thin write interface a Codec uses to publish to Ably. The transport supplies a ChannelWriter to createEncoder; testing harnesses and decorators can implement their own.
Publish discrete messages
publish(message: Ably.Message | Ably.Message[], options?: Ably.PublishOptions): Promise<Ably.PublishResult>Publish one or more discrete messages to the channel.
Append to an existing message
appendMessage(message: Ably.Message, operation?: Ably.MessageOperation, options?: Ably.PublishOptions): Promise<Ably.UpdateDeleteResult>Append data to an existing message identified by its serial. Used by the encoder for token-by-token streaming.
Replace an existing message
updateMessage(message: Ably.Message, operation?: Ably.MessageOperation, options?: Ably.PublishOptions): Promise<Ably.UpdateDeleteResult>Replace the data of an existing message identified by its serial. The encoder's recovery path uses this to publish accumulated content when an append fails.
Ably.RealtimeChannel satisfies ChannelWriter directly, so passing a channel where a ChannelWriter is expected works without an adapter.
Helper factories
The SDK ships helper factories that pre-implement the encoder, decoder, and lifecycle-tracker scaffolding so a custom codec only writes the framework-specific parts.
createEncoderCore
function createEncoderCore(writer: ChannelWriter, options?: EncoderCoreOptions): EncoderCoreCreate a base encoder with common publish logic and recovery already implemented. A custom codec composes it inside createEncoder and supplies framework-specific encoding hooks.
EncoderCore exposes the publish primitives a codec encoder delegates to: publishDiscrete, publishDiscreteBatch, startStream, appendStream, closeStream, abortStream, abortAllStreams, and close. The codec wires these into the StreamEncoder methods it returns from createEncoder.
createDecoderCore
function createDecoderCore<TEvent, TMessage>(hooks: DecoderCoreHooks<TEvent, TMessage>, options?: DecoderCoreOptions): DecoderCore<TEvent, TMessage>Create a base decoder with common message-parsing logic already implemented. The hooks describe how to convert inbound Ably messages into domain events for the target framework.
DecoderCoreHooks has four methods: buildStartEvents(tracker) and buildEndEvents(tracker, closingHeaders) fire at the start and end of each streamed message; buildDeltaEvents(tracker, delta) fires on every appended chunk; decodeDiscrete(payload) handles discrete (non-streaming) messages such as user messages and lifecycle events. Each hook returns an array of DecoderOutput so a single inbound message can produce zero or more domain outputs.
createLifecycleTracker
function createLifecycleTracker<TEvent>(phases: PhaseConfig<TEvent>[]): LifecycleTracker<TEvent>Create a configurable lifecycle tracker that synthesises missing phase events (for example, a synthesised start chunk when a client joins mid-stream). Use inside a custom decoder to keep the event sequence consistent regardless of when the client attached.
Example
The end-to-end shape for writing a custom codec. The encoder composes createEncoderCore and maps each domain event to a publish primitive. The decoder composes createDecoderCore and emits domain outputs from the four lifecycle hooks.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import {
createEncoderCore,
createDecoderCore,
type Codec,
} from '@ably/ai-transport';
type MyEvent =
| { type: 'text-start'; messageId: string }
| { type: 'text-delta'; messageId: string; text: string }
| { type: 'finish' };
type MyMessage = { id: string; text: string };
const myCodec: Codec<MyEvent, MyMessage> = {
createEncoder(channel, options) {
const core = createEncoderCore(channel, options);
return {
// DiscreteEncoder operations — atomic publishes outside the stream.
writeMessages: (messages, opts) =>
core.publishDiscreteBatch(messages.map((m) => ({ name: 'message', data: m })), opts),
writeEvent: (event, opts) => {
if (event.type !== 'finish') throw new Error('only finish is a discrete event');
return core.publishDiscrete({ name: 'finish', data: {} }, opts);
},
// StreamEncoder operations — lifecycle of an in-progress stream.
appendEvent: async (event) => {
if (event.type === 'text-start') {
await core.startStream(event.messageId, { name: 'text', data: '' });
} else if (event.type === 'text-delta') {
core.appendStream(event.messageId, event.text);
}
},
abort: () => core.abortAllStreams(),
close: () => core.close(),
};
},
createDecoder() {
return createDecoderCore<MyEvent, MyMessage>({
buildStartEvents: (tracker) => [
{ kind: 'event', event: { type: 'text-start', messageId: tracker.streamId } },
],
buildDeltaEvents: (tracker, delta) => [
{ kind: 'event', event: { type: 'text-delta', messageId: tracker.streamId, text: delta } },
],
buildEndEvents: () => [
{ kind: 'event', event: { type: 'finish' } },
],
decodeDiscrete: (payload) =>
payload.name === 'finish'
? [{ kind: 'event', event: { type: 'finish' } }]
: [{ kind: 'message', message: payload.data as MyMessage }],
});
},
createAccumulator() {
// Build an accumulator that owns the running list of MyMessage values.
// See the SDK source for the canonical UIMessage accumulator implementation.
},
isTerminal(event) {
return event.type === 'finish';
},
};See the codec architecture internals for the encoder and decoder pipelines in depth.