Codec architecture

The codec is the layer that translates between your AI framework's event types and Ably channel messages. This page describes how each part of the codec works internally and how the pieces fit together.

Two-layer split

AI Transport separates concerns into two layers:

  • Transport layer (generic): manages turns, lifecycle events, cancellation, history, and multi-client sync. It works with any message type.
  • Domain layer (codec): maps framework-specific events to Ably publish operations and back. It knows the shape of your events and messages.

The transport layer calls into the codec but never inspects the domain data. The codec calls into the channel writer but never manages turns or lifecycle. This separation is what makes AI Transport framework-agnostic.
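
To make the boundary concrete, here is a minimal sketch of the two layers. The names textCodec and runTurn, and the array standing in for the channel, are hypothetical and are not part of the SDK. The point is only the direction of the calls: the transport function passes events through without reading them, and the codec publishes through a writer without knowing which turn it belongs to.

```javascript
// Hypothetical domain layer: knows the event shape, knows nothing about turns.
const textCodec = {
  write(event, writer) {
    writer.publish({ name: event.type, data: event.text });
  },
};

// Hypothetical transport layer: owns the turn, treats every event as opaque.
function runTurn(turnId, codec, channel, events) {
  // The writer handed to the codec stamps the turn header on each publish.
  const writer = {
    publish(message) {
      channel.push({
        ...message,
        headers: { ...message.headers, 'x-ably-turn-id': turnId },
      });
    },
  };
  for (const event of events) codec.write(event, writer);
}

// An array stands in for the Ably channel in this sketch.
const channel = [];
runTurn('turn-1', textCodec, channel, [{ type: 'text', text: 'hello' }]);
console.log(channel);
```

Swapping textCodec for a codec with a different event shape requires no change to runTurn, which is the property the split is meant to provide.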

Encoder core

The encoder converts outbound domain events into Ably publish operations. It supports two modes.

Discrete mode

Discrete mode publishes a complete message in a single operation. The encoder calls channel.publish() once with the full message content. User messages and tool results typically use discrete mode.

Streamed mode

Streamed mode publishes a message incrementally. The encoder creates a message, appends tokens to it, and then closes it. This is the mode used for LLM response streaming.

The append pipeline is fire-and-forget: each appendMessage call is dispatched without waiting for acknowledgment. This keeps the stream flowing at the rate the LLM produces tokens, without back-pressure from the network.
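
The following sketch illustrates the fire-and-forget pattern against an in-memory stand-in for the channel writer. Only appendMessage is named in the text above; createMessage, closeMessage, and createStreamedEncoder are hypothetical names chosen for the example, and the real encoder may order its work differently.

```javascript
// In-memory stand-in for the channel writer. The ack for each append is
// simulated with a short timer.
function createFakeWriter() {
  const log = [];
  return {
    log,
    createMessage(id) {
      log.push(`create ${id}`);
    },
    appendMessage(id, token) {
      log.push(`append ${id} ${token}`);
      return new Promise((resolve) => setTimeout(resolve, 5));
    },
    closeMessage(id) {
      log.push(`close ${id}`);
    },
  };
}

function createStreamedEncoder(writer, id) {
  const pending = [];
  writer.createMessage(id);
  return {
    // Fire-and-forget: dispatch the append and return without awaiting the ack.
    append(token) {
      pending.push(writer.appendMessage(id, token));
    },
    // Closing is the one point where this sketch waits for outstanding appends.
    async close() {
      await Promise.allSettled(pending);
      writer.closeMessage(id);
    },
  };
}

const writer = createFakeWriter();
const encoder = createStreamedEncoder(writer, 'msg-1');
for (const token of ['Hel', 'lo', '!']) encoder.append(token);
// All three appends are already dispatched here, before any ack has arrived.
console.log(writer.log);
encoder.close().then(() => console.log(writer.log));
```

Because append never awaits, a slow acknowledgment delays only close, not the next token.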

Flush and recovery

If an append fails (for example, due to a transient network error), the encoder falls back to an updateMessage call that sends the full accumulated content. This is the recovery path. The encoder maintains a buffer of all appended content so it can always reconstruct the complete message.

After recovery, the encoder continues appending from the recovered state. Subscribers see a seamless stream regardless of whether individual appends succeeded.
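
A minimal sketch of this recovery path is shown below. It assumes that recovery happens at flush time and that updateMessage takes the full content; createFlakyWriter and createRecoveringEncoder are hypothetical names, and the writer is an in-memory stand-in whose content field represents what subscribers would see.

```javascript
// Stand-in writer that rejects one chosen append to simulate a transient error.
function createFlakyWriter(failOnCall) {
  let calls = 0;
  const writer = {
    content: '', // what subscribers would currently see
    appendMessage(id, token) {
      calls += 1;
      if (calls === failOnCall) {
        return Promise.reject(new Error('transient network error'));
      }
      writer.content += token;
      return Promise.resolve();
    },
    updateMessage(id, fullContent) {
      writer.content = fullContent;
      return Promise.resolve();
    },
  };
  return writer;
}

function createRecoveringEncoder(writer, id) {
  let buffer = ''; // every token appended so far, whether or not it was acked
  let failed = false;
  const pending = [];
  return {
    append(token) {
      buffer += token;
      pending.push(
        writer.appendMessage(id, token).catch(() => {
          failed = true; // remember the failure, keep streaming
        }),
      );
    },
    async flush() {
      await Promise.all(pending.splice(0));
      if (failed) {
        failed = false;
        // Recovery: replace the message with the full accumulated content.
        await writer.updateMessage(id, buffer);
      }
    },
  };
}

const writer = createFlakyWriter(2); // the second append fails
const encoder = createRecoveringEncoder(writer, 'msg-1');
for (const token of ['Hel', 'lo ', 'world']) encoder.append(token);
encoder.flush().then(() => console.log(writer.content)); // "Hello world"
```

Before the flush, the stand-in holds "Helworld" because the middle append was lost. The buffer is what lets the update restore the missing piece.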

Header merging

Every outbound message carries both transport and domain headers. The encoder merges them with this priority order:

  1. Transport headers set by the transport layer (turn ID, role, stream status).
  2. Domain headers set by the codec (framework-specific metadata).
  3. Per-message headers set by the caller (overrides for specific messages).

Higher-priority headers overwrite lower-priority ones. This ensures transport-critical headers like x-ably-turn-id cannot be accidentally overridden by the codec.
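
As an illustration, the merge can be sketched with object spreads. This assumes the numbered list is read as highest priority first (transport, then domain, then per-message); mergeHeaders and every header key other than x-ably-turn-id are hypothetical. If the SDK ranks per-message headers differently, the spread order would change accordingly.

```javascript
// Later spreads overwrite earlier ones, so the highest-priority layer
// is spread last. Assumed order: transport > domain > per-message.
function mergeHeaders({ transport = {}, domain = {}, perMessage = {} } = {}) {
  return { ...perMessage, ...domain, ...transport };
}

const merged = mergeHeaders({
  transport: { 'x-ably-turn-id': 'turn-1', role: 'assistant' },
  domain: { 'x-ably-turn-id': 'spoofed', model: 'example-model' },
  perMessage: { model: 'other-model', trace: 'abc' },
});

// The transport value for x-ably-turn-id survives the domain layer's attempt
// to set it, and keys that exist in only one layer pass through untouched.
console.log(merged);
```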

Decoder core

The decoder converts inbound Ably messages back into domain events. It dispatches on the message action to determine how to process each message.

Action dispatch

The decoder handles four Ably message actions:

| Action | Meaning | Decoder behavior |
| --- | --- | --- |
| create | A new message arrived | Start a new message or stream. Emit the initial event. |
| append | A token was appended to a message | Emit a delta event for the matching stream. |
| update | A message was updated in place | Update the message state. If the status is terminal, close the stream. |
| delete | A message was removed | Remove the message from state. |
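
The dispatch can be pictured as a switch on the action. In this sketch the message shape (id, action, data, terminal) and the emitted event types are assumptions made for illustration, and the action names follow the table rather than the exact strings the Ably SDK uses.

```javascript
function createToyDecoder() {
  const messages = new Map(); // message id -> accumulated text
  return {
    messages,
    // Returns the domain events produced by one inbound message.
    decode(msg) {
      switch (msg.action) {
        case 'create':
          messages.set(msg.id, msg.data ?? '');
          return [{ type: 'start', id: msg.id, text: msg.data ?? '' }];
        case 'append':
          messages.set(msg.id, (messages.get(msg.id) ?? '') + msg.data);
          return [{ type: 'delta', id: msg.id, delta: msg.data }];
        case 'update': {
          messages.set(msg.id, msg.data);
          const events = [{ type: 'update', id: msg.id, text: msg.data }];
          // A terminal status on an update also closes the stream.
          if (msg.terminal) events.push({ type: 'end', id: msg.id });
          return events;
        }
        case 'delete':
          messages.delete(msg.id);
          return [{ type: 'remove', id: msg.id }];
        default:
          return []; // unknown actions are ignored in this sketch
      }
    },
  };
}

const decoder = createToyDecoder();
const inbound = [
  { id: 'm1', action: 'create', data: 'Hel' },
  { id: 'm1', action: 'append', data: 'lo' },
  { id: 'm1', action: 'update', data: 'Hello', terminal: true },
];
for (const msg of inbound) console.log(decoder.decode(msg));
```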

Stream tracker

The decoder maintains a stream tracker that maps stream IDs to their current state. When a create action arrives with x-ably-stream: true, the tracker opens a new stream. Subsequent append actions for the same stream ID are routed to that stream. An update action with a terminal status closes the stream.

First-contact vs prefix-match

When a client joins a channel, the first message it receives for a given stream may not be the create action - the stream may already be in progress. The decoder handles this with two strategies:

  • First-contact: if the first action for a stream is create, the decoder processes it normally.
  • Prefix-match: if the first action for a stream is append or update, the decoder recognizes this as a mid-stream join. It initializes the stream from the accumulated message state on the channel rather than waiting for a create that already happened.
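
The sketch below shows how a tracker might tell the two cases apart. It is an illustration only: createStreamTracker, the streamId, terminal, and accumulated fields are hypothetical, and in particular it assumes the accumulated content is available on the inbound message, which may not match how the SDK obtains it.

```javascript
function createStreamTracker() {
  const streams = new Map(); // stream id -> { text, open }
  return {
    streams,
    handle(msg) {
      const stream = streams.get(msg.streamId);
      if (!stream) {
        if (msg.action === 'create') {
          // First-contact: the stream is seen from its beginning.
          streams.set(msg.streamId, { text: msg.data ?? '', open: true });
          return 'first-contact';
        }
        // Prefix-match: a mid-stream join. Seed the stream from the content
        // accumulated so far instead of waiting for a create that already happened.
        streams.set(msg.streamId, {
          text: msg.accumulated ?? '',
          open: !msg.terminal,
        });
        return 'prefix-match';
      }
      if (msg.action === 'append') stream.text += msg.data;
      if (msg.action === 'update') {
        stream.text = msg.data;
        if (msg.terminal) stream.open = false;
      }
      return 'tracked';
    },
  };
}

const tracker = createStreamTracker();
// This client joined late: its first message for s1 is an append.
console.log(
  tracker.handle({ streamId: 's1', action: 'append', data: 'lo', accumulated: 'Hello' }),
); // "prefix-match"
tracker.handle({ streamId: 's1', action: 'append', data: ' world' });
console.log(tracker.streams.get('s1').text); // "Hello world"
```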

Accumulator

The accumulator builds complete messages from a sequence of domain events. It is used in two contexts:

Live stream

During live streaming, the accumulator processes events as they arrive from the decoder. It maintains a running list of messages, appending tokens to the in-progress message. The messages property returns all messages including the one currently being streamed. The completedMessages property returns only messages where the stream has ended.

History

When loading history, the accumulator replays stored events to reconstruct message state. The processOutputs method accepts a batch of events and updates the internal state. This is the same method used during live streaming - the accumulator does not distinguish between live and historical events.

```javascript
const accumulator = codec.createAccumulator()

// Process events from history or live stream
accumulator.processOutputs(events)

// Read current state
accumulator.messages          // all messages (including in-progress)
accumulator.completedMessages // only completed messages
accumulator.hasActiveStream   // true if a stream is in progress
```
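
To show why one method can serve both contexts, here is a toy accumulator with the same four members. It is a sketch under assumptions, not the SDK's implementation: the event types start, delta, and finish and the message shape are hypothetical.

```javascript
function createToyAccumulator() {
  const order = [];       // messages in arrival order
  const byId = new Map(); // message id -> message

  function ensure(id) {
    if (!byId.has(id)) {
      const message = { id, text: '', done: false };
      byId.set(id, message);
      order.push(message);
    }
    return byId.get(id);
  }

  return {
    processOutputs(events) {
      for (const event of events) {
        const message = ensure(event.id);
        if (event.type === 'delta') message.text += event.delta;
        if (event.type === 'finish') message.done = true;
      }
    },
    get messages() {
      return order;
    },
    get completedMessages() {
      return order.filter((message) => message.done);
    },
    get hasActiveStream() {
      return order.some((message) => !message.done);
    },
  };
}

const events = [
  { type: 'start', id: 'm1' },
  { type: 'delta', id: 'm1', delta: 'Hi' },
  { type: 'finish', id: 'm1' },
  { type: 'start', id: 'm2' },
  { type: 'delta', id: 'm2', delta: 'Wor' },
];

// Live: events arrive one at a time. History: the same events arrive as a batch.
const live = createToyAccumulator();
for (const event of events) live.processOutputs([event]);
const replayed = createToyAccumulator();
replayed.processOutputs(events);

console.log(JSON.stringify(live.messages) === JSON.stringify(replayed.messages)); // true
console.log(live.completedMessages.length, live.hasActiveStream); // 1 true
```

Because state depends only on the order of events, not on how they are batched, replaying history and following a live stream converge on the same messages.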

Lifecycle tracker

The lifecycle tracker ensures clients receive a consistent sequence of lifecycle events, even when joining mid-stream. If a client subscribes to a channel where a turn is already in progress, the tracker synthesizes the missing lifecycle events (like turn-start and the stream create) so the client can process the in-progress turn correctly.

The tracker maintains configurable phases. Each phase defines a set of events that must be observed before the next phase begins. If the tracker detects a gap (for example, the client received an append without seeing a stream create), it synthesizes the missing events and delivers them in order before the real event.
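
Gap detection and synthesis can be sketched as follows. The phase names other than turn-start, the synthetic flag, and createLifecycleTracker are hypothetical, and the sketch simplifies each phase to a single event type rather than a set of events.

```javascript
function createLifecycleTracker(phases) {
  const nextPhaseByTurn = new Map(); // turn id -> index of the next expected phase
  return {
    // Returns the events to deliver: synthesized ones first, then the real one.
    observe(event) {
      const next = nextPhaseByTurn.get(event.turnId) ?? 0;
      const index = phases.indexOf(event.type);
      const out = [];
      // Every phase between the expected one and the observed one was missed.
      for (let i = next; i < index; i += 1) {
        out.push({ type: phases[i], turnId: event.turnId, synthetic: true });
      }
      out.push(event);
      nextPhaseByTurn.set(event.turnId, Math.max(next, index + 1));
      return out;
    },
  };
}

const tracker = createLifecycleTracker(['turn-start', 'stream-create', 'append']);

// The client joined mid-turn, so the first thing it sees is an append.
console.log(tracker.observe({ type: 'append', turnId: 't1', data: 'lo' }));
// A later append in the same turn needs nothing synthesized.
console.log(tracker.observe({ type: 'append', turnId: 't1', data: '!' }));
```

The first call returns three events (two synthetic, then the real append); the second returns only the real append.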

Write a custom codec

To build a codec for a framework not covered by the SDK:

  1. Implement the Codec interface with your event and message types.
  2. Use createEncoderCore and createDecoderCore from @ably/ai-transport to get the base publish and dispatch logic.
  3. Provide hooks that map your framework's events to the core operations (create, append, update, close).
  4. Implement isTerminal to identify events that end a stream.

```typescript
import { createEncoderCore, createDecoderCore } from '@ably/ai-transport'

// Codec is the interface described in the Codec API reference. MyEvent and
// MyMessage stand for your framework's types, and serialize/deserialize are
// placeholders for your own mapping functions; none of them are defined here.
const myCodec: Codec<MyEvent, MyMessage> = {
  createEncoder(channel, options) {
    const core = createEncoderCore(channel, options)
    return {
      // Streamed mode: append one event to the stream for its message
      async appendEvent(event) {
        await core.appendStream(event.messageId, serialize(event))
      },
      // Discrete mode: publish a batch of complete messages
      async writeMessages(messages, opts) {
        await core.publishDiscreteBatch(messages.map((msg) => serialize(msg)), opts)
      },
      // Discrete mode: publish a single complete event
      async writeEvent(event) {
        await core.publishDiscrete(serialize(event))
      },
      async abort() { await core.abortAllStreams() },
      async close() { await core.close() },
    }
  },
  createDecoder() {
    // Each hook maps one inbound message action back to a domain event
    const core = createDecoderCore({
      onCreate: (msg) => deserialize(msg.data),
      onAppend: (msg) => deserialize(msg.data),
      onUpdate: (msg) => deserialize(msg.data),
    })
    return {
      decode(message) {
        return core.decode(message)
      },
    }
  },
  createAccumulator() {
    // Build and return a MessageAccumulator for your types
  },
  // Events that end a stream
  isTerminal(event) {
    return event.type === 'finish' || event.type === 'error'
  },
}
```

See the Codec API reference for the full interface specification.