Codec architecture

Internal architecture of the codec: encoder, decoder, accumulator, and lifecycle tracker. How each piece fits into the publish-side and subscribe-side pipelines.

The codec is the layer that translates between your AI framework's event types and Ably channel messages. This page describes how each part of the codec works internally and how the pieces fit together.

The two pipelines mirror each other:

Publish side (server):

  framework events
    ↓
  StreamEncoder.appendEvent / writeMessages
    ↓
  ChannelWriter.publish / appendMessage / updateMessage
    ↓
  Ably channel

Subscribe side (client):

  Ably channel
    ↓
  StreamDecoder.decode -> DecoderOutput[]
    ↓
  MessageAccumulator.processOutputs
    ↓
  framework messages
    ↓
  View

Two-layer split

AI Transport separates concerns into two layers:

  • Transport layer (generic): manages turns, lifecycle events, cancellation, history, and multi-client sync. It works with any message type.
  • Domain layer (codec): maps framework-specific events to Ably publish operations and back. It knows the shape of your events and messages.

The transport layer calls into the codec but never inspects the domain data. The codec calls into the channel writer but never manages turns or lifecycle. This separation is what makes AI Transport framework-agnostic.
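To make the split concrete, here is a minimal sketch in which a transport function is generic over the event type and only handles the operations a codec returns. All names here (EventCodec, publishTurn, TextChunkEvent, the x-turn-id, x-lifecycle and x-domain-type headers, and the turn-end event) are hypothetical and are not the SDK's API.

```typescript
// Hypothetical, simplified shapes for illustration; these are not the SDK's interfaces.
interface PublishOp {
  data: string
  headers: Record<string, string>
}

// Domain layer: knows the event shape, but nothing about turns or lifecycle.
interface EventCodec<TEvent> {
  encode(event: TEvent): PublishOp
}

// Transport layer: generic over TEvent. It frames the turn with lifecycle
// operations and tags every operation with the turn ID, but never reads the events.
function publishTurn<TEvent>(turnId: string, events: TEvent[], codec: EventCodec<TEvent>): PublishOp[] {
  const lifecycle = (name: string): PublishOp => ({
    data: '',
    headers: { 'x-turn-id': turnId, 'x-lifecycle': name },
  })
  const ops: PublishOp[] = [lifecycle('turn-start')]
  for (const event of events) {
    const op = codec.encode(event)
    ops.push({ data: op.data, headers: { ...op.headers, 'x-turn-id': turnId } })
  }
  ops.push(lifecycle('turn-end'))
  return ops
}

// A tiny domain codec for one framework's text events.
type TextChunkEvent = { type: 'text'; text: string }

const textCodec: EventCodec<TextChunkEvent> = {
  encode: (event) => ({ data: event.text, headers: { 'x-domain-type': event.type } }),
}

const ops = publishTurn<TextChunkEvent>('turn-1', [{ type: 'text', text: 'Hi' }], textCodec)
console.log(ops.map((op) => op.headers['x-lifecycle'] ?? op.data))
// [ 'turn-start', 'Hi', 'turn-end' ]
```

Swapping textCodec for a codec of another framework's events would not require any change to publishTurn, which is the property the two-layer split is after.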

Encoder core

The encoder converts outbound domain events into Ably publish operations. It supports two modes.

Discrete mode

Discrete mode publishes a complete message in a single operation. The encoder calls channel.publish() once with the full message content. User messages and tool results typically use discrete mode.
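A minimal sketch of discrete mode, assuming a stand-in channel that records messages instead of sending them. RecordingChannel, publishDiscrete and the x-role header are hypothetical; the real channel.publish() signature may differ.

```typescript
// Hypothetical stand-in for an Ably channel that records publishes instead of sending them.
interface OutboundMessage {
  name: string
  data: unknown
  headers: Record<string, string>
}

class RecordingChannel {
  readonly published: OutboundMessage[] = []

  async publish(message: OutboundMessage): Promise<void> {
    this.published.push(message)
  }
}

// Discrete mode: the complete content goes out in exactly one publish call.
async function publishDiscrete(
  channel: RecordingChannel,
  name: string,
  data: unknown,
  headers: Record<string, string>,
): Promise<void> {
  await channel.publish({ name, data, headers })
}

// For example, a tool result published as a single complete message.
const channel = new RecordingChannel()
publishDiscrete(channel, 'tool-result', { toolCallId: 'call-1', output: 42 }, { 'x-role': 'tool' })
  .then(() => console.log(channel.published.length)) // 1
```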

Streamed mode

Streamed mode publishes a message incrementally. startStream creates the message with channel.publish() and captures the returned serial. Each subsequent appendStream call invokes channel.appendMessage() with that serial. closeStream (or cancelStream) sends a terminal append with x-ably-status: finished or x-ably-status: cancelled. This is the mode used for LLM response streaming.
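The sketch below models streamed mode against an in-memory FakeChannel. The method names startStream, appendStream, closeStream and cancelStream follow the text, and the x-ably-stream and x-ably-status headers appear on this page, but the signatures, the serial format, the empty terminal append and the FakeChannel class are assumptions made for illustration.

```typescript
// Hypothetical in-memory channel: assigns serials on publish and records every operation.
interface RecordedOp {
  action: 'create' | 'append'
  serial: string
  data: string
  headers: Record<string, string>
}

class FakeChannel {
  readonly ops: RecordedOp[] = []
  private nextSerial = 0

  async publish(data: string, headers: Record<string, string>): Promise<string> {
    const serial = `serial-${this.nextSerial++}`
    this.ops.push({ action: 'create', serial, data, headers })
    return serial
  }

  async appendMessage(serial: string, data: string, headers: Record<string, string>): Promise<void> {
    this.ops.push({ action: 'append', serial, data, headers })
  }
}

// Sketch of streamed mode; signatures and plain-object headers are assumptions.
class StreamWriter {
  private serial: string | undefined

  constructor(private readonly channel: FakeChannel) {}

  // Create the message and remember its serial for later appends.
  async startStream(initial: string): Promise<void> {
    this.serial = await this.channel.publish(initial, { 'x-ably-stream': 'true' })
  }

  // Append one token to the message identified by the captured serial.
  appendStream(token: string): Promise<void> {
    return this.channel.appendMessage(this.requireSerial(), token, {})
  }

  closeStream(): Promise<void> {
    return this.terminate('finished')
  }

  cancelStream(): Promise<void> {
    return this.terminate('cancelled')
  }

  // In this sketch the terminal append carries no new content, only the status header.
  private terminate(status: 'finished' | 'cancelled'): Promise<void> {
    return this.channel.appendMessage(this.requireSerial(), '', { 'x-ably-status': status })
  }

  private requireSerial(): string {
    if (this.serial === undefined) throw new Error('startStream() has not been called')
    return this.serial
  }
}

const channel = new FakeChannel()
const writer = new StreamWriter(channel)
writer
  .startStream('Hel')
  .then(() => writer.appendStream('lo'))
  .then(() => writer.closeStream())
  .then(() => console.log(channel.ops.map((op) => `${op.action}:${op.serial}:${op.data}`)))
```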

Awaiting append promises

Per-token append calls do not block the encoder. Awaiting each one would couple the stream rate to the network round-trip. Instead the encoder collects every append promise and awaits them as a batch when the stream closes or cancels.

This batched await is what catches failed appends. If any are rejected, the encoder replaces the message in place with the full accumulated content, so subscribers see the intended final state even when intermediate appends were lost. Concurrent closes serialise through a single flush, so closing several streams at once does not race.
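The batching and repair behavior can be sketched as follows. AppendChannel, BatchingEncoder and the boolean bookkeeping are hypothetical; the sketch only illustrates storing append promises without awaiting them, awaiting them together on close, falling back to an in-place update with the full content, and chaining closes through one flush queue so they never overlap.

```typescript
// Hypothetical channel interface: appends may fail, updateMessage replaces content in place.
interface AppendChannel {
  appendMessage(serial: string, data: string): Promise<void>
  updateMessage(serial: string, data: string): Promise<void>
}

interface PendingStream {
  content: string // everything appended so far, kept for the repair path
  appends: Promise<boolean>[] // one entry per append: true if it succeeded
}

class BatchingEncoder {
  private readonly streams = new Map<string, PendingStream>()
  // Tail of the flush queue: each close chains onto it, so flushes never overlap.
  private flushQueue: Promise<void> = Promise.resolve()

  constructor(private readonly channel: AppendChannel) {}

  // Not awaited per token. Converting each promise to a boolean here also
  // stops a failed append from surfacing as an unhandled rejection.
  append(serial: string, token: string): void {
    const stream: PendingStream = this.streams.get(serial) ?? { content: '', appends: [] }
    stream.content += token
    stream.appends.push(this.channel.appendMessage(serial, token).then(() => true, () => false))
    this.streams.set(serial, stream)
  }

  close(serial: string): Promise<void> {
    const run = this.flushQueue.then(() => this.flush(serial))
    this.flushQueue = run.catch(() => undefined) // a failed flush must not block later closes
    return run
  }

  private async flush(serial: string): Promise<void> {
    const stream = this.streams.get(serial)
    if (stream === undefined) return
    this.streams.delete(serial)
    const results = await Promise.all(stream.appends)
    if (results.includes(false)) {
      // At least one append was lost: replace the message with the full content.
      await this.channel.updateMessage(serial, stream.content)
    }
  }
}
```

In this sketch a stream whose appends all succeeded closes without any extra publish; only a stream that lost an append pays for the full-content update.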

Header composition

Every outbound message carries both transport and domain headers. Each append operation replaces all the headers set by earlier operations on the same message rather than merging with them, so the latest write wins.
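One way to read the overwrite rule, sketched under the assumption that the latest operation's header set replaces the earlier set wholesale: since nothing is merged in, every operation has to re-send the complete set, transport and domain headers together. The header names and the outboundHeaders and headersAfter helpers are hypothetical.

```typescript
type HeaderMap = Record<string, string>

// Hypothetical transport header, for illustration only.
const transportHeaders: HeaderMap = { 'x-turn-id': 'turn-1' }

// Every operation sends the complete set. On a key clash, the domain value wins in this sketch.
function outboundHeaders(transport: HeaderMap, domain: HeaderMap): HeaderMap {
  return { ...transport, ...domain }
}

// Model of the overwrite rule: the message ends up with the most recent
// operation's headers as a whole; nothing is merged in from earlier operations.
function headersAfter(operations: HeaderMap[]): HeaderMap {
  return operations.length === 0 ? {} : { ...operations[operations.length - 1] }
}

const ops: HeaderMap[] = [
  outboundHeaders(transportHeaders, { 'x-domain-type': 'text' }),
  { 'x-domain-type': 'text' }, // an append that forgot the transport headers
]
console.log(headersAfter(ops)) // { 'x-domain-type': 'text' }: the turn ID is gone
```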

Decoder core

The decoder converts inbound Ably messages back into domain events. It dispatches on the message action to determine how to process each message.

Action dispatch

The decoder handles four Ably message actions:

| Action | Meaning | Decoder behavior |
|--------|---------|------------------|
| create | A new message arrived | Start a new message or stream. Emit the initial event. |
| append | A token was appended to a message | Emit a delta event for the matching stream. |
| update | A message was updated in place | Update the message state. If the status is terminal, close the stream. |
| delete | A message was removed | Remove the message from state. |
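A sketch of this dispatch, assuming plain-text data and string-valued headers. The Output variants (start, delta, replace, end, remove) and the dispatch function are hypothetical stand-ins for the codec's DecoderOutput handling; only the four action names and the x-ably-status values come from this page.

```typescript
type MessageAction = 'create' | 'append' | 'update' | 'delete'

interface InboundMessage {
  action: MessageAction
  serial: string
  data: string
  headers: Record<string, string>
}

// Hypothetical output events, standing in for the codec's DecoderOutput.
type Output =
  | { kind: 'start'; serial: string; text: string }
  | { kind: 'delta'; serial: string; text: string }
  | { kind: 'replace'; serial: string; text: string }
  | { kind: 'end'; serial: string }
  | { kind: 'remove'; serial: string }

const TERMINAL_STATUSES = new Set(['finished', 'cancelled'])

function dispatch(msg: InboundMessage): Output[] {
  switch (msg.action) {
    case 'create': // new message or stream: emit the initial event
      return [{ kind: 'start', serial: msg.serial, text: msg.data }]
    case 'append': // token for an existing stream: emit a delta
      return [{ kind: 'delta', serial: msg.serial, text: msg.data }]
    case 'update': {
      // In-place update: refresh state, then close the stream if the status is terminal.
      const out: Output[] = [{ kind: 'replace', serial: msg.serial, text: msg.data }]
      if (TERMINAL_STATUSES.has(msg.headers['x-ably-status'] ?? '')) {
        out.push({ kind: 'end', serial: msg.serial })
      }
      return out
    }
    case 'delete': // removed message: drop it from state
      return [{ kind: 'remove', serial: msg.serial }]
  }
}
```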

Stream tracker

The decoder maintains a stream tracker that maps stream IDs to their current state. When a create action arrives with x-ably-stream: true, the tracker opens a new stream. Subsequent append actions for the same stream ID are routed to that stream. An update action with a terminal status closes the stream.
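A minimal stream tracker along these lines might look like the following. It assumes header values are strings and that an update carries the message's full content; StreamTracker, TrackedMessage and StreamState are hypothetical names.

```typescript
interface TrackedMessage {
  action: 'create' | 'append' | 'update'
  streamId: string
  data: string
  headers: Record<string, string>
}

interface StreamState {
  text: string
  open: boolean
}

class StreamTracker {
  private readonly streams = new Map<string, StreamState>()

  handle(msg: TrackedMessage): void {
    const existing = this.streams.get(msg.streamId)
    switch (msg.action) {
      case 'create':
        // Only creates flagged as streams open a tracked stream.
        if (msg.headers['x-ably-stream'] === 'true') {
          this.streams.set(msg.streamId, { text: msg.data, open: true })
        }
        break
      case 'append':
        // Route the token to the open stream with the same ID.
        if (existing?.open) existing.text += msg.data
        break
      case 'update':
        if (existing === undefined) break
        existing.text = msg.data
        // A terminal status closes the stream.
        if (['finished', 'cancelled'].includes(msg.headers['x-ably-status'] ?? '')) existing.open = false
        break
    }
  }

  state(streamId: string): StreamState | undefined {
    return this.streams.get(streamId)
  }
}
```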

Joining a stream mid-flight

A client that joins a channel while a streamed message is already in progress may receive an append or update for a stream whose original create it never saw. The decoder treats the incoming content as the current full state of that stream and starts tracking it from there. Later appends are accumulated on top in the usual way, so the client ends up in the same state as one that received every operation from the start.
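The mid-flight rule can be sketched with a tracker that seeds state from whatever it sees first for an unknown stream. LateJoinTracker is hypothetical, and the sketch assumes plain-text data, that an update carries the full content, and that the first operation a late joiner sees carries the stream's current content, as this section describes.

```typescript
class LateJoinTracker {
  private readonly text = new Map<string, string>()

  onCreate(streamId: string, data: string): void {
    this.text.set(streamId, data)
  }

  onAppend(streamId: string, data: string): void {
    const current = this.text.get(streamId)
    // Never saw the create: adopt the incoming content as the stream's full state.
    this.text.set(streamId, current === undefined ? data : current + data)
  }

  onUpdate(streamId: string, data: string): void {
    // Known or not, an update's content becomes the current state.
    this.text.set(streamId, data)
  }

  current(streamId: string): string | undefined {
    return this.text.get(streamId)
  }
}

// A client present from the start, and one that joined after two operations.
const early = new LateJoinTracker()
early.onCreate('m1', 'Hel')
early.onAppend('m1', 'lo wor')
early.onAppend('m1', 'ld')

const late = new LateJoinTracker()
late.onUpdate('m1', 'Hello wor') // the first thing the late joiner sees
late.onAppend('m1', 'ld')

console.log(early.current('m1') === late.current('m1')) // true
```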

Accumulator

The accumulator builds complete messages from a sequence of domain events. It is used in two contexts:

Live stream

During live streaming, the accumulator processes events as they arrive from the decoder. It maintains a running list of messages, appending tokens to the in-progress message. The messages property returns all messages including the one currently being streamed. The completedMessages property returns only messages where the stream has ended.
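A minimal accumulator showing the difference between messages and completedMessages. The names processOutputs, messages, completedMessages and hasActiveStream follow this page; the StreamOutput and ChatMessage shapes and the SketchAccumulator class are hypothetical and much simpler than the SDK's MessageAccumulator.

```typescript
// Hypothetical decoder outputs and message shape; real types come from your codec.
type StreamOutput =
  | { kind: 'start'; id: string; text: string }
  | { kind: 'delta'; id: string; text: string }
  | { kind: 'end'; id: string }

interface ChatMessage {
  id: string
  text: string
  done: boolean
}

class SketchAccumulator {
  private readonly list: ChatMessage[] = []

  processOutputs(outputs: StreamOutput[]): void {
    for (const output of outputs) {
      if (output.kind === 'start') {
        this.list.push({ id: output.id, text: output.text, done: false })
        continue
      }
      const message = this.list.find((m) => m.id === output.id)
      if (message === undefined) continue
      if (output.kind === 'delta') message.text += output.text // append the token
      else message.done = true // the stream for this message has ended
    }
  }

  get messages(): readonly ChatMessage[] {
    return this.list // includes the message still being streamed
  }

  get completedMessages(): readonly ChatMessage[] {
    return this.list.filter((m) => m.done)
  }

  get hasActiveStream(): boolean {
    return this.list.some((m) => !m.done)
  }
}
```

Because processOutputs does not care where its outputs came from, the same instance could be fed a page of history first and live outputs afterwards.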

History

When loading history, the accumulator replays stored events to reconstruct message state. The processOutputs method accepts a batch of events and updates the internal state. This is the same method used during live streaming; the accumulator does not distinguish between live and historical events.

JavaScript


const accumulator = codec.createAccumulator()

// Process events from history or live stream
accumulator.processOutputs(events)

// Read current state
accumulator.messages          // all messages (including in-progress)
accumulator.completedMessages // only completed messages
accumulator.hasActiveStream   // true if a stream is in progress

Lifecycle tracker

The lifecycle tracker ensures clients receive a consistent sequence of lifecycle events, even when joining mid-stream. If a client subscribes to a channel where a turn is already in progress, the tracker synthesizes the missing lifecycle events (like turn-start and the stream create) so the client can process the in-progress turn correctly.

The tracker maintains configurable phases. Each phase defines a set of events that must be observed before the next phase begins. If the tracker detects a gap (for example, the client received an append without seeing a stream create), it synthesizes the missing events and delivers them in order before the real event.
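A sketch of gap filling, in which a map of prerequisites stands in for the configurable phases. GapFillingTracker and the event names are hypothetical, and the sketch assumes the prerequisite map has no cycles and tracks a single turn.

```typescript
type Delivered = { event: string; synthetic: boolean }

class GapFillingTracker {
  private readonly seen = new Set<string>()

  // requires[e] lists the events that must have been delivered before e (assumed acyclic).
  constructor(private readonly requires: Record<string, string[]>) {}

  // Returns what to deliver: any synthesized events first, in order, then the real one.
  observe(event: string): Delivered[] {
    const out: Delivered[] = []
    this.fillGaps(event, out)
    this.seen.add(event)
    out.push({ event, synthetic: false })
    return out
  }

  private fillGaps(event: string, out: Delivered[]): void {
    for (const prerequisite of this.requires[event] ?? []) {
      if (this.seen.has(prerequisite)) continue
      this.fillGaps(prerequisite, out) // its own prerequisites come first
      this.seen.add(prerequisite)
      out.push({ event: prerequisite, synthetic: true })
    }
  }
}

// A client that subscribes mid-turn and first sees a stream append.
const lifecycle = new GapFillingTracker({
  'stream-create': ['turn-start'],
  'stream-append': ['stream-create'],
  'turn-end': ['turn-start'],
})
console.log(lifecycle.observe('stream-append').map((d) => d.event))
// [ 'turn-start', 'stream-create', 'stream-append' ]
```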

Write a custom codec

To build a codec for a framework not covered by the SDK:

  1. Implement the Codec interface with your event and message types.
  2. Use createEncoderCore and createDecoderCore from @ably/ai-transport to get the base publish and dispatch logic.
  3. Provide hooks that map your framework's events to the core operations (create, append, update, close).
  4. Implement isTerminal to identify events that end a stream.

TypeScript


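import { createEncoderCore, createDecoderCore } from '@ably/ai-transport'

// Codec is the interface described in the Codec API reference. MyEvent and
// MyMessage are your framework's event and message types, and serialize() /
// deserialize() are your own helpers that convert between those types and
// Ably message data. None of these are defined in this snippet.
const myCodec: Codec<MyEvent, MyMessage> = {
  // Publish side: wrap the encoder core and route each call to a core operation.
  createEncoder(channel, options) {
    const core = createEncoderCore(channel, options)
    return {
      // Streamed output: append to the in-progress message for this event.
      async appendEvent(event) {
        await core.appendStream(event.messageId, serialize(event))
      },
      // Complete messages are published in discrete mode.
      async writeMessages(messages, opts) {
        await core.publishDiscreteBatch(messages.map((msg) => serialize(msg)), opts)
      },
      // A single complete event, also published in discrete mode.
      async writeEvent(event) {
        await core.publishDiscrete(serialize(event))
      },
      async abort() { await core.abortAllStreams() },
      async close() { await core.close() },
    }
  },
  // Subscribe side: the decoder core dispatches on the Ably message action
  // and calls the matching hook to turn message data back into events.
  createDecoder() {
    const core = createDecoderCore({
      onCreate: (msg) => deserialize(msg.data),
      onAppend: (msg) => deserialize(msg.data),
      onUpdate: (msg) => deserialize(msg.data),
    })
    return {
      decode(message) {
        return core.decode(message)
      },
    }
  },
  createAccumulator() {
    // Build and return a MessageAccumulator for your types
  },
  // Events that end a stream.
  isTerminal(event) {
    return event.type === 'finish' || event.type === 'error'
  },
}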

See the Codec API reference for the full interface specification.