Codec API

The codec is the bridge between your AI framework and Ably messages. It defines how domain events (such as LLM tokens) are encoded into Ably publish operations and decoded back into events on the client. Implement the Codec interface to integrate any AI framework with AI Transport.

The SDK ships UIMessageCodec for the Vercel AI SDK. For other frameworks, implement a custom codec as described below.

Codec interface

JavaScript

interface Codec<TEvent, TMessage> {
  createEncoder(channel: ChannelWriter, options?: EncoderOptions): StreamEncoder<TEvent, TMessage>
  createDecoder(): StreamDecoder<TEvent, TMessage>
  createAccumulator(): MessageAccumulator<TEvent, TMessage>
  isTerminal(event: TEvent): boolean
}

| Method | Description |
| --- | --- |
| createEncoder | Create an encoder that converts events into Ably publish operations. |
| createDecoder | Create a decoder that converts Ably messages back into events. |
| createAccumulator | Create an accumulator that builds complete messages from events. |
| isTerminal | Return true if the event ends a stream (finish, error, abort). |

The two type parameters are:

  • TEvent: the event type your AI framework produces (for example, a token chunk or tool call delta).
  • TMessage: the complete message type your UI consumes (for example, a full chat message).
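
As an illustration of the two type parameters, a framework that streams text tokens might define its types along the following lines. This is only a sketch: MyEvent, MyMessage, and isTerminalEvent are names invented for this example and are not part of the SDK.

```typescript
// Hypothetical event type: what the AI framework emits while streaming.
type MyEvent =
  | { type: 'token'; messageId: string; token: string }
  | { type: 'finish'; messageId: string }
  | { type: 'error'; messageId: string; error: string }
  | { type: 'abort'; messageId: string }

// Hypothetical message type: what the UI renders once events are accumulated.
interface MyMessage {
  id: string
  role: 'user' | 'assistant'
  text: string
}

// Terminal detection in the style of Codec.isTerminal:
// true for any event that ends a stream.
function isTerminalEvent(ev: MyEvent): boolean {
  return ev.type === 'finish' || ev.type === 'error' || ev.type === 'abort'
}
```

With types like these, the codec would be declared as Codec<MyEvent, MyMessage>.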

StreamEncoder

The StreamEncoder converts a stream of events into Ably publish operations. It is created by the server transport for each turn.

JavaScript

interface StreamEncoder<TEvent, TMessage> {
  appendEvent(event: TEvent, options?: WriteOptions): Promise<void>
  writeMessages(messages: TMessage[], options?: WriteOptions): Promise<void>
  writeEvent(event: TEvent, options?: WriteOptions): Promise<void>
  abort(reason?: string): Promise<void>
  close(): Promise<void>
}

| Method | Description |
| --- | --- |
| appendEvent | Encode and publish a single event as an append to the current message on the channel. |
| writeMessages | Encode and publish one or more complete messages. Used for user messages and other discrete messages. |
| writeEvent | Encode and publish a single event as a standalone message. |
| abort | Signal that the stream was aborted. Publishes an abort marker. Accepts an optional reason string. |
| close | Signal that the stream is complete. Publishes a close marker and releases resources. |
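
The server transport drives the encoder for you, but the call order for one turn can be sketched as follows. This is an illustration under stated assumptions, not the transport's actual implementation: SketchEncoder is a local copy of the method shapes with WriteOptions omitted, and TokenEvent, createRecordingEncoder, and runTurn are hypothetical names. The stand-in encoder records calls in an array instead of publishing to Ably.

```typescript
// Local copy of the StreamEncoder method shapes (WriteOptions omitted for brevity).
interface SketchEncoder<TEvent, TMessage> {
  appendEvent(ev: TEvent): Promise<void>
  writeMessages(msgs: TMessage[]): Promise<void>
  writeEvent(ev: TEvent): Promise<void>
  abort(reason?: string): Promise<void>
  close(): Promise<void>
}

// Hypothetical event type for this sketch.
type TokenEvent = { type: 'token'; token: string } | { type: 'finish' }

// Hypothetical stand-in that records calls instead of publishing to Ably.
function createRecordingEncoder(log: string[]): SketchEncoder<TokenEvent, string> {
  return {
    async appendEvent(ev) { log.push(`append:${ev.type}`) },
    async writeMessages(msgs) { log.push(`messages:${msgs.length}`) },
    async writeEvent(ev) { log.push(`event:${ev.type}`) },
    async abort(reason) { log.push(`abort:${reason ?? 'unknown'}`) },
    async close() { log.push('close') },
  }
}

// Drive one turn: append every streamed event, then close.
// If the source stream throws, publish an abort marker instead of closing.
async function runTurn(
  encoder: SketchEncoder<TokenEvent, string>,
  stream: AsyncIterable<TokenEvent>,
): Promise<void> {
  try {
    for await (const ev of stream) {
      await encoder.appendEvent(ev)
    }
  } catch (err) {
    await encoder.abort(err instanceof Error ? err.message : String(err))
    return
  }
  await encoder.close()
}
```

Whether a real encoder should also be closed after an abort is not stated in this section, so the sketch treats abort and close as alternative endings.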

DiscreteEncoder

The DiscreteEncoder is a simplified encoder that publishes complete messages without streaming. It is used internally for addMessages and addEvents.

JavaScript

interface DiscreteEncoder<TEvent, TMessage> {
  writeMessages(messages: TMessage[], options?: WriteOptions): Promise<Ably.PublishResult>
  writeEvent(event: TEvent, options?: WriteOptions): Promise<Ably.PublishResult>
}

StreamDecoder

The StreamDecoder converts inbound Ably messages back into domain events. It is created by the client transport when subscribing to a channel.

JavaScript

interface StreamDecoder<TEvent, TMessage> {
  decode(message: Ably.InboundMessage): DecoderOutput<TEvent, TMessage>[]
}

| Method | Description |
| --- | --- |
| decode | Decode an Ably message into zero or more domain outputs. Returns an empty array if the message is not relevant (for example, a control message handled by the transport). |
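
The "zero or more outputs" contract can be sketched as a decode function that switches on the message name and returns an empty array for anything it does not recognize. The shapes here are assumptions: InboundLike stands in for the fields of Ably.InboundMessage that the sketch reads, SketchOutput is a hypothetical output shape (the SDK's DecoderOutput type is not shown in this section and may differ), and the message names are conventions chosen for this example.

```typescript
// Minimal stand-in for the fields of Ably.InboundMessage used here.
interface InboundLike {
  name?: string
  data?: unknown
}

// Hypothetical output shape; the SDK's DecoderOutput type may differ.
type SketchOutput = { kind: 'event'; event: { type: string; token?: string } }

// Decode one inbound message into zero or more outputs.
function decodeSketch(message: InboundLike): SketchOutput[] {
  switch (message.name) {
    case 'token':
      // A streamed token published by the matching encoder.
      return [{ kind: 'event', event: { type: 'token', token: String(message.data) } }]
    case 'finish':
      // The encoder's end-of-stream event.
      return [{ kind: 'event', event: { type: 'finish' } }]
    default:
      // Not relevant to this codec: produce no outputs.
      return []
  }
}
```

Returning an empty array rather than throwing keeps the decoder tolerant of messages published by other parts of the system on the same channel.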

MessageAccumulator

The MessageAccumulator builds complete messages from a stream of events. The client transport uses it to reconstruct messages from history and to maintain the current message state during streaming.

JavaScript

interface MessageAccumulator<TEvent, TMessage> {
  processOutputs(outputs: DecoderOutput<TEvent, TMessage>[]): void
  updateMessage(message: TMessage): void
  seedMessages(messages: { messageId: string; message: TMessage }[]): void
  completeSeeded(messageId: string): void
  messages: TMessage[]
  completedMessages: TMessage[]
  hasActiveStream: boolean
}

| Property / Method | Type | Description |
| --- | --- | --- |
| processOutputs | (outputs: DecoderOutput<TEvent, TMessage>[]) => void | Process a batch of decoder outputs and update the internal state. |
| messages | TMessage[] | All messages including the in-progress message being streamed. |
| completedMessages | TMessage[] | Only messages that are fully complete (stream has ended). |
| hasActiveStream | boolean | Whether the accumulator has an in-progress stream. |
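
A minimal accumulator covering the four documented members might look like the sketch below. It assumes a hypothetical ChatOutput shape in place of the SDK's DecoderOutput, and hypothetical ChatEvent and ChatMessage types. It leaves out updateMessage, seedMessages, and completeSeeded, because their behavior is not described in this section.

```typescript
// Hypothetical event and message types for this sketch.
type ChatEvent =
  | { type: 'token'; messageId: string; token: string }
  | { type: 'finish'; messageId: string }

interface ChatMessage {
  id: string
  text: string
}

// Hypothetical decoder output shape; the SDK's DecoderOutput may differ.
type ChatOutput = { kind: 'event'; event: ChatEvent }

class SketchAccumulator {
  private readonly order: string[] = [] // message ids in arrival order
  private readonly byId = new Map<string, ChatMessage>()
  private readonly active = new Set<string>() // ids that are still streaming

  // Apply a batch of decoder outputs to the internal state.
  processOutputs(outputs: ChatOutput[]): void {
    for (const { event: ev } of outputs) {
      if (ev.type === 'token') {
        let msg = this.byId.get(ev.messageId)
        if (!msg) {
          // First token for this id: start a new in-progress message.
          msg = { id: ev.messageId, text: '' }
          this.byId.set(ev.messageId, msg)
          this.order.push(ev.messageId)
          this.active.add(ev.messageId)
        }
        msg.text += ev.token
      } else {
        // Terminal event: the message is now complete.
        this.active.delete(ev.messageId)
      }
    }
  }

  // All messages, including any that are still streaming.
  get messages(): ChatMessage[] {
    return this.order.map((id) => this.byId.get(id)!)
  }

  // Only messages whose stream has ended.
  get completedMessages(): ChatMessage[] {
    return this.messages.filter((m) => !this.active.has(m.id))
  }

  get hasActiveStream(): boolean {
    return this.active.size > 0
  }
}
```

Keeping the set of active ids separate from the message map lets messages and completedMessages be derived from the same state, so the two views cannot drift apart.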

ChannelWriter

The ChannelWriter interface is passed to the encoder by the transport. It provides methods for publishing to the Ably channel.

JavaScript

interface ChannelWriter {
  publish(message: Ably.Message | Ably.Message[], options?: Ably.PublishOptions): Promise<Ably.PublishResult>
  appendMessage(message: Ably.Message, operation?: Ably.MessageOperation, options?: Ably.PublishOptions): Promise<Ably.UpdateDeleteResult>
  updateMessage(message: Ably.Message, operation?: Ably.MessageOperation, options?: Ably.PublishOptions): Promise<Ably.UpdateDeleteResult>
}

| Method | Description |
| --- | --- |
| publish | Publish one or more discrete messages to the channel. |
| appendMessage | Append data to an existing message identified by its serial (used for token-by-token streaming). |
| updateMessage | Replace the data of an existing message identified by its serial. |
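
When unit testing an encoder, it can help to pass it an in-memory stand-in instead of a real channel. The sketch below is one possible shape for such a stand-in and makes several simplifying assumptions: MessageLike and StoredMessage replace the Ably types, data is treated as a string, the operation and options parameters are omitted, and the methods return plain serials or nothing rather than Ably.PublishResult and Ably.UpdateDeleteResult. InMemoryWriter is a hypothetical name, not an SDK export.

```typescript
// Simplified stand-ins for the Ably message types referenced by ChannelWriter.
interface MessageLike {
  name?: string
  data?: unknown
  serial?: string
}

interface StoredMessage {
  serial: string
  name: string
  data: string
}

// Hypothetical in-memory writer: stores messages instead of publishing them.
class InMemoryWriter {
  readonly stored: StoredMessage[] = []
  private counter = 0

  // Store one or more discrete messages and return the serials assigned to them.
  async publish(message: MessageLike | MessageLike[]): Promise<string[]> {
    const batch = Array.isArray(message) ? message : [message]
    return batch.map((m) => {
      const serial = `serial-${++this.counter}`
      this.stored.push({ serial, name: m.name ?? '', data: String(m.data ?? '') })
      return serial
    })
  }

  // Concatenate data onto the stored message with the same serial.
  async appendMessage(message: MessageLike): Promise<void> {
    this.find(message.serial).data += String(message.data ?? '')
  }

  // Replace the data of the stored message with the same serial.
  async updateMessage(message: MessageLike): Promise<void> {
    this.find(message.serial).data = String(message.data ?? '')
  }

  private find(serial: string | undefined): StoredMessage {
    const found = this.stored.find((m) => m.serial === serial)
    if (!found) throw new Error(`no message with serial ${serial}`)
    return found
  }
}
```

A test can then publish an initial message, append tokens to its serial, and assert on writer.stored without any network access.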

Factory helpers

The SDK provides factory helpers to simplify building encoder and decoder implementations.

createEncoderCore

Create a base encoder with common publish logic already implemented. Extend it with your framework-specific encoding.

JavaScript

import { createEncoderCore } from '@ably/ai-transport'

function createEncoderCore(writer: ChannelWriter, options?: EncoderCoreOptions): EncoderCore

createDecoderCore

Create a base decoder with common message parsing already implemented. Extend it with your framework-specific decoding.

JavaScript

import { createDecoderCore } from '@ably/ai-transport'

function createDecoderCore<TEvent, TMessage>(hooks: DecoderCoreHooks<TEvent, TMessage>, options?: DecoderCoreOptions): DecoderCore<TEvent, TMessage>

Write a custom codec

To integrate a framework other than the Vercel AI SDK, implement the four parts of the Codec interface:

  1. Encoder. Map your framework's stream events to Ably publish operations. Use appendMessage for token-by-token streaming, publish for discrete events, and updateMessage for in-place updates.

  2. Decoder. Parse inbound Ably messages back into your event type. Handle the message name conventions used by your encoder.

  3. Accumulator. Maintain a list of messages, appending tokens to the current message as events arrive and finalizing when a terminal event is processed.

  4. Terminal detection. Return true from isTerminal for events that signal the end of a stream, such as a finish reason, an error, or an abort.

JavaScript

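const myCodec: Codec<MyEvent, MyMessage> = {
  createEncoder(channel) {
    return {
      // Stream a token by appending it to the current message.
      async appendEvent(event) {
        await channel.appendMessage({ name: event.messageId, data: event.token })
      },
      // Publish each complete message as its own Ably message.
      async writeMessages(messages) {
        for (const msg of messages) {
          await channel.publish({ name: 'message', data: msg })
        }
      },
      // Publish a single event as a standalone message.
      async writeEvent(event) {
        await channel.publish({ name: 'event', data: event })
      },
      // Publish an abort marker, passing along the optional reason.
      async abort(reason) {
        await channel.publish({ name: 'abort', data: { reason } })
      },
      // Publish a close marker to signal that the stream is complete.
      async close() {
        await channel.publish({ name: 'close', data: {} })
      },
    }
  },
  createDecoder() {
    return {
      decode(message) {
        // decode returns an array of outputs. Return [] for messages this
        // codec does not handle.
        // Placeholder: build DecoderOutput values from message.name and
        // message.data here. The cast assumes the payload already has that shape.
        return [message.data as DecoderOutput<MyEvent, MyMessage>]
      },
    }
  },
  createAccumulator() {
    // Return an accumulator that builds messages from events.
    throw new Error('createAccumulator is not implemented in this example')
  },
  isTerminal(event) {
    // Finish, error, and abort events all end a stream.
    return event.type === 'finish' || event.type === 'error' || event.type === 'abort'
  },
}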

See the codec architecture internals for a deeper look at how the transport uses each codec method.