# Codec

The `Codec` interface is the bridge between an AI framework and Ably channel messages. It defines how domain events (LLM token deltas, tool calls, finish signals) are encoded into Ably publish operations and decoded back into domain events on the client.

Implement `Codec` to integrate any AI framework with AI Transport. The SDK ships `UIMessageCodec` for the Vercel AI SDK; for other frameworks, implement the four methods below.

#### Javascript

```
import type { Codec } from '@ably/ai-transport';

const myCodec: Codec = {
  createEncoder(channel, options) { /* ... */ },
  createDecoder() { /* ... */ },
  createAccumulator() { /* ... */ },
  isTerminal(event) { /* ... */ },
};
```

## Create an encoder

`createEncoder(channel: ChannelWriter, options?: EncoderOptions): StreamEncoder`

Create a streaming encoder bound to a channel. Called by the server transport once per turn. The returned encoder owns the message-append lifecycle for the turn (start a stream, append tokens, close).

### Parameters

| Parameter | Required | Description | Type |
| --- | --- | --- | --- |
| channel | Required | The channel writer to publish through. Provided by the transport. Documented in the `ChannelWriter` section below. | `ChannelWriter` |
| options | Optional | Default identity, extras, and hooks shared across every write through this encoder. | `EncoderOptions` |

`EncoderOptions` has the following properties:

| Property | Description | Type |
| --- | --- | --- |
| clientId | Default `clientId` for every write. | String or Undefined |
| extras | Default extras (headers) merged into every Ably message. | Object (properties listed below) or Undefined |
| onMessage | Hook called before each Ably message is published. Mutate the message in place to add transport-level headers. | `(message: Ably.Message) => void` or Undefined |
| messageId | Domain-level message identity. Used as a fallback when a lifecycle chunk does not provide one, ensuring downstream observers and the transport accumulator assign the same ID. | String or Undefined |

The `extras` object has the following properties:

| Property | Description | Type |
| --- | --- | --- |
| headers | Headers to attach to the Ably message `extras`. | `Record` or Undefined |

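
As a rough illustration of the options above, the sketch below builds an `EncoderOptions`-shaped object whose `onMessage` hook stamps an extra header on an outgoing message. `SketchMessage` and `SketchEncoderOptions` are local stand-ins written for this example (they are not the SDK's types), the `x-app` and `x-trace-id` header names are hypothetical, and the final two lines only simulate what an encoder might do before publishing.

```typescript
// Local stand-ins for illustration only; the real types come from Ably and the SDK.
interface SketchMessage {
  name?: string;
  data?: unknown;
  extras?: { headers?: Record<string, string> };
}

interface SketchEncoderOptions {
  clientId?: string;
  extras?: { headers?: Record<string, string> };
  onMessage?: (message: SketchMessage) => void;
  messageId?: string;
}

const encoderOptions: SketchEncoderOptions = {
  clientId: 'assistant',
  extras: { headers: { 'x-app': 'demo' } }, // hypothetical default header
  // Mutate the message in place to add a transport-level header.
  onMessage: (message) => {
    const extras = message.extras ?? {};
    extras.headers = { ...extras.headers, 'x-trace-id': 'trace-1' }; // hypothetical header
    message.extras = extras;
  },
};

// Simulated pre-publish step: copy the default extras, then run the hook.
const outgoing: SketchMessage = { name: 'text', data: 'hello', extras: { ...encoderOptions.extras } };
encoderOptions.onMessage?.(outgoing);
```

Because the hook replaces `headers` with a new object on a copied `extras`, the encoder-level defaults are left untouched in this sketch.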
### Returns

`StreamEncoder`

A streaming encoder. See [`StreamEncoder`](#StreamEncoder) for the full method surface.

## Create a decoder

`createDecoder(): StreamDecoder`

Create a decoder for converting inbound Ably messages back into domain outputs. Called by the client transport on attach.

### Returns

`StreamDecoder`

A decoder. See [`StreamDecoder`](#StreamDecoder).

## Create an accumulator

`createAccumulator(): MessageAccumulator`

Create an accumulator for building complete domain messages from a sequence of decoder outputs. Used internally by the client transport to reconstruct in-progress messages.

### Returns

`MessageAccumulator`

An accumulator. See [`MessageAccumulator`](#MessageAccumulator).

## Check whether an event is terminal

`isTerminal(event: TEvent): boolean`

Return `true` if the event signals stream completion (finish, error, abort). The transport uses this to close the stream at the right time.

### Parameters

| Parameter | Required | Description | Type |
| --- | --- | --- | --- |
| event | Required | The domain event to inspect. | `TEvent` |

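
A terminal check can be as small as a comparison on the event's type. The sketch below assumes a hypothetical `ChatEvent` union with `finish`, `error`, and `abort` variants; a real codec would use its own event type in place of it.

```typescript
// Hypothetical event union for illustration; a real codec defines its own.
type ChatEvent =
  | { type: 'text-delta'; text: string }
  | { type: 'finish' }
  | { type: 'error'; message: string }
  | { type: 'abort' };

// Terminal events are the ones that end a stream: finish, error, abort.
function isTerminal(event: ChatEvent): boolean {
  return event.type === 'finish' || event.type === 'error' || event.type === 'abort';
}

const results = [
  isTerminal({ type: 'text-delta', text: 'Hi' }),
  isTerminal({ type: 'finish' }),
  isTerminal({ type: 'abort' }),
];
```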
### Returns

`boolean`

`true` if the event ends a stream, otherwise `false`.

## StreamEncoder

A streaming encoder bound to a single turn. Extends `DiscreteEncoder` (the stateless subset, documented below) with stateful streaming operations: `appendEvent` for content streams, `abort` for cancellation, `close` to flush.

Every write method accepts a per-write `WriteOptions`:

| Property | Description | Type |
| --- | --- | --- |
| clientId | Override the default `clientId` for this write. | String or Undefined |
| extras | Override the default extras for this write. | Object or Undefined |
| messageId | Message identity for accumulator correlation. Stamped as `x-ably-msg-id`. | String or Undefined |

### Append a streaming event

`appendEvent(event: TEvent, options?: WriteOptions): Promise`

Encode and append a streaming domain event to an in-progress stream. Delta semantics: subsequent calls accumulate text under the same Ably message lifecycle.

### Publish complete messages

`writeMessages(messages: TMessage[], options?: WriteOptions): Promise`

Encode and publish one or more domain messages atomically. All messages share the encoder's transport headers (including `x-ably-msg-id`), so they form one logical unit in the conversation tree.

### Publish a discrete event

`writeEvent(event: TEvent, options?: WriteOptions): Promise`

Encode and publish a single domain event as a standalone discrete message. Use it to publish events outside the streaming flow. Implementations should throw for event types that are only meaningful within a stream (such as text deltas).

### Abort the encoder

`abort(reason?: string): Promise`

Abort all in-progress streams and publish a codec-specific abort signal. Called by the transport when a turn is cancelled. Idempotent: calling `abort` after all streams have already aborted is a no-op.

### Close the encoder

`close(): Promise`

Flush all pending appends and close the encoder.

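
As a sketch of how per-write `WriteOptions` might be resolved against the encoder's defaults, the function below lets a per-write value replace the default and stamps the resulting `messageId` as the `x-ably-msg-id` header. The types and the `resolveWrite` helper are hypothetical and written only for this example; in particular, replacing `extras` outright (rather than deep-merging it) is an assumption, not confirmed SDK behaviour.

```typescript
// Local stand-ins for illustration; not the SDK's actual types.
interface Extras { headers?: Record<string, string> }
interface Defaults { clientId?: string; extras?: Extras; messageId?: string }
interface PerWrite { clientId?: string; extras?: Extras; messageId?: string }

// Resolve the effective settings for one write.
// Assumption: a per-write value replaces the corresponding default outright.
function resolveWrite(defaults: Defaults, write: PerWrite = {}) {
  const clientId = write.clientId ?? defaults.clientId;
  const extras = write.extras ?? defaults.extras ?? {};
  const messageId = write.messageId ?? defaults.messageId;
  const headers: Record<string, string> = { ...extras.headers };
  // The message identity travels as the x-ably-msg-id header.
  if (messageId !== undefined) headers['x-ably-msg-id'] = messageId;
  return { clientId, headers };
}

const defaults: Defaults = { clientId: 'assistant', extras: { headers: { 'x-app': 'demo' } } };
const plain = resolveWrite(defaults);
const overridden = resolveWrite(defaults, { clientId: 'user-1', messageId: 'msg-42' });
```

Here `plain` keeps the encoder's `clientId`, while `overridden` carries the per-write `clientId` and gains the `x-ably-msg-id` header.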
## DiscreteEncoder

The stateless subset of `StreamEncoder`. Has only `writeMessages` and `writeEvent` and is safe for long-lived reuse across turns. The server transport uses a `DiscreteEncoder` to publish user messages (via `addMessages`); `writeEvent` is also a public surface for consumers publishing standalone discrete events.

### Publish complete messages

`writeMessages(messages: TMessage[], options?: WriteOptions): Promise`

Same as `StreamEncoder.writeMessages` above.

### Publish a discrete event

`writeEvent(event: TEvent, options?: WriteOptions): Promise`

Same as `StreamEncoder.writeEvent` above.

## StreamDecoder

Decodes Ably messages into domain events and messages.

### Decode a message

`decode(message: Ably.InboundMessage): DecoderOutput[]`

Decode a single inbound Ably message into zero or more domain outputs. The accumulator routes the outputs to the correct in-progress message using the `messageId` field on event outputs. Each output is a `DecoderOutput` with the following properties:

| Property | Description | Type |
| --- | --- | --- |
| kind | `'event'` for a streamed event, `'message'` for a complete message. | `'event'` or `'message'` |
| event | The decoded event. Present when `kind` is `'event'`. | `TEvent` or Undefined |
| messageId | The `x-ably-msg-id` header from the source message. Present on event outputs; used by the accumulator to route the event. | String or Undefined |
| message | The decoded message. Present when `kind` is `'message'`. | `TMessage` or Undefined |

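
Consumers of decoder outputs typically branch on `kind` before touching `event` or `message`. The sketch below models the output as a discriminated union, which is a stricter shape than the optional fields listed above and is chosen here only so TypeScript can narrow each branch. `DemoEvent`, `DemoMessage`, `DemoOutput`, and `summarise` are hypothetical names for this example.

```typescript
// Local stand-ins for illustration; a real codec supplies its own event and message types.
type DemoEvent = { type: 'text-delta'; text: string } | { type: 'finish' };
type DemoMessage = { id: string; text: string };

// Modelled as a discriminated union on `kind` so each branch narrows cleanly.
type DemoOutput =
  | { kind: 'event'; event: DemoEvent; messageId?: string }
  | { kind: 'message'; message: DemoMessage };

// Produce a one-line description per output, branching on `kind`.
function summarise(outputs: DemoOutput[]): string[] {
  return outputs.map((output) =>
    output.kind === 'event'
      ? `event ${output.event.type} for ${output.messageId ?? 'unknown'}`
      : `message ${output.message.id}`,
  );
}

const summary = summarise([
  { kind: 'event', messageId: 'm1', event: { type: 'text-delta', text: 'Hi' } },
  { kind: 'event', messageId: 'm1', event: { type: 'finish' } },
  { kind: 'message', message: { id: 'u1', text: 'Hello?' } },
]);
```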
## MessageAccumulator

Accumulates decoder outputs into a list of domain messages, tracking which streams are still active. The client transport owns one accumulator per view.

### Properties

| Property | Description | Type |
| --- | --- | --- |
| messages | Every message accumulated so far, in-progress and completed. | `TMessage[]` |
| completedMessages | Only messages whose streams have finished. | `TMessage[]` |
| hasActiveStream | Whether any stream is still actively receiving data. | Boolean |

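
To make the accumulator contract more concrete, here is a minimal in-memory sketch covering the properties above and the methods described in the rest of this section. It assumes a hypothetical text-only event model (`text-start`, `text-delta`, `finish`) in which a `finish` event completes the message; `TextAccumulator` and its types are written for this example and are not the SDK's implementation.

```typescript
// Hypothetical event, message, and output types for this sketch.
type TextEvent = { type: 'text-start' } | { type: 'text-delta'; text: string } | { type: 'finish' };
type TextMessage = { id: string; text: string };
type TextOutput =
  | { kind: 'event'; event: TextEvent; messageId: string }
  | { kind: 'message'; message: TextMessage };

class TextAccumulator {
  private readonly byId = new Map<string, TextMessage>(); // Map keeps arrival order
  private readonly active = new Set<string>();

  get messages(): TextMessage[] {
    return [...this.byId.values()];
  }
  get completedMessages(): TextMessage[] {
    return this.messages.filter((m) => !this.active.has(m.id));
  }
  get hasActiveStream(): boolean {
    return this.active.size > 0;
  }

  // Idempotent: syncs stored state with the given message and marks it active.
  initMessage(messageId: string, message: TextMessage): void {
    this.byId.set(messageId, { ...message });
    this.active.add(messageId);
  }

  // No-op if the message is not active.
  completeMessage(messageId: string): void {
    this.active.delete(messageId);
  }

  // Apply an external update to a message that is already tracked.
  updateMessage(message: TextMessage): void {
    if (this.byId.has(message.id)) this.byId.set(message.id, { ...message });
  }

  processOutputs(outputs: TextOutput[]): void {
    for (const output of outputs) {
      if (output.kind === 'message') {
        this.byId.set(output.message.id, { ...output.message });
        continue;
      }
      const { event, messageId } = output;
      const current = this.byId.get(messageId) ?? { id: messageId, text: '' };
      if (event.type === 'text-start') {
        this.initMessage(messageId, current);
      } else if (event.type === 'text-delta') {
        this.initMessage(messageId, { ...current, text: current.text + event.text });
      } else {
        this.completeMessage(messageId); // assumption: finish ends the stream
      }
    }
  }
}

const acc = new TextAccumulator();
acc.processOutputs([
  { kind: 'message', message: { id: 'u1', text: 'Hello?' } },
  { kind: 'event', messageId: 'a1', event: { type: 'text-start' } },
  { kind: 'event', messageId: 'a1', event: { type: 'text-delta', text: 'Hi ' } },
  { kind: 'event', messageId: 'a1', event: { type: 'text-delta', text: 'there' } },
]);
const midStream = { active: acc.hasActiveStream, completed: acc.completedMessages.length };
acc.processOutputs([{ kind: 'event', messageId: 'a1', event: { type: 'finish' } }]);
```

Mid-stream, only the user message counts as completed; after the `finish` event both messages do and no stream remains active.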
### Process decoder outputs

`processOutputs(outputs: DecoderOutput[]): void`

Process a batch of decoder outputs, updating internal message state.

### Apply an external message update

`updateMessage(message: TMessage): void`

Apply an external update to a message, for example from an update callback that bypasses the chunk stream.

### Initialise tracking for a message

`initMessage(messageId: string, message: TMessage): void`

Ensure the accumulator is ready to process events for the given message. If not already active, creates internal tracking state from the message. If already active, syncs internal state with the provided message (picking up external changes like cross-turn amendments). Idempotent: safe to call before every `processOutputs`.

### Complete a message

`completeMessage(messageId: string): void`

Mark a message as completed. Removes it from active tracking so it appears in `completedMessages`. No-op if not active.

## ChannelWriter

The thin write interface a `Codec` uses to publish to Ably. The transport supplies a `ChannelWriter` to `createEncoder`; testing harnesses and decorators can implement their own.

### Publish discrete messages

`publish(message: Ably.Message | Ably.Message[], options?: Ably.PublishOptions): Promise`

Publish one or more discrete messages to the channel.

### Append to an existing message

`appendMessage(message: Ably.Message, operation?: Ably.MessageOperation, options?: Ably.PublishOptions): Promise`

Append data to an existing message identified by its serial. Used by the encoder for token-by-token streaming.

### Replace an existing message

`updateMessage(message: Ably.Message, operation?: Ably.MessageOperation, options?: Ably.PublishOptions): Promise`

Replace the data of an existing message identified by its serial. The encoder's recovery path uses this to publish accumulated content when an append fails.

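
Since testing harnesses can supply their own writer, a recording test double is one plausible use. The sketch below is a simplified stand-in: `FakeMessage` replaces `Ably.Message`, the `operation` and `options` parameters are omitted, and the return type is narrowed to `Promise<void>`, all of which are assumptions made to keep the example self-contained.

```typescript
// Local stand-in for Ably.Message, for illustration only.
interface FakeMessage { name?: string; data?: unknown; serial?: string }

type Recorded = { op: 'publish' | 'append' | 'update'; message: FakeMessage };

// Records every call instead of talking to Ably.
// Simplified: operation/options parameters are omitted in this sketch.
class RecordingWriter {
  readonly calls: Recorded[] = [];

  publish(message: FakeMessage | FakeMessage[]): Promise<void> {
    const batch = Array.isArray(message) ? message : [message];
    for (const m of batch) this.calls.push({ op: 'publish', message: m });
    return Promise.resolve();
  }

  appendMessage(message: FakeMessage): Promise<void> {
    this.calls.push({ op: 'append', message });
    return Promise.resolve();
  }

  updateMessage(message: FakeMessage): Promise<void> {
    this.calls.push({ op: 'update', message });
    return Promise.resolve();
  }
}

// Simulate a short stream: one initial publish followed by two appends.
const writer = new RecordingWriter();
void writer.publish([{ name: 'text', data: '' }]);
void writer.appendMessage({ serial: 's1', data: 'Hel' });
void writer.appendMessage({ serial: 's1', data: 'lo' });
const ops = writer.calls.map((c) => c.op);
```

A test can then assert on `writer.calls` to check which operations an encoder issued and in what order.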
`Ably.RealtimeChannel` satisfies `ChannelWriter` directly, so passing a channel where a `ChannelWriter` is expected works without an adapter.

## Helper factories

The SDK ships helper factories that pre-implement the encoder, decoder, and lifecycle-tracker scaffolding so a custom codec only writes the framework-specific parts.

### createEncoderCore

`function createEncoderCore(writer: ChannelWriter, options?: EncoderCoreOptions): EncoderCore`

Create a base encoder with common publish logic and recovery already implemented. A custom codec composes it inside `createEncoder` and supplies framework-specific encoding hooks.

`EncoderCore` exposes the publish primitives a codec encoder delegates to: `publishDiscrete`, `publishDiscreteBatch`, `startStream`, `appendStream`, `closeStream`, `abortStream`, `abortAllStreams`, and `close`. The codec wires these into the `StreamEncoder` methods it returns from `createEncoder`.

### createDecoderCore

`function createDecoderCore(hooks: DecoderCoreHooks, options?: DecoderCoreOptions): DecoderCore`

Create a base decoder with common message-parsing logic already implemented. The hooks describe how to convert inbound Ably messages into domain events for the target framework.

`DecoderCoreHooks` has four methods: `buildStartEvents(tracker)` and `buildEndEvents(tracker, closingHeaders)` fire at the start and end of each streamed message; `buildDeltaEvents(tracker, delta)` fires on every appended chunk; `decodeDiscrete(payload)` handles discrete (non-streaming) messages such as user messages and lifecycle events. Each hook returns an array of `DecoderOutput` so a single inbound message can produce zero or more domain outputs.

### createLifecycleTracker

`function createLifecycleTracker(phases: PhaseConfig[]): LifecycleTracker`

Create a configurable lifecycle tracker that synthesises missing phase events (for example, a synthesised `start` chunk when a client joins mid-stream).
Use it inside a custom decoder to keep the event sequence consistent regardless of when the client attached.

## Example

The following example shows the end-to-end shape of a custom codec. The encoder composes `createEncoderCore` and maps each domain event to a publish primitive. The decoder composes `createDecoderCore` and emits domain outputs from the four lifecycle hooks.

### Javascript

```
import {
  createEncoderCore,
  createDecoderCore,
  type Codec,
} from '@ably/ai-transport';

type MyEvent =
  | { type: 'text-start'; messageId: string }
  | { type: 'text-delta'; messageId: string; text: string }
  | { type: 'finish' };

type MyMessage = { id: string; text: string };

const myCodec: Codec = {
  createEncoder(channel, options) {
    const core = createEncoderCore(channel, options);
    return {
      // DiscreteEncoder operations: atomic publishes outside the stream.
      writeMessages: (messages, opts) =>
        core.publishDiscreteBatch(messages.map((m) => ({ name: 'message', data: m })), opts),
      writeEvent: (event, opts) => {
        if (event.type !== 'finish') throw new Error('only finish is a discrete event');
        return core.publishDiscrete({ name: 'finish', data: {} }, opts);
      },
      // StreamEncoder operations: lifecycle of an in-progress stream.
      appendEvent: async (event) => {
        if (event.type === 'text-start') {
          await core.startStream(event.messageId, { name: 'text', data: '' });
        } else if (event.type === 'text-delta') {
          core.appendStream(event.messageId, event.text);
        }
      },
      abort: () => core.abortAllStreams(),
      close: () => core.close(),
    };
  },
  createDecoder() {
    return createDecoderCore({
      buildStartEvents: (tracker) => [
        { kind: 'event', event: { type: 'text-start', messageId: tracker.streamId } },
      ],
      buildDeltaEvents: (tracker, delta) => [
        { kind: 'event', event: { type: 'text-delta', messageId: tracker.streamId, text: delta } },
      ],
      buildEndEvents: () => [
        { kind: 'event', event: { type: 'finish' } },
      ],
      decodeDiscrete: (payload) =>
        payload.name === 'finish'
          ? [{ kind: 'event', event: { type: 'finish' } }]
          : [{ kind: 'message', message: payload.data as MyMessage }],
    });
  },
  createAccumulator() {
    // Build an accumulator that owns the running list of MyMessage values.
    // See the SDK source for the canonical UIMessage accumulator implementation.
  },
  isTerminal(event) {
    return event.type === 'finish';
  },
};
```

See the [codec architecture internals](https://ably.com/docs/ai-transport/internals/codec-architecture.md) for the encoder and decoder pipelines in depth.

## Related Topics

- [Client transport](https://ably.com/docs/ai-transport/api/javascript/client-transport.md): API reference for the AI Transport client transport: options, properties, methods, the View interface, and the ActiveTurn type.
- [Server transport](https://ably.com/docs/ai-transport/api/javascript/server-transport.md): API reference for the AI Transport server transport: turn lifecycle, cancel routing, abort hooks, and configuration options.
- [Vercel integration](https://ably.com/docs/ai-transport/api/javascript/vercel.md): API reference for the AI Transport Vercel AI SDK integration: UIMessageCodec, ChatTransport, and pre-bound transport factories.