Codec

Open in

The Codec interface is the bridge between an AI framework and Ably channel messages. It defines how domain events (LLM token deltas, tool calls, finish signals) are encoded into Ably publish operations and decoded back into domain events on the client.

Implement Codec<TEvent, TMessage> to integrate any AI framework with AI Transport. The SDK ships UIMessageCodec for the Vercel AI SDK; for other frameworks, implement the four methods below.

JavaScript

1

2

3

4

5

6

7

8

import type { Codec } from '@ably/ai-transport';

const myCodec: Codec<MyEvent, MyMessage> = {
  createEncoder(channel, options) { /* ... */ },
  createDecoder() { /* ... */ },
  createAccumulator() { /* ... */ },
  isTerminal(event) { /* ... */ },
};

Create an encoder

createEncoder(channel: ChannelWriter, options?: EncoderOptions): StreamEncoder<TEvent, TMessage>

Create a streaming encoder bound to a channel. Called by the server transport once per turn. The returned encoder owns the message-append lifecycle for the turn (start a stream, append tokens, close).

Parameters

channelrequiredChannelWriter
The channel writer to publish through. Provided by the transport. Documented as a visible section below.
optionsoptionalEncoderOptions
Default identity, extras, and hooks shared across every write through this encoder.

Returns

StreamEncoder<TEvent, TMessage>

A streaming encoder. See StreamEncoder for the full method surface.

Create a decoder

createDecoder(): StreamDecoder<TEvent, TMessage>

Create a decoder for converting inbound Ably messages back into domain outputs. Called by the client transport on attach.

Returns

StreamDecoder<TEvent, TMessage>

A decoder. See StreamDecoder.

Create an accumulator

createAccumulator(): MessageAccumulator<TEvent, TMessage>

Create an accumulator for building complete domain messages from a sequence of decoder outputs. Used internally by the client transport to reconstruct in-progress messages.

Returns

MessageAccumulator<TEvent, TMessage>

An accumulator. See MessageAccumulator.

Check whether an event is terminal

isTerminal(event: TEvent): boolean

Return true if the event signals stream completion (finish, error, abort). The transport uses this to close the stream at the right time.

Parameters

eventrequiredTEvent
The domain event to inspect.

Returns

boolean

true if the event ends a stream, otherwise false.

StreamEncoder

A streaming encoder bound to a single turn. Extends DiscreteEncoder (the stateless subset, documented below) with stateful streaming operations: appendEvent for content streams, abort for cancellation, close to flush. Every write method accepts a per-write WriteOptions:

Append a streaming event

appendEvent(event: TEvent, options?: WriteOptions): Promise<void>

Encode and append a streaming domain event to an in-progress stream. Delta semantics: subsequent calls accumulate text under the same Ably message lifecycle.

Publish complete messages

writeMessages(messages: TMessage[], options?: WriteOptions): Promise<Ably.PublishResult>

Encode and publish one or more domain messages atomically. All messages share the encoder's transport headers (including x-ably-msg-id), so they form one logical unit in the conversation tree.

Publish a discrete event

writeEvent(event: TEvent, options?: WriteOptions): Promise<Ably.PublishResult>

Encode and publish a single domain event as a standalone discrete message. Use to publish events outside the streaming flow. Implementations should throw for event types that are only meaningful within a stream (such as text deltas).

Abort the encoder

abort(reason?: string): Promise<void>

Abort all in-progress streams and publish a codec-specific abort signal. Called by the transport when a turn is cancelled. Idempotent — calling abort after all streams have already aborted is a no-op.

Close the encoder

close(): Promise<void>

Flush all pending appends and close the encoder.

DiscreteEncoder

The stateless subset of StreamEncoder. Has only writeMessages and writeEvent and is safe for long-lived reuse across turns. The server transport uses a DiscreteEncoder to publish user messages (via addMessages); writeEvent is also a public surface for consumers publishing standalone discrete events.

Publish complete messages

writeMessages(messages: TMessage[], options?: WriteOptions): Promise<Ably.PublishResult>

Same as StreamEncoder.writeMessages above.

Publish a discrete event

writeEvent(event: TEvent, options?: WriteOptions): Promise<Ably.PublishResult>

Same as StreamEncoder.writeEvent above.

StreamDecoder

Decodes Ably messages into domain events and messages.

Decode a message

decode(message: Ably.InboundMessage): DecoderOutput<TEvent, TMessage>[]

Decode a single inbound Ably message into zero or more domain outputs. The accumulator routes the outputs to the correct in-progress message using the messageId field on event outputs. Each output is a

.

MessageAccumulator

Accumulates decoder outputs into a list of domain messages, tracking which streams are still active. The client transport owns one accumulator per view.

Properties

messagesTMessage[]
Every message accumulated so far, in-progress and completed.
completedMessagesTMessage[]
Only messages whose streams have finished.
hasActiveStreamBoolean
Whether any stream is still actively receiving data.

Process decoder outputs

processOutputs(outputs: DecoderOutput<TEvent, TMessage>[]): void

Process a batch of decoder outputs, updating internal message state.

Apply an external message update

updateMessage(message: TMessage): void

Apply an external update to a message, for example from an update callback that bypasses the chunk stream.

Initialise tracking for a message

initMessage(messageId: string, message: TMessage): void

Ensure the accumulator is ready to process events for the given message. If not already active, creates internal tracking state from the message. If already active, syncs internal state with the provided message (picking up external changes like cross-turn amendments). Idempotent — safe to call before every processOutputs.

