Codec
The Codec interface is the bridge between an AI framework and Ably channel messages. It describes the wire as a flat stream of input and output events and folds those events into a per-Run projection, from which the SDK extracts messages for the conversation tree.
Implement Codec<TInput, TOutput, TProjection, TMessage> to integrate any AI framework with AI Transport. The SDK ships UIMessageCodec for the Vercel AI SDK. For other frameworks, implement the methods below.
import type { Codec } from '@ably/ai-transport';

const myCodec: Codec<MyInput, MyOutput, MyProjection, MyMessage> = {
  init() { /* ... */ },
  fold(state, event, meta) { /* ... */ },
  createEncoder(channel, options) { /* ... */ },
  createDecoder() { /* ... */ },
  getMessages(projection) { /* ... */ },
  createUserMessage(message) { /* ... */ },
  createRegenerate(target, parent) { /* ... */ },
};

Properties
Codec extends Reducer<TInput | TOutput, TProjection> and adds factories for encoders, decoders, and the message-extraction step the SDK uses to populate the tree.
init() => TProjection
  Returns the initial projection.
fold(state, event, meta) => TProjection
  Folds one event into the projection. May mutate state in place.
createEncoder(channel, options?) => Encoder<TInput, TOutput>
  Builds a stateful encoder bound to a channel.
createDecoder() => Decoder<TInput, TOutput>
  Builds a stateful decoder for a single channel subscription.
getMessages(projection) => CodecMessage<TMessage>[]
  Returns { codecMessageId, message } pairs. The SDK uses the pairs to correlate rendered messages back to the wire id; the View concatenates them across the visible Run chain.
createUserMessage(message) => TInput
  Wraps a TMessage as the codec's UserMessage variant for publishing on the ai-input wire. Returns the codec's TInput so the result is usable as a View.send argument without a cast.
createRegenerate(target, parent) => TInput
  Builds a Regenerate input that targets an assistant message. The View calls this from regenerate().
createToolResult(codecMessageId, payload) => TInput
  Builds a ToolResult input addressed at the assistant codec-message containing the tool call. The codec defines the payload shape (for example the Vercel layer's { toolCallId, output }).
createToolResultError(codecMessageId, payload) => TInput
  Builds a ToolResultError input. The codec defines the payload shape (for example { toolCallId, message }).
createToolApprovalResponse(codecMessageId, payload) => TInput
  Builds a ToolApprovalResponse input. The codec defines the payload shape (for example { toolCallId, approved, reason? }).
Fold an event into a projection
fold(state: TProjection, event: TInput | TOutput, meta: ReducerMeta): TProjection
The reducer contract: fold one event into the projection. The same (state, event, meta) triple must produce the same result every time. Re-folding an event whose serial has already been incorporated must be a no-op. The reducer is free to store a high-water mark inside the projection.
Parameters
state (required, TProjection)
event (required, TInput | TOutput)
meta (required, ReducerMeta)
Returns
TProjection. The updated projection.
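The idempotence rule can be sketched as follows. This is a minimal illustration, not SDK code: the types are local stand-ins, and it assumes that the reducer metadata exposes a string serial that sorts in publish order (as the example at the end of this page also assumes).

```typescript
// Local stand-ins for illustration; the real ReducerMeta comes from the SDK
// and the `serial` field used here is an assumption.
interface SketchMeta { serial: string }
interface SketchEvent { type: 'text-delta'; text: string }
interface SketchProjection { highWater: string; text: string }

const sketchInit = (): SketchProjection => ({ highWater: '', text: '' });

// Fold one event. An event whose serial is at or below the stored
// high-water mark has already been incorporated, so it is skipped.
function sketchFold(
  state: SketchProjection,
  event: SketchEvent,
  meta: SketchMeta,
): SketchProjection {
  if (meta.serial <= state.highWater) return state; // re-fold is a no-op
  return { highWater: meta.serial, text: state.text + event.text };
}

const foldedOnce = sketchFold(sketchInit(), { type: 'text-delta', text: 'Hel' }, { serial: '0001' });
// The same event delivered again (for example after a reconnect).
const foldedTwice = sketchFold(foldedOnce, { type: 'text-delta', text: 'Hel' }, { serial: '0001' });
const foldedNext = sketchFold(foldedTwice, { type: 'text-delta', text: 'lo' }, { serial: '0002' });

console.log(foldedTwice === foldedOnce); // true: the duplicate left the projection untouched
console.log(foldedNext.text);            // "Hello"
```

This variant returns a new object instead of mutating; either approach satisfies the rule as long as a duplicate serial leaves the projection unchanged.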
Create an encoder
createEncoder(channel: ChannelWriter, options?: EncoderOptions): Encoder<TInput, TOutput>
Build a stateful encoder bound to a channel. The encoder owns the message-append lifecycle for both directions on a single channel.
Parameters
channel (required, ChannelWriter)
  Ably.RealtimeChannel satisfies this directly.
options (optional, EncoderOptions)
Returns
Encoder<TInput, TOutput>. A stateful encoder bound to the supplied channel.
Create a decoder
createDecoder(): Decoder<TInput, TOutput>
Build a stateful decoder for a single channel subscription. The decoder maintains stream-tracker state across messages so that a mid-stream join synthesises any missing start events before deltas reach the SDK. The reducer always sees a clean (start, delta*, end) sequence.
Returns
Decoder<TInput, TOutput>. A stateful decoder for the channel.
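The mid-stream join behaviour described above can be pictured with a small tracker. This is only a sketch under assumptions: the start/delta/end event shapes and the streamId field are hypothetical, and the SDK's own stream tracker is not documented here.

```typescript
// Hypothetical event shapes for illustration only.
type StreamEvent =
  | { type: 'start'; streamId: string }
  | { type: 'delta'; streamId: string; text: string }
  | { type: 'end'; streamId: string };

// Tracks which streams have been opened so that a delta or end seen without
// a preceding start (a mid-stream join) gets a synthesised start first.
class StreamTracker {
  private readonly opened = new Set<string>();

  normalise(event: StreamEvent): StreamEvent[] {
    switch (event.type) {
      case 'start':
        this.opened.add(event.streamId);
        return [event];
      case 'delta':
        if (this.opened.has(event.streamId)) return [event];
        this.opened.add(event.streamId);
        return [{ type: 'start', streamId: event.streamId }, event];
      case 'end': {
        const out: StreamEvent[] = this.opened.has(event.streamId)
          ? [event]
          : [{ type: 'start', streamId: event.streamId }, event];
        this.opened.delete(event.streamId);
        return out;
      }
    }
  }
}

const tracker = new StreamTracker();
// The subscriber joined late: the first message it sees is a delta.
const seenTypes = [
  ...tracker.normalise({ type: 'delta', streamId: 's1', text: 'lo' }),
  ...tracker.normalise({ type: 'end', streamId: 's1' }),
].map((e) => e.type);

console.log(seenTypes); // [ 'start', 'delta', 'end' ]
```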
Encoder
A stateful encoder for a single channel. Two publish methods enforce direction at the call site: publishInput writes to the ai-input wire and publishOutput writes to the ai-output wire. Stream-tracker state lives inside the encoder and is shared across both directions.
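A rough sketch of the two-method shape, using an in-memory writer in place of a real channel. The SketchWriter interface, the JSON encoding, and the 'unsupported' variant are assumptions made for illustration; the SDK's ChannelWriter and wire format are not reproduced here. The point it shows is that an unsupported variant fails with a synchronous throw and not a rejected promise.

```typescript
// Hypothetical stand-ins: the real ChannelWriter and wire encoding are
// defined by the SDK.
interface SketchWriter {
  write(wire: 'ai-input' | 'ai-output', data: string): Promise<void>;
}
type SketchInput = { kind: 'user-message'; text: string } | { kind: 'unsupported' };
type SketchOutput = { type: 'text-delta'; text: string };

function createSketchEncoder(writer: SketchWriter) {
  return {
    // Deliberately not `async`: an unsupported variant throws before any
    // promise is created, so the caller sees a synchronous exception.
    publishInput(input: SketchInput): Promise<void> {
      if (input.kind !== 'user-message') {
        throw new Error(`cannot encode input kind: ${input.kind}`);
      }
      return writer.write('ai-input', JSON.stringify(input));
    },
    publishOutput(output: SketchOutput): Promise<void> {
      return writer.write('ai-output', JSON.stringify(output));
    },
  };
}

// An in-memory writer that records which wire each payload went to.
const written: Array<{ wire: string; data: string }> = [];
const memoryWriter: SketchWriter = {
  write: async (wire, data) => {
    written.push({ wire, data });
  },
};

const sketchEncoder = createSketchEncoder(memoryWriter);
void sketchEncoder.publishInput({ kind: 'user-message', text: 'hi' });
void sketchEncoder.publishOutput({ type: 'text-delta', text: 'hello' });

let threwSynchronously = false;
try {
  void sketchEncoder.publishInput({ kind: 'unsupported' });
} catch {
  threwSynchronously = true;
}

console.log(written.map((w) => w.wire)); // [ 'ai-input', 'ai-output' ]
console.log(threwSynchronously);         // true
```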
Properties
publishInput(input, options?) => Promise<void>
  Publishes on the ai-input wire. Throws synchronously if the codec cannot encode the given input variant.
publishOutput(output, options?) => Promise<void>
  Publishes on the ai-output wire. Throws synchronously if the codec cannot encode the given output variant.
cancel(reason?: string) => Promise<void>
close() => Promise<void>
Publish an input
publishInput(input: TInput, options?: WriteOptions): Promise<void>
Encode and publish a single client input on the ai-input wire. Per-write overrides live in WriteOptions.
Parameters
input (required, TInput)
options (optional, WriteOptions)
Publish an output
publishOutput(output: TOutput, options?: WriteOptions): Promise<void>
Encode and publish a single agent output on the ai-output wire.
Parameters
output (required, TOutput)
options (optional, WriteOptions)
Decoder
A stateful decoder for a single channel subscription. Decodes one Ably inbound message into the input and output halves.
Decode a message
decode(message: Ably.InboundMessage): DecodedMessage<TInput, TOutput>
Decode one inbound Ably message into a tagged result. The codec routes by the wire name and returns inputs and outputs separately so the SDK never has to introspect direction.
Returns
DecodedMessage<TInput, TOutput>. Tagged inputs and outputs.
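To make the "tagged result" idea concrete, here is a sketch that routes by wire and returns the two halves separately. The shapes are assumptions: SketchDecoded with inputs and outputs arrays is a guess at what a tagged result could look like, and the wire field on the inbound message is hypothetical; the SDK's DecodedMessage may differ.

```typescript
// Hypothetical shapes: the SDK's DecodedMessage and the way a wire is
// identified on an inbound message are assumptions made for this sketch.
interface SketchInbound { wire: 'ai-input' | 'ai-output'; data: string }
interface SketchDecoded<I, O> { inputs: I[]; outputs: O[] }
type DemoInput = { kind: 'user-message'; text: string };
type DemoOutput = { type: 'text-delta'; text: string };

// Route by wire so the caller never inspects the payload to learn direction.
function decodeSketch(message: SketchInbound): SketchDecoded<DemoInput, DemoOutput> {
  if (message.wire === 'ai-input') {
    return { inputs: [JSON.parse(message.data) as DemoInput], outputs: [] };
  }
  return { inputs: [], outputs: [JSON.parse(message.data) as DemoOutput] };
}

const decodedInput = decodeSketch({
  wire: 'ai-input',
  data: JSON.stringify({ kind: 'user-message', text: 'hi' }),
});
const decodedOutput = decodeSketch({
  wire: 'ai-output',
  data: JSON.stringify({ type: 'text-delta', text: 'hello' }),
});

console.log(decodedInput.inputs.length, decodedInput.outputs.length);   // 1 0
console.log(decodedOutput.inputs.length, decodedOutput.outputs.length); // 0 1
```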
Well-known input variants
Codec input variants extend the CodecInputEvent base. The SDK reserves a handful of well-known kind literals so callers can publish a user message, regenerate an assistant message, or resolve a tool call without inventing their own shapes. Codec-specific variants pick any other literal. (Edits ride the user-message path with a forkOf routing header; there is no separate 'edit' kind on the wire.)
kind (String)
  The reserved literals are 'user-message', 'regenerate', 'tool-result', 'tool-result-error', and 'tool-approval-response'.
parent (String)
target (String)
  ... kind.
codecMessageId (String)
The full input union is the codec author's responsibility. Use the variants below directly or extend them with extra fields.
UserMessage
kind ('user-message')
  Always 'user-message'.
message (TMessage)
A new user message. Produced by Codec.createUserMessage and surfaced via View.send (wrap a domain message via codec.createUserMessage(message) or build the literal directly).
Regenerate
kind ('regenerate')
  Always 'regenerate'.
target (String)
parent (String)
A signal to regenerate an existing assistant codec-message. Produced by Codec.createRegenerate and surfaced via View.regenerate.
ToolResult
kind ('tool-result')
  Always 'tool-result'.
codecMessageId (String)
payload (TPayload)
  The Vercel layer uses { toolCallId, output }; custom codecs pick their own shape.
A client-published tool result for a successful execution. Mutates the assistant codec-message addressed by codecMessageId.
ToolResultError
kind ('tool-result-error')
  Always 'tool-result-error'.
codecMessageId (String)
payload (TPayload)
  For example { toolCallId, message }.
A client-published tool result for a failed execution.
ToolApprovalResponse
kind ('tool-approval-response')
  Always 'tool-approval-response'.
codecMessageId (String)
payload (TPayload)
  For example { toolCallId, approved, reason? }.
A client-published response to an agent-emitted tool-approval request. Flips the targeted tool call from pending-approval to approved or denied.
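Taken together, the well-known variants form a discriminated union on kind. The sketch below re-declares them locally so that it is self-contained; the message shape ({ id, text }) is hypothetical, and the payload shapes follow the examples quoted in this reference, which a custom codec is free to replace.

```typescript
// Local re-declarations for illustration; a real codec would extend
// CodecInputEvent from the SDK. The `message` shape is hypothetical.
type WellKnownInput =
  | { kind: 'user-message'; message: { id: string; text: string } }
  | { kind: 'regenerate'; target: string; parent: string }
  | { kind: 'tool-result'; codecMessageId: string; payload: { toolCallId: string; output: unknown } }
  | { kind: 'tool-result-error'; codecMessageId: string; payload: { toolCallId: string; message: string } }
  | {
      kind: 'tool-approval-response';
      codecMessageId: string;
      payload: { toolCallId: string; approved: boolean; reason?: string };
    };

// Switching on `kind` narrows each branch to the matching variant.
function describeInput(input: WellKnownInput): string {
  switch (input.kind) {
    case 'user-message':
      return `user said: ${input.message.text}`;
    case 'regenerate':
      return `regenerate ${input.target}`;
    case 'tool-result':
      return `tool ${input.payload.toolCallId} succeeded`;
    case 'tool-result-error':
      return `tool ${input.payload.toolCallId} failed: ${input.payload.message}`;
    case 'tool-approval-response':
      return `tool ${input.payload.toolCallId} ${input.payload.approved ? 'approved' : 'denied'}`;
  }
}

const approvalSummary = describeInput({
  kind: 'tool-approval-response',
  codecMessageId: 'msg-1',
  payload: { toolCallId: 'call-1', approved: false, reason: 'not now' },
});

console.log(approvalSummary); // tool call-1 denied
```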
Codec output base
type (String)
Every codec output variant must satisfy CodecOutputEvent. The SDK reads type so it can reliably narrow TInput | TOutput (inputs carry kind, outputs carry type).
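The kind/type split makes the union easy to narrow with a type guard. The following is a sketch with local stand-in base types, not the SDK's own narrowing logic; it assumes that output variants never carry a kind property.

```typescript
// Minimal stand-ins for the two bases: inputs carry `kind`, outputs carry `type`.
interface InputBase { kind: string }
interface OutputBase { type: string }

// Type guard that tells the two halves of the union apart. It relies on the
// assumption that outputs do not declare a `kind` property.
function isInputEvent<I extends InputBase, O extends OutputBase>(event: I | O): event is I {
  return 'kind' in event;
}

type PingInput = { kind: 'user-message'; text: string };
type PongOutput = { type: 'text-delta'; text: string };

const mixedEvents: Array<PingInput | PongOutput> = [
  { kind: 'user-message', text: 'hi' },
  { type: 'text-delta', text: 'hello' },
];

const directions = mixedEvents.map((e) =>
  isInputEvent<PingInput, PongOutput>(e) ? `input:${e.kind}` : `output:${e.type}`,
);

console.log(directions); // [ 'input:user-message', 'output:text-delta' ]
```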
Wire payloads
The encoder and decoder cores describe Ably messages with two payload shapes, used internally by createEncoderCore and createDecoderCore. Custom codec implementations that build on those cores work with these shapes directly.
MessagePayload describes a discrete Ably message.
name (String)
  The message name (for example "text", "tool-input", "user-message").
data (unknown)
codecHeaders (Record<string, string>)
  Carried in extras.ai.codec.
transportHeaders (Record<string, string>)
  Carried in extras.ai.transport.
ephemeral (Boolean)
StreamPayload describes a streamed message. data must be a string because the append lifecycle uses text append semantics. Deltas are concatenated for recovery and prefix-matching on the decoder.
name (String)
data (String)
codecHeaders (Record<string, string>)
  Carried in extras.ai.codec.
transportHeaders (Record<string, string>)
  Carried in extras.ai.transport.
Reducer
The reducer is a pure, stateless folding contract that Codec extends.
init() => TProjection
fold(state, event, meta) => TProjection
Example
A minimal codec skeleton that folds a single input and output variant and extracts a flat message list.
import type {
  Codec,
  CodecInputEvent,
  CodecOutputEvent,
  CodecMessage,
  ReducerMeta,
} from '@ably/ai-transport';

interface MyMessage { id: string; role: 'user' | 'assistant'; text: string }

// Input variants: a new user message and a regenerate signal. The regenerate
// variant is declared so that createRegenerate below returns a valid MyInput.
interface MyUserMessage extends CodecInputEvent { kind: 'user-message'; message: MyMessage }
interface MyRegenerate extends CodecInputEvent { kind: 'regenerate'; target: string; parent: string }
type MyInput = MyUserMessage | MyRegenerate;

interface MyOutput extends CodecOutputEvent { type: 'text-delta'; text: string }
interface MyProjection { highWater: string; messages: CodecMessage<MyMessage>[] }

const myCodec: Codec<MyInput, MyOutput, MyProjection, MyMessage> = {
  init: () => ({ highWater: '', messages: [] }),
  fold: (state, event, meta: ReducerMeta) => {
    // Skip events at or below the high-water mark so that re-folding is a no-op.
    if (meta.serial <= state.highWater) return state;
    state.highWater = meta.serial;
    if ('kind' in event && event.kind === 'user-message') {
      state.messages.push({ codecMessageId: meta.messageId ?? event.message.id, message: event.message });
    } else if ('type' in event && event.type === 'text-delta') {
      // Append streamed text to the most recent message if it is an assistant message.
      const last = state.messages.at(-1);
      if (last?.message.role === 'assistant') last.message.text += event.text;
    }
    return state;
  },
  // buildEncoder and buildDecoder stand for your own encoder and decoder factories (not shown).
  createEncoder: (channel, options) => buildEncoder(channel, options),
  createDecoder: () => buildDecoder(),
  getMessages: (projection) => projection.messages,
  createUserMessage: (message) => ({ kind: 'user-message', message }),
  createRegenerate: (target, parent) => ({ kind: 'regenerate', target, parent }),
};