The codec is the bridge between your AI framework and Ably messages. It defines how domain events (such as LLM tokens) are encoded into Ably publish operations and decoded back into events on the client. Implement the Codec interface to integrate any AI framework with AI Transport.
The SDK ships UIMessageCodec for the Vercel AI SDK. For other frameworks, implement a custom codec as described below.
Codec interface
1
2
3
4
5
6
interface Codec<TEvent, TMessage> {
createEncoder(channel: ChannelWriter, options?: EncoderOptions): StreamEncoder<TEvent, TMessage>
createDecoder(): StreamDecoder<TEvent, TMessage>
createAccumulator(): MessageAccumulator<TEvent, TMessage>
isTerminal(event: TEvent): boolean
}| Method | Description |
|---|---|
| createEncoder | Create an encoder that converts events into Ably publish operations. |
| createDecoder | Create a decoder that converts Ably messages back into events. |
| createAccumulator | Create an accumulator that builds complete messages from events. |
| isTerminal | Return true if the event ends a stream (finish, error, abort). |
The two type parameters are:
TEvent: the event type your AI framework produces (for example, a token chunk or tool call delta).TMessage: the complete message type your UI consumes (for example, a full chat message).
StreamEncoder
The StreamEncoder converts a stream of events into Ably publish operations. It is created by the server transport for each turn.
1
2
3
4
5
6
7
interface StreamEncoder<TEvent, TMessage> {
appendEvent(event: TEvent, options?: WriteOptions): Promise<void>
writeMessages(messages: TMessage[], options?: WriteOptions): Promise<void>
writeEvent(event: TEvent, options?: WriteOptions): Promise<void>
abort(reason?: string): Promise<void>
close(): Promise<void>
}| Method | Description |
|---|---|
| appendEvent | Encode and publish a single event as an append to the current message on the channel. |
| writeMessages | Encode and publish one or more complete messages. Used for user messages and other discrete messages. |
| writeEvent | Encode and publish a single event as a standalone message. |
| abort | Signal that the stream was aborted. Publishes an abort marker. Accepts an optional reason string. |
| close | Signal that the stream is complete. Publishes a close marker and releases resources. |
DiscreteEncoder
A simplified encoder for publishing complete messages without streaming. Used internally for addMessages and addEvents.
1
2
3
4
interface DiscreteEncoder<TEvent, TMessage> {
writeMessages(messages: TMessage[], options?: WriteOptions): Promise<Ably.PublishResult>
writeEvent(event: TEvent, options?: WriteOptions): Promise<Ably.PublishResult>
}StreamDecoder
The StreamDecoder converts inbound Ably messages back into domain events. It is created by the client transport when subscribing to a channel.
1
2
3
interface StreamDecoder<TEvent, TMessage> {
decode(message: Ably.InboundMessage): DecoderOutput<TEvent, TMessage>[]
}| Method | Description |
|---|---|
| decode | Decode an Ably message into zero or more domain outputs. Returns an empty array if the message is not relevant (for example, a control message handled by the transport). |
MessageAccumulator
The MessageAccumulator builds complete messages from a stream of events. The client transport uses it to reconstruct messages from history and to maintain the current message state during streaming.
1
2
3
4
5
6
7
8
9
interface MessageAccumulator<TEvent, TMessage> {
processOutputs(outputs: DecoderOutput<TEvent, TMessage>[]): void
updateMessage(message: TMessage): void
seedMessages(messages: { messageId: string; message: TMessage }[]): void
completeSeeded(messageId: string): void
messages: TMessage[]
completedMessages: TMessage[]
hasActiveStream: boolean
}| Property / Method | Type | Description |
|---|---|---|
| processOutputs | (outputs: DecoderOutput<TEvent, TMessage>[]) => void | Process a batch of decoder outputs and update the internal state. |
| messages | TMessage[] | All messages including the in-progress message being streamed. |
| completedMessages | TMessage[] | Only messages that are fully complete (stream has ended). |
| hasActiveStream | boolean | Whether the accumulator has an in-progress stream. |
ChannelWriter
The ChannelWriter interface is passed to the encoder by the transport. It provides methods for publishing to the Ably channel.
1
2
3
4
5
interface ChannelWriter {
publish(message: Ably.Message | Ably.Message[], options?: Ably.PublishOptions): Promise<Ably.PublishResult>
appendMessage(message: Ably.Message, operation?: Ably.MessageOperation, options?: Ably.PublishOptions): Promise<Ably.UpdateDeleteResult>
updateMessage(message: Ably.Message, operation?: Ably.MessageOperation, options?: Ably.PublishOptions): Promise<Ably.UpdateDeleteResult>
}| Method | Description |
|---|---|
| publish | Publish one or more discrete messages to the channel. |
| appendMessage | Append data to an existing message identified by its serial (used for token-by-token streaming). |
| updateMessage | Replace the data of an existing message identified by its serial. |
Factory helpers
The SDK provides factory helpers to simplify building encoder and decoder implementations.
createEncoderCore
Create a base encoder with common publish logic already implemented. Extend it with your framework-specific encoding.
1
2
3
import { createEncoderCore } from '@ably/ai-transport'
function createEncoderCore(writer: ChannelWriter, options?: EncoderCoreOptions): EncoderCorecreateDecoderCore
Create a base decoder with common message parsing already implemented. Extend it with your framework-specific decoding.
1
2
3
import { createDecoderCore } from '@ably/ai-transport'
function createDecoderCore<TEvent, TMessage>(hooks: DecoderCoreHooks<TEvent, TMessage>, options?: DecoderCoreOptions): DecoderCore<TEvent, TMessage>Write a custom codec
To integrate a framework that is not Vercel AI SDK, implement the four parts of the Codec interface:
-
Encoder. Map your framework's stream events to Ably publish operations. Use
appendMessagefor token-by-token streaming,publishfor discrete events, andupdateMessagefor in-place updates. -
Decoder. Parse inbound Ably messages back into your event type. Handle the message name conventions used by your encoder.
-
Accumulator. Maintain a list of messages, appending tokens to the current message as events arrive and finalizing when a terminal event is processed.
-
Terminal detection. Return
truefromisTerminalfor events that signal the end of a stream, such as a finish reason, an error, or an abort.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
const myCodec: Codec<MyEvent, MyMessage> = {
createEncoder(channel) {
return {
async appendEvent(event) {
await channel.appendMessage({ name: event.messageId, data: event.token })
},
async writeMessages(messages) {
for (const msg of messages) {
await channel.publish({ name: 'message', data: msg })
}
},
async writeEvent(event) {
await channel.publish({ name: 'event', data: event })
},
async abort() {
await channel.publish({ name: 'abort', data: {} })
},
async close() {
await channel.publish({ name: 'close', data: {} })
},
}
},
createDecoder() {
return {
decode(message) {
// Parse Ably message into your event type
return message.data as MyEvent
},
}
},
createAccumulator() {
// Return an accumulator that builds messages from events
},
isTerminal(event) {
return event.type === 'finish' || event.type === 'error'
},
}See the codec architecture internals for a deeper look at how the transport uses each codec method.