Complete a message

completeMessage(messageId: string): void

Mark a message as completed. Removes it from active tracking so it appears in completedMessages. No-op if not active.

ChannelWriter

The thin write interface a Codec uses to publish to Ably. The transport supplies a ChannelWriter to createEncoder; testing harnesses and decorators can implement their own.

Publish discrete messages

publish(message: Ably.Message | Ably.Message[], options?: Ably.PublishOptions): Promise<Ably.PublishResult>

Publish one or more discrete messages to the channel.

Append to an existing message

appendMessage(message: Ably.Message, operation?: Ably.MessageOperation, options?: Ably.PublishOptions): Promise<Ably.UpdateDeleteResult>

Append data to an existing message identified by its serial. Used by the encoder for token-by-token streaming.

Replace an existing message

updateMessage(message: Ably.Message, operation?: Ably.MessageOperation, options?: Ably.PublishOptions): Promise<Ably.UpdateDeleteResult>

Replace the data of an existing message identified by its serial. The encoder's recovery path uses this to publish accumulated content when an append fails.

Ably.RealtimeChannel satisfies ChannelWriter directly, so passing a channel where a ChannelWriter is expected works without an adapter.

Helper factories

The SDK ships helper factories that pre-implement the encoder, decoder, and lifecycle-tracker scaffolding so a custom codec only writes the framework-specific parts.

createEncoderCore

function createEncoderCore(writer: ChannelWriter, options?: EncoderCoreOptions): EncoderCore

Create a base encoder with common publish logic and recovery already implemented. A custom codec composes it inside createEncoder and supplies framework-specific encoding hooks.

EncoderCore exposes the publish primitives a codec encoder delegates to: publishDiscrete, publishDiscreteBatch, startStream, appendStream, closeStream, abortStream, abortAllStreams, and close. The codec wires these into the StreamEncoder methods it returns from createEncoder.

createDecoderCore

function createDecoderCore<TEvent, TMessage>(hooks: DecoderCoreHooks<TEvent, TMessage>, options?: DecoderCoreOptions): DecoderCore<TEvent, TMessage>

Create a base decoder with common message-parsing logic already implemented. The hooks describe how to convert inbound Ably messages into domain events for the target framework.

DecoderCoreHooks has four methods: buildStartEvents(tracker) and buildEndEvents(tracker, closingHeaders) fire at the start and end of each streamed message; buildDeltaEvents(tracker, delta) fires on every appended chunk; decodeDiscrete(payload) handles discrete (non-streaming) messages such as user messages and lifecycle events. Each hook returns an array of DecoderOutput so a single inbound message can produce zero or more domain outputs.

createLifecycleTracker

function createLifecycleTracker<TEvent>(phases: PhaseConfig<TEvent>[]): LifecycleTracker<TEvent>

Create a configurable lifecycle tracker that synthesises missing phase events (for example, a synthesised start chunk when a client joins mid-stream). Use inside a custom decoder to keep the event sequence consistent regardless of when the client attached.

Example

The end-to-end shape for writing a custom codec. The encoder composes createEncoderCore and maps each domain event to a publish primitive. The decoder composes createDecoderCore and emits domain outputs from the four lifecycle hooks.

JavaScript

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

import {
  createEncoderCore,
  createDecoderCore,
  type Codec,
} from '@ably/ai-transport';

type MyEvent =
  | { type: 'text-start'; messageId: string }
  | { type: 'text-delta'; messageId: string; text: string }
  | { type: 'finish' };

type MyMessage = { id: string; text: string };

const myCodec: Codec<MyEvent, MyMessage> = {
  createEncoder(channel, options) {
    const core = createEncoderCore(channel, options);

    return {
      // DiscreteEncoder operations — atomic publishes outside the stream.
      writeMessages: (messages, opts) =>
        core.publishDiscreteBatch(messages.map((m) => ({ name: 'message', data: m })), opts),
      writeEvent: (event, opts) => {
        if (event.type !== 'finish') throw new Error('only finish is a discrete event');
        return core.publishDiscrete({ name: 'finish', data: {} }, opts);
      },

      // StreamEncoder operations — lifecycle of an in-progress stream.
      appendEvent: async (event) => {
        if (event.type === 'text-start') {
          await core.startStream(event.messageId, { name: 'text', data: '' });
        } else if (event.type === 'text-delta') {
          core.appendStream(event.messageId, event.text);
        }
      },
      abort: () => core.abortAllStreams(),
      close: () => core.close(),
    };
  },

  createDecoder() {
    return createDecoderCore<MyEvent, MyMessage>({
      buildStartEvents: (tracker) => [
        { kind: 'event', event: { type: 'text-start', messageId: tracker.streamId } },
      ],
      buildDeltaEvents: (tracker, delta) => [
        { kind: 'event', event: { type: 'text-delta', messageId: tracker.streamId, text: delta } },
      ],
      buildEndEvents: () => [
        { kind: 'event', event: { type: 'finish' } },
      ],
      decodeDiscrete: (payload) =>
        payload.name === 'finish'
          ? [{ kind: 'event', event: { type: 'finish' } }]
          : [{ kind: 'message', message: payload.data as MyMessage }],
    });
  },

  createAccumulator() {
    // Build an accumulator that owns the running list of MyMessage values.
    // See the SDK source for the canonical UIMessage accumulator implementation.
  },

  isTerminal(event) {
    return event.type === 'finish';
  },
};

See the codec architecture internals for the encoder and decoder pipelines in depth.