Codec

The Codec interface is the bridge between an AI framework and Ably channel messages. It describes the wire as a flat stream of input and output events and folds those events into a per-Run projection, from which the SDK extracts messages for the conversation tree.

Implement Codec<TInput, TOutput, TProjection, TMessage> to integrate any AI framework with AI Transport. The SDK ships UIMessageCodec for the Vercel AI SDK. For other frameworks, implement the methods below.

TypeScript

import type { Codec } from '@ably/ai-transport';

const myCodec: Codec<MyInput, MyOutput, MyProjection, MyMessage> = {
  init() { /* ... */ },
  fold(state, event, meta) { /* ... */ },
  createEncoder(channel, options) { /* ... */ },
  createDecoder() { /* ... */ },
  getMessages(projection) { /* ... */ },
  createUserMessage(message) { /* ... */ },
  createRegenerate(target, parent) { /* ... */ },
};

Properties

Codec extends Reducer<TInput | TOutput, TProjection> and adds factories for encoders, decoders, and the message-extraction step the SDK uses to populate the tree.

init() => TProjection
Build an empty initial projection. Called once per Run before any events are folded.
fold(state, event, meta) => TProjection
Fold one event into the projection and return the updated projection. May mutate state in place.
createEncoder(channel, options?) => Encoder<TInput, TOutput>
Create a stateful encoder bound to the given channel writer. Takes optional EncoderOptions.
createDecoder() => Decoder<TInput, TOutput>
Create a stateful decoder that maps inbound Ably messages to typed inputs and outputs. Each call returns a new Decoder.
getMessages(projection) => CodecMessage<TMessage>[]
Extract the per-message list from a projection as { codecMessageId, message } pairs. The SDK uses the pairs to correlate rendered messages back to the wire id; the View concatenates them across the visible Run chain.
createUserMessage(message) => TInput
Wrap a TMessage as the codec's UserMessage variant for publishing on the ai-input wire. Returns the codec's TInput so the result is usable as a View.send argument without a cast.
createRegenerate(target, parent) => TInput
Build a Regenerate input that targets an assistant message. The View calls this from regenerate().
createToolResult(codecMessageId, payload) => TInput
Optional. Build a ToolResult input addressed at the assistant codec-message containing the tool call. The codec defines the payload shape (for example the Vercel layer's { toolCallId, output }).
createToolResultError(codecMessageId, payload) => TInput
Optional. Build a ToolResultError input. The codec defines the payload shape (for example { toolCallId, message }).
createToolApprovalResponse(codecMessageId, payload) => TInput
Optional. Build a ToolApprovalResponse input. The codec defines the payload shape (for example { toolCallId, approved, reason? }).

Fold an event into a projection

fold(state: TProjection, event: TInput | TOutput, meta: ReducerMeta): TProjection

The reducer contract: fold one event into the projection. The same (state, event, meta) triple must produce the same result every time, and re-folding an event whose serial has already been incorporated must be a no-op. To detect such replays, the reducer is free to store a high-water mark inside the projection.

Parameters

state (required): TProjection
The current projection. May be mutated and returned.
event (required): TInput | TOutput
The next input or output event to fold.
meta (required): ReducerMeta
Transport-derived metadata stamped from the inbound Ably message.

Returns

TProjection. The updated projection.
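The idempotence rule can be illustrated with a small, self-contained sketch. The types below are local stand-ins rather than SDK exports, and the sketch assumes that meta exposes a serial string that orders lexicographically.

```typescript
// Local stand-ins for illustration only; the real types come from the SDK.
interface DemoMeta { serial: string } // assumption: serials compare lexicographically
interface DemoDelta { type: 'text-delta'; text: string }
interface DemoProjection { highWater: string; text: string }

function demoInit(): DemoProjection {
  return { highWater: '', text: '' };
}

function demoFold(state: DemoProjection, ev: DemoDelta, meta: DemoMeta): DemoProjection {
  // Anything at or below the high-water mark has already been folded: no-op.
  if (meta.serial <= state.highWater) return state;
  state.highWater = meta.serial;
  state.text += ev.text;
  return state; // mutated in place and returned, which the contract allows
}

const demoState = demoInit();
demoFold(demoState, { type: 'text-delta', text: 'Hel' }, { serial: '0001' });
demoFold(demoState, { type: 'text-delta', text: 'lo' }, { serial: '0002' });
demoFold(demoState, { type: 'text-delta', text: 'lo' }, { serial: '0002' }); // replay, ignored
console.log(demoState.text); // "Hello"
```

Keeping the high-water mark inside the projection means the replay check needs no state outside the value that fold already receives and returns.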

Create an encoder

createEncoder(channel: ChannelWriter, options?: EncoderOptions): Encoder<TInput, TOutput>

Build a stateful encoder bound to a channel. The encoder owns the message-append lifecycle for both directions on a single channel.

Parameters

channel (required): ChannelWriter
The channel writer to publish through. An Ably.RealtimeChannel satisfies this directly.
options (optional): EncoderOptions
Per-encoder defaults (clientId, extras, messageId, onMessage hook).

Returns

Encoder<TInput, TOutput>. A stateful encoder bound to the supplied channel.

Create a decoder

createDecoder(): Decoder<TInput, TOutput>

Build a stateful decoder for a single channel subscription. The decoder maintains stream-tracker state across messages so that a mid-stream join synthesises any missing start events before deltas reach the SDK. As a result, the reducer always sees a clean (start, delta*, end) sequence.

Returns

Decoder<TInput, TOutput>. A stateful decoder for the channel.
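One way to picture the mid-stream join behaviour is a tracker that remembers which streams have already produced a start event. This is a minimal sketch under assumed shapes: DemoWireFrame, DemoStreamEvent, and createDemoDecoder are hypothetical names, and a real decoder works on Ably inbound messages instead.

```typescript
// Hypothetical wire frames and decoded events, for illustration only.
type DemoWireFrame =
  | { op: 'start'; streamId: string }
  | { op: 'delta'; streamId: string; text: string }
  | { op: 'end'; streamId: string };

type DemoStreamEvent =
  | { type: 'start'; streamId: string }
  | { type: 'delta'; streamId: string; text: string }
  | { type: 'end'; streamId: string };

function createDemoDecoder() {
  const started = new Set<string>(); // streams that have already emitted a start

  return {
    decode(frame: DemoWireFrame): DemoStreamEvent[] {
      const out: DemoStreamEvent[] = [];
      const { streamId } = frame;
      // Emit a start the first time a stream is seen, whether the frame is a
      // real start or a delta/end observed after joining mid-stream.
      if (!started.has(streamId)) {
        started.add(streamId);
        out.push({ type: 'start', streamId });
      }
      if (frame.op === 'delta') out.push({ type: 'delta', streamId, text: frame.text });
      if (frame.op === 'end') {
        started.delete(streamId);
        out.push({ type: 'end', streamId });
      }
      return out;
    },
  };
}

// A subscriber that joined after the real start frame was published.
const lateJoiner = createDemoDecoder();
const demoEvents = [
  ...lateJoiner.decode({ op: 'delta', streamId: 's1', text: 'lo' }),
  ...lateJoiner.decode({ op: 'end', streamId: 's1' }),
];
console.log(demoEvents.map((e) => e.type).join(',')); // "start,delta,end"
```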

Encoder

A stateful encoder for a single channel. Two publish methods enforce direction at the call site. publishInput writes to the ai-input wire, publishOutput writes to the ai-output wire. Stream-tracker state lives inside the encoder and is shared across both directions.

Properties

publishInput(input, options?) => Promise<void>
Encode and publish a single client input on the ai-input wire. Throws synchronously if the codec cannot encode the given input variant.
publishOutput(output, options?) => Promise<void>
Encode and publish a single agent output on the ai-output wire. Throws synchronously if the codec cannot encode the given output variant.
cancel(reason?: string) => Promise<void>
Cancel any in-progress streams and emit a codec-specific cancel signal. Idempotent.
close() => Promise<void>
Flush pending appends and release encoder resources.
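The "throws synchronously" wording matters for callers: an unsupported variant fails before any promise exists, so a plain try/catch without await observes it. The sketch below shows that behaviour with an in-memory stand-in. DemoWriter, DemoPublished, and createDemoEncoder are hypothetical, and the wire field is only an illustrative way to record direction, not the SDK's wire format.

```typescript
// Hypothetical in-memory stand-ins; not the SDK's ChannelWriter or Encoder.
interface DemoPublished { wire: 'ai-input' | 'ai-output'; name: string; data: unknown }
interface DemoWriter { publish(message: DemoPublished): Promise<void> }

type DemoIn = { kind: 'user-message'; text: string } | { kind: 'unsupported' };
type DemoOut = { type: 'text-delta'; text: string };

function createDemoEncoder(writer: DemoWriter) {
  return {
    // Deliberately not `async`: an unsupported variant throws before a promise is created.
    publishInput(input: DemoIn): Promise<void> {
      if (input.kind !== 'user-message') {
        throw new Error(`cannot encode input kind: ${input.kind}`);
      }
      return writer.publish({ wire: 'ai-input', name: input.kind, data: input.text });
    },
    publishOutput(output: DemoOut): Promise<void> {
      return writer.publish({ wire: 'ai-output', name: output.type, data: output.text });
    },
  };
}

const demoSent: DemoPublished[] = [];
const demoEncoder = createDemoEncoder({
  publish: async (message) => { demoSent.push(message); },
});

let demoSyncError = '';
try {
  demoEncoder.publishInput({ kind: 'unsupported' }); // no await needed to see the throw
} catch (err) {
  demoSyncError = (err as Error).message;
}

demoEncoder.publishInput({ kind: 'user-message', text: 'hi' });
demoEncoder.publishOutput({ type: 'text-delta', text: 'hello' });
```

Separate methods per direction let the type checker reject an output passed to publishInput at the call site, before anything reaches the channel.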

Publish an input

publishInput(input: TInput, options?: WriteOptions): Promise<void>

Encode and publish a single client input on the ai-input wire. Per-write overrides live in WriteOptions.

Parameters

input (required): TInput
The input event to encode and publish.
options (optional): WriteOptions
Per-write overrides (clientId, extras, messageId).

Publish an output

publishOutput(output: TOutput, options?: WriteOptions): Promise<void>

Encode and publish a single agent output on the ai-output wire.

Parameters

output (required): TOutput
The output event to encode and publish.
options (optional): WriteOptions
Per-write overrides.

Decoder

A stateful decoder for a single channel subscription. Decodes one inbound Ably message into its input and output halves.

Decode a message

decode(message: Ably.InboundMessage): DecodedMessage<TInput, TOutput>

Decode one inbound Ably message into a tagged result. The codec routes by the wire name and returns inputs and outputs separately so the SDK never has to introspect direction.

Returns

DecodedMessage<TInput, TOutput>. Tagged inputs and outputs.

Well-known input variants

Codec input variants extend the CodecInputEvent base. The SDK reserves a handful of well-known kind literals so callers can publish a user message, regenerate an assistant message, or resolve a tool call without inventing their own shapes. Codec-specific variants pick any other literal. (Edits ride the user-message path with a forkOf routing header; there is no separate 'edit' kind on the wire.)

kind: String
Discriminator. The SDK reserves 'user-message', 'regenerate', 'tool-result', 'tool-result-error', and 'tool-approval-response'.
parent: String
The codec-message-id of the preceding codec-message on this branch. Auto-computed when omitted.
target: String
Pointer to another codec-message this input references. Semantic depends on kind.
codecMessageId: String
Targets an existing codec-message-id instead of minting a fresh one. Used by continuation inputs that amend an existing assistant message.

The full input union is the codec author's responsibility. Use the variants below directly or extend them with extra fields.
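As a rough sketch of what such a union can look like, the block below combines two reserved kinds with one codec-specific variant and narrows on kind. DemoInputBase is a local stand-in for the base described above, with field optionality assumed, and the 'feedback' variant is hypothetical.

```typescript
// Local stand-in for the input base; optionality of the fields is an assumption.
interface DemoInputBase {
  kind: string;
  parent?: string;
  target?: string;
  codecMessageId?: string;
}

interface DemoUserMessage extends DemoInputBase {
  kind: 'user-message';
  message: { text: string };
}

interface DemoRegenerate extends DemoInputBase {
  kind: 'regenerate';
  target: string; // required for this variant
  parent: string; // required for this variant
}

// A codec-specific variant that uses a literal the SDK does not reserve (hypothetical).
interface DemoFeedback extends DemoInputBase {
  kind: 'feedback';
  target: string;
  rating: 'up' | 'down';
}

type DemoInputUnion = DemoUserMessage | DemoRegenerate | DemoFeedback;

function describeInput(input: DemoInputUnion): string {
  switch (input.kind) {
    case 'user-message':
      return `user said: ${input.message.text}`;
    case 'regenerate':
      return `regenerate ${input.target} under ${input.parent}`;
    case 'feedback':
      return `${input.rating} on ${input.target}`;
    default: {
      // Compile-time exhaustiveness check: fails to type-check if a variant is added but not handled.
      const unreachable: never = input;
      return unreachable;
    }
  }
}

console.log(describeInput({ kind: 'regenerate', target: 'a1', parent: 'u1' }));
// "regenerate a1 under u1"
```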

UserMessage

kind: 'user-message'
Pinned to 'user-message'.
message: TMessage
The user's message in the codec's domain representation.

A new user message. Produced by Codec.createUserMessage and surfaced via View.send. Either wrap a domain message with codec.createUserMessage(message) or build the literal directly.

Regenerate

kind: 'regenerate'
Pinned to 'regenerate'.
target: String
The codec-message-id of the assistant to regenerate. Required.
parent: String
The codec-message-id of the parent user message the new assistant threads under. Required.

A signal to regenerate an existing assistant codec-message. Produced by Codec.createRegenerate and surfaced via View.regenerate.

ToolResult

kind: 'tool-result'
Pinned to 'tool-result'.
codecMessageId: String
The assistant codec-message containing the tool call.
payload: TPayload
Codec-defined domain payload describing the result. The Vercel codec uses { toolCallId, output }; custom codecs pick their own shape.

A client-published tool result for a successful execution. Mutates the assistant codec-message addressed by codecMessageId.

ToolResultError

kind: 'tool-result-error'
Pinned to 'tool-result-error'.
codecMessageId: String
The assistant codec-message containing the tool call.
payload: TPayload
Codec-defined domain payload describing the failure. The Vercel codec uses { toolCallId, message }.

A client-published tool result for a failed execution.

ToolApprovalResponse

kind: 'tool-approval-response'
Pinned to 'tool-approval-response'.
codecMessageId: String
The assistant codec-message containing the tool call.
payload: TPayload
Codec-defined domain payload describing the decision. The Vercel codec uses { toolCallId, approved, reason? }.

A client-published response to an agent-emitted tool-approval request. Flips the targeted tool call from pending-approval to approved or denied.
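How a reducer might apply such a response can be sketched as follows. The message and tool-call shapes are hypothetical, the payload mirrors the { toolCallId, approved, reason? } shape mentioned above, and ignoring a response for an already-resolved call is a design choice of this sketch, not documented SDK behaviour.

```typescript
// Hypothetical projection shapes, for illustration only.
type DemoToolState = 'pending-approval' | 'approved' | 'denied';
interface DemoToolCall { toolCallId: string; state: DemoToolState; reason?: string }
interface DemoAssistantMessage { codecMessageId: string; toolCalls: DemoToolCall[] }

interface DemoApprovalResponse {
  kind: 'tool-approval-response';
  codecMessageId: string;
  payload: { toolCallId: string; approved: boolean; reason?: string };
}

// Returns true when a pending tool call was flipped, false otherwise.
function applyApproval(messages: DemoAssistantMessage[], input: DemoApprovalResponse): boolean {
  const msg = messages.find((m) => m.codecMessageId === input.codecMessageId);
  const call = msg?.toolCalls.find((c) => c.toolCallId === input.payload.toolCallId);
  // Only a call that is still awaiting a decision can be flipped.
  if (!call || call.state !== 'pending-approval') return false;
  call.state = input.payload.approved ? 'approved' : 'denied';
  if (input.payload.reason !== undefined) call.reason = input.payload.reason;
  return true;
}

const demoCall: DemoToolCall = { toolCallId: 't1', state: 'pending-approval' };
const demoMessages: DemoAssistantMessage[] = [{ codecMessageId: 'a1', toolCalls: [demoCall] }];

const demoFirst = applyApproval(demoMessages, {
  kind: 'tool-approval-response',
  codecMessageId: 'a1',
  payload: { toolCallId: 't1', approved: false, reason: 'too risky' },
});
// A second response for the same call arrives after it was resolved and is ignored.
const demoSecond = applyApproval(demoMessages, {
  kind: 'tool-approval-response',
  codecMessageId: 'a1',
  payload: { toolCallId: 't1', approved: true },
});
console.log(demoCall.state, demoFirst, demoSecond); // denied true false
```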

Codec output base

type: String
Discriminator. Codec authors pick the literal value per variant.

Every codec output variant must satisfy CodecOutputEvent. The SDK reads type so it can reliably narrow TInput | TOutput (inputs carry kind, outputs carry type).
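The kind versus type split can be turned into a small type guard. This is a sketch with local stand-in shapes; isDemoInput and directionOf are hypothetical helpers, not SDK functions.

```typescript
// Local stand-ins: inputs carry `kind`, outputs carry `type`.
interface DemoKindEvent { kind: string }
interface DemoTypeEvent { type: string }

// Type guard: the presence of `kind` marks an input event.
function isDemoInput(ev: DemoKindEvent | DemoTypeEvent): ev is DemoKindEvent {
  return 'kind' in ev;
}

function directionOf(ev: DemoKindEvent | DemoTypeEvent): 'input' | 'output' {
  return isDemoInput(ev) ? 'input' : 'output';
}

console.log(directionOf({ kind: 'user-message' })); // "input"
console.log(directionOf({ type: 'text-delta' }));   // "output"
```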

Wire payloads

The encoder and decoder cores describe Ably messages with two payload shapes, used internally by createEncoderCore and createDecoderCore. Custom codec implementations that build on those cores work with these shapes directly.

MessagePayload describes a discrete Ably message.

name: String
Ably message name (for example "text", "tool-input", "user-message").
data: unknown
Message data. Ably handles serialisation.
codecHeaders: Record<string, string>
Codec-tier headers carried under extras.ai.codec.
transportHeaders: Record<string, string>
Transport-tier headers carried under extras.ai.transport.
ephemeral: Boolean
Mark this message as ephemeral (not persisted in channel history).

StreamPayload describes a streamed message. Data must be a string because the append lifecycle uses text append semantics. Deltas are concatenated for recovery and prefix-matching on the decoder.

name: String
Ably message name.
data: String
Initial or closing data for the stream. Must be a string for append/accumulate semantics.
codecHeaders: Record<string, string>
Codec-tier headers carried under extras.ai.codec.
transportHeaders: Record<string, string>
Transport-tier headers carried under extras.ai.transport.
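The reason stream data must be a string becomes clearer with a sketch of prefix-matching. This is one plausible reading of the recovery step, not the SDK's implementation: given the text a decoder has already seen and the accumulated text of a recovered message, emit only the missing suffix, or fall back to a reset when the two diverge. DemoCatchUp and demoCatchUp are hypothetical names.

```typescript
// Result of reconciling locally seen text with a recovered, accumulated message.
type DemoCatchUp =
  | { action: 'append'; delta: string } // recovered text extends what was already seen
  | { action: 'reset'; full: string };  // texts diverge, so start again from the recovered text

function demoCatchUp(seen: string, accumulated: string): DemoCatchUp {
  if (accumulated.startsWith(seen)) {
    // Concatenated deltas share a prefix, so only the remainder is new.
    return { action: 'append', delta: accumulated.slice(seen.length) };
  }
  return { action: 'reset', full: accumulated };
}

console.log(demoCatchUp('Hel', 'Hello')); // { action: 'append', delta: 'lo' }
console.log(demoCatchUp('Hex', 'Hello')); // { action: 'reset', full: 'Hello' }
```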

Reducer

The reducer is the deterministic folding contract that Codec extends. It holds no state of its own: everything it tracks lives in the projection.

init() => TProjection
Build an empty initial projection.
fold(state, event, meta) => TProjection
Fold one event into the projection.

Example

A minimal codec skeleton that folds a single input and output variant and extracts a flat message list.

TypeScript

import type {
  Codec,
  CodecInputEvent,
  CodecOutputEvent,
  CodecMessage,
  ReducerMeta,
} from '@ably/ai-transport';

interface MyMessage { id: string; role: 'user' | 'assistant'; text: string }
interface MyUserMessage extends CodecInputEvent { kind: 'user-message'; message: MyMessage }
interface MyRegenerate extends CodecInputEvent { kind: 'regenerate'; target: string; parent: string }
// The input union must include every variant the factories below return.
type MyInput = MyUserMessage | MyRegenerate;
interface MyOutput extends CodecOutputEvent { type: 'text-delta'; text: string }
interface MyProjection { highWater: string; messages: CodecMessage<MyMessage>[] }

const myCodec: Codec<MyInput, MyOutput, MyProjection, MyMessage> = {
  init: () => ({ highWater: '', messages: [] }),
  fold: (state, event, meta: ReducerMeta) => {
    // Idempotence: skip events at or below the high-water mark.
    if (meta.serial <= state.highWater) return state;
    state.highWater = meta.serial;
    if ('kind' in event && event.kind === 'user-message') {
      state.messages.push({ codecMessageId: meta.messageId ?? event.message.id, message: event.message });
    } else if ('type' in event && event.type === 'text-delta') {
      // Append the delta to the trailing assistant message, if there is one.
      const last = state.messages.at(-1);
      if (last?.message.role === 'assistant') last.message.text += event.text;
    }
    return state;
  },
  // buildEncoder and buildDecoder are placeholders for codec-specific factories, not SDK exports.
  createEncoder: (channel, options) => buildEncoder(channel, options),
  createDecoder: () => buildDecoder(),
  getMessages: (projection) => projection.messages,
  createUserMessage: (message) => ({ kind: 'user-message', message }),
  createRegenerate: (target, parent) => ({ kind: 'regenerate', target, parent }),
};