# Guide: Stream Anthropic responses using the message-per-response pattern
This guide shows you how to stream AI responses from Anthropic's [Messages API](https://docs.anthropic.com/en/api/messages) over Ably using the [message-per-response pattern](https://ably.com/docs/ai-transport/token-streaming/message-per-response.md). Specifically, it appends each response token to a single Ably message, creating a complete AI response that grows incrementally while delivering tokens in realtime.
Using Ably to distribute tokens from the Anthropic SDK enables you to broadcast AI responses to thousands of concurrent subscribers with reliable message delivery and ordering guarantees. This approach stores each complete response as a single message in channel history, making it easy to retrieve conversation history without processing thousands of individual token messages.
## Prerequisites
Depending on the language you follow this guide in, you need:
- Node.js 20 or higher (JavaScript)
- Python 3.8 or higher (Python)
- Java 8 or higher (Java)
- Xcode 15 or higher (Swift)
You also need:
- An Anthropic API key
- An Ably API key
Useful links:
- [Anthropic API documentation](https://docs.anthropic.com/en/api)
- [Token streaming overview](https://ably.com/docs/ai-transport/token-streaming.md)
- [AI Transport overview](https://ably.com/docs/ai-transport.md)
### Agent setup
Create a new Node project for the agent code:
#### Shell
```
mkdir ably-anthropic-agent && cd ably-anthropic-agent
npm init -y
npm install @anthropic-ai/sdk ably
```
Create a new directory and install the required packages:
#### Shell
```
mkdir ably-anthropic-agent && cd ably-anthropic-agent
pip install anthropic ably
```
Create a new project and add the required dependencies.
For Maven, add to your `pom.xml`:
#### Xml
```
<dependencies>
  <dependency>
    <groupId>com.anthropic</groupId>
    <artifactId>anthropic-java</artifactId>
    <version>2.15.0</version>
  </dependency>
  <dependency>
    <groupId>io.ably</groupId>
    <artifactId>ably-java</artifactId>
    <version>1.6.1</version>
  </dependency>
</dependencies>
```
For Gradle, add to your `build.gradle`:
#### Text
```
dependencies {
  implementation 'com.anthropic:anthropic-java:2.15.0'
  implementation 'io.ably:ably-java:1.6.1'
}
```
Export your Anthropic API key to the environment:
#### Shell
```
export ANTHROPIC_API_KEY="your_api_key_here"
```
### Client setup
Create a new Node project for the client code, or use the same project as the agent if both are JavaScript:
#### Shell
```
mkdir ably-anthropic-client && cd ably-anthropic-client
npm init -y
npm install ably
```
Add the Ably SDK to your iOS or macOS project using Swift Package Manager. In Xcode, go to File > Add Package Dependencies and add:
#### Text
```
https://github.com/ably/ably-cocoa
```
Or add it to your `Package.swift`:
#### Client Swift
```
dependencies: [
.package(url: "https://github.com/ably/ably-cocoa", from: "1.2.0")
]
```
Add the Ably Java SDK to your `pom.xml`:
#### Xml
```
<dependency>
  <groupId>io.ably</groupId>
  <artifactId>ably-java</artifactId>
  <version>1.6.1</version>
</dependency>
```
For Gradle, add to your `build.gradle`:
#### Text
```
implementation 'io.ably:ably-java:1.6.1'
```
## Step 1: Enable message appends
Message append functionality requires "Message annotations, updates, deletes and appends" to be enabled in a [channel rule](https://ably.com/docs/channels.md#rules) associated with the channel.
To enable the channel rule:
1. Go to the [Ably dashboard](https://www.ably.com/dashboard) and select your app.
2. Navigate to the "Configuration" > "Rules" section from the left-hand navigation bar.
3. Choose "Add new rule".
4. Enter a channel name or namespace pattern (e.g. `ai` for all channels starting with `ai:`).
5. Select the "Message annotations, updates, deletes and appends" option from the list.
6. Click "Create channel rule".
The examples in this guide use the `ai:` namespace prefix, which assumes you have configured the rule for `ai:*`.
## Step 2: Get a streamed response from Anthropic
Initialize an Anthropic client and use the [Messages API](https://docs.anthropic.com/en/api/messages) to stream model output as a series of events.
In your `ably-anthropic-agent` directory, create a new file called `agent.mjs` (JavaScript) or `agent.py` (Python) with the following contents:
In your agent project, create a new file called `Agent.java` with the following contents:
### Agent Javascript
```
import Anthropic from '@anthropic-ai/sdk';

// Initialize Anthropic client
const anthropic = new Anthropic();

// Process each streaming event
async function processEvent(event) {
  console.log(JSON.stringify(event));
  // This function is updated in the next sections
}

// Create streaming response from Anthropic
async function streamAnthropicResponse(prompt) {
  const stream = await anthropic.messages.create({
    model: "claude-sonnet-4-5",
    max_tokens: 1024,
    messages: [{ role: "user", content: prompt }],
    stream: true,
  });

  // Iterate through streaming events
  for await (const event of stream) {
    await processEvent(event);
  }
}

// Usage example
streamAnthropicResponse("Tell me a short joke");
```
### Agent Python
```
import asyncio
import anthropic

# Initialize Anthropic client
client = anthropic.AsyncAnthropic()

# Process each streaming event
async def process_event(event):
    print(event)
    # This function is updated in the next sections

# Create streaming response from Anthropic
async def stream_anthropic_response(prompt: str):
    async with client.messages.stream(
        model="claude-sonnet-4-5",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        async for event in stream:
            await process_event(event)

# Usage example
asyncio.run(stream_anthropic_response("Tell me a short joke"))
```
### Agent Java
```
import com.anthropic.client.AnthropicClient;
import com.anthropic.client.okhttp.AnthropicOkHttpClient;
import com.anthropic.core.http.StreamResponse;
import com.anthropic.models.messages.*;

public class Agent {
    // Initialize Anthropic client
    private static final AnthropicClient client = AnthropicOkHttpClient.fromEnv();

    // Process each streaming event
    private static void processEvent(RawMessageStreamEvent event) {
        System.out.println(event);
        // This method is updated in the next sections
    }

    // Create streaming response from Anthropic
    public static void streamAnthropicResponse(String prompt) {
        MessageCreateParams params = MessageCreateParams.builder()
            .model(Model.CLAUDE_SONNET_4_5)
            .maxTokens(1024)
            .addUserMessage(prompt)
            .build();

        try (StreamResponse<RawMessageStreamEvent> stream =
                client.messages().createStreaming(params)) {
            stream.stream().forEach(Agent::processEvent);
        }
    }

    public static void main(String[] args) {
        streamAnthropicResponse("Tell me a short joke");
    }
}
```
### Understand Anthropic streaming events
Anthropic's Messages API [streams](https://docs.anthropic.com/en/api/messages-streaming) model output as a series of events when you set `stream: true`. Each streamed event includes a `type` property which describes the event type. A complete text response can be constructed from the following event types:
- [`message_start`](https://platform.claude.com/docs/en/build-with-claude/streaming#event-types): Signals the start of a response. Contains a `message` object with an `id` to correlate subsequent events.
- [`content_block_start`](https://platform.claude.com/docs/en/build-with-claude/streaming#event-types): Indicates the start of a new content block. For text responses, the `content_block` will have `type: "text"`; other types may be specified, such as `"thinking"` for internal reasoning tokens. The `index` indicates the position of this item in the message's `content` array.
- [`content_block_delta`](https://platform.claude.com/docs/en/build-with-claude/streaming#content-block-delta-types): Contains a single text delta in the `delta.text` field. If `delta.type === "text_delta"` the delta contains model response text; other types may be specified, such as `"thinking_delta"` for internal reasoning tokens. Use the `index` to correlate deltas relating to a specific content block.
- [`content_block_stop`](https://platform.claude.com/docs/en/build-with-claude/streaming#event-types): Signals completion of a content block. Contains the `index` that identifies the content block.
- [`message_delta`](https://platform.claude.com/docs/en/build-with-claude/streaming#event-types): Contains additional message-level metadata that may be streamed incrementally. Includes a [`delta.stop_reason`](https://platform.claude.com/docs/en/build-with-claude/handling-stop-reasons) indicating why the model stopped generating its response.
- [`message_stop`](https://platform.claude.com/docs/en/build-with-claude/streaming#event-types): Signals the end of the response.
The following example shows the event sequence received when streaming a response:
#### Json
```
// 1. Message starts
{"type":"message_start","message":{"model":"claude-sonnet-4-5-20250929","id":"msg_012zEkenyT6heaYSDvDEDdXm","type":"message","role":"assistant","content":[],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":12,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"cache_creation":{"ephemeral_5m_input_tokens":0,"ephemeral_1h_input_tokens":0},"output_tokens":1,"service_tier":"standard"}}}
// 2. Content block starts
{"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}
// 3. Text tokens stream in as delta events
{"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Why"}}
{"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" don't scientists trust atoms?\n\nBecause"}}
{"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" they make up everything!"}}
// 4. Content block completes
{"type":"content_block_stop","index":0}
// 5. Message delta (usage stats)
{"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"input_tokens":12,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"output_tokens":17}}
// 6. Message completes
{"type":"message_stop"}
```
## Step 3: Publish streaming tokens to Ably
Publish Anthropic streaming events to Ably using message appends to reliably and scalably distribute them to subscribers.
Each AI response is stored as a single Ably message that grows as tokens are appended.
### Initialize the Ably client
Add the Ably client initialization to your agent file:
#### Agent Javascript
```
import Ably from 'ably';

// Initialize Ably Realtime client
const realtime = new Ably.Realtime({
  key: 'your-api-key',
  echoMessages: false
});

// Create a channel for publishing streamed AI responses
const channel = realtime.channels.get('ai:your-channel-name');
```
#### Agent Python
```
from ably import AblyRealtime
# Initialize Ably Realtime client
realtime = AblyRealtime(key='your-api-key', transport_params={'echo': 'false'})
# Create a channel for publishing streamed AI responses
channel = realtime.channels.get('ai:your-channel-name')
```
#### Agent Java
```
import io.ably.lib.realtime.AblyRealtime;
import io.ably.lib.realtime.Channel;
import io.ably.lib.types.ClientOptions;
// Initialize Ably Realtime client
ClientOptions options = new ClientOptions("your-api-key");
options.echoMessages = false;
AblyRealtime realtime = new AblyRealtime(options);
// Create a channel for publishing streamed AI responses
Channel channel = realtime.channels.get("ai:your-channel-name");
```
The Ably Realtime client maintains a persistent connection to the Ably service, which allows you to publish tokens at high message rates with low latency.
### Publish initial message and append tokens
When a new response begins, publish an initial message to create it. Ably assigns a [`serial`](https://ably.com/docs/messages.md#properties) identifier to the message. Use this `serial` to append each token to the message as it arrives from the Anthropic model.
Update your agent file to publish the initial message and append tokens:
#### Agent Javascript
```
// Track state across events
let msgSerial = null;
let textBlockIndex = null;

// Process each streaming event and publish to Ably
async function processEvent(event) {
  switch (event.type) {
    case 'message_start': {
      // Publish initial empty message when response starts
      const result = await channel.publish({
        name: 'response',
        data: ''
      });
      // Capture the message serial for appending tokens
      msgSerial = result.serials[0];
      break;
    }
    case 'content_block_start':
      // Capture text block index when a text content block is added
      if (event.content_block.type === 'text') {
        textBlockIndex = event.index;
      }
      break;
    case 'content_block_delta':
      // Append tokens from text deltas only
      if (event.index === textBlockIndex && event.delta.type === 'text_delta' && msgSerial) {
        channel.appendMessage({
          serial: msgSerial,
          data: event.delta.text
        });
      }
      break;
    case 'message_stop':
      console.log('Stream completed!');
      break;
  }
}
```
#### Agent Python
```
from ably.types.message import Message

# Track state across events
msg_serial = None
text_block_index = None

# Process each streaming event and publish to Ably
async def process_event(event):
    global msg_serial, text_block_index

    if event.type == 'message_start':
        # Publish initial empty message when response starts
        result = await channel.publish('response', '')
        # Capture the message serial for appending tokens
        msg_serial = result.serials[0]
    elif event.type == 'content_block_start':
        # Capture text block index when a text content block is added
        if event.content_block.type == 'text':
            text_block_index = event.index
    elif event.type == 'content_block_delta':
        # Append tokens from text deltas only
        if (event.index == text_block_index and
                hasattr(event.delta, 'text') and
                msg_serial):
            await channel.append_message(
                Message(serial=msg_serial, data=event.delta.text)
            )
    elif event.type == 'message_stop':
        print('Stream completed!')
```
#### Agent Java
```
import io.ably.lib.types.Message;

// Track state across events
private static String msgSerial = null;
private static Long textBlockIndex = null;

// Process each streaming event and publish to Ably
private static void processEvent(RawMessageStreamEvent event) throws AblyException {
    if (event.isMessageStart()) {
        // Publish initial empty message when response starts
        Message message = new Message("response", "");
        channel.publish(message, new Callback() {
            @Override
            public void onSuccess(PublishResult result) {
                // Capture the message serial for appending tokens
                msgSerial = result.serials[0];
            }

            @Override
            public void onError(ErrorInfo reason) {
                System.err.println("Publish failed: " + reason.message);
            }
        });
    } else if (event.isContentBlockStart()) {
        // Capture text block index when a text content block is added
        RawContentBlockStartEvent blockStart = event.asContentBlockStart();
        if (blockStart.contentBlock().isText()) {
            textBlockIndex = blockStart.index();
        }
    } else if (event.isContentBlockDelta()) {
        // Append tokens from text deltas only (guard against a null index to avoid unboxing errors)
        RawContentBlockDeltaEvent delta = event.asContentBlockDelta();
        if (textBlockIndex != null &&
                delta.index() == textBlockIndex &&
                delta.delta().isText() &&
                msgSerial != null) {
            String text = delta.delta().asText().text();
            Message message = new Message();
            message.data = text;
            message.serial = msgSerial;
            channel.appendMessage(message);
        }
    } else if (event.isMessageStop()) {
        System.out.println("Stream completed!");
    }
}
```
This implementation:
- Publishes an initial empty message when the response begins and captures the `serial`
- Filters for `content_block_delta` events with `text_delta` type from text content blocks
- Appends each token to the original message
Run the publisher to see tokens streaming to Ably:
#### Shell
```
cd ably-anthropic-agent
node agent.mjs
```
#### Shell
```
cd ably-anthropic-agent
python agent.py
```
#### Shell
```
mvn compile exec:java -Dexec.mainClass="Agent"
```
## Step 4: Subscribe to streaming tokens
Create a subscriber that receives the streaming tokens from Ably and reconstructs the response in realtime.
In your `ably-anthropic-client` directory (or your Java client project), create a new file called `client.mjs` or `Client.java` with the following contents:
Add the following code to your iOS or macOS app:
### Client Javascript
```
import Ably from 'ably';

// Initialize Ably Realtime client
const realtime = new Ably.Realtime({ key: 'your-api-key' });

// Get the same channel used by the publisher
const channel = realtime.channels.get('ai:your-channel-name');

// Track responses by message serial
const responses = new Map();

// Subscribe to receive messages
await channel.subscribe((message) => {
  switch (message.action) {
    case 'message.create':
      // New response started
      console.log('\n[Response started]', message.serial);
      responses.set(message.serial, message.data);
      break;
    case 'message.append': {
      // Append token to existing response
      const current = responses.get(message.serial) || '';
      responses.set(message.serial, current + message.data);
      // Display token as it arrives
      process.stdout.write(message.data);
      break;
    }
    case 'message.update':
      // Replace entire response content
      responses.set(message.serial, message.data);
      console.log('\n[Response updated with full content]');
      break;
  }
});

console.log('Subscriber ready, waiting for tokens...');
```
### Client Swift
```
import Ably

// Initialize Ably Realtime client
let realtime = ARTRealtime(key: "your-api-key")

// Get the same channel used by the publisher
let channel = realtime.channels.get("ai:your-channel-name")

// Track responses by message serial
var responses: [String: String] = [:]

// Subscribe to receive messages and wait for the channel to attach
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
    channel.subscribe(attachCallback: { error in
        if let error {
            continuation.resume(throwing: error)
        } else {
            continuation.resume()
        }
    }, callback: { message in
        MainActor.assumeIsolated {
            guard let serial = message.serial else { return }
            guard let data = message.data as? String else { return }

            switch message.action {
            case .create:
                // New response started
                print("\n[Response started] \(serial)")
                responses[serial] = data
            case .append:
                // Append token to existing response
                let current = responses[serial] ?? ""
                responses[serial] = current + data
                // Display token as it arrives
                print(data, terminator: "")
            case .update:
                // Replace entire response content
                responses[serial] = data
                print("\n[Response updated with full content]")
            default:
                break
            }
        }
    })
}

print("Subscriber ready, waiting for tokens...")
```
### Client Java
```
import io.ably.lib.realtime.AblyRealtime;
import io.ably.lib.realtime.Channel;
import io.ably.lib.types.ClientOptions;
import io.ably.lib.types.Message;
import java.util.HashMap;
import java.util.Map;

public class Client {
    // Track responses by message serial
    private static final Map<String, String> responses = new HashMap<>();

    public static void main(String[] args) throws Exception {
        // Initialize Ably Realtime client
        ClientOptions options = new ClientOptions("your-api-key");
        AblyRealtime realtime = new AblyRealtime(options);

        // Get the same channel used by the publisher
        Channel channel = realtime.channels.get("ai:your-channel-name");

        // Subscribe to receive messages
        channel.subscribe(message -> {
            String serial = message.serial;
            if (serial == null) return;

            switch (message.action) {
                case MESSAGE_CREATE:
                    // New response started
                    System.out.println("\n[Response started] " + serial);
                    responses.put(serial, message.data != null ? message.data.toString() : "");
                    break;
                case MESSAGE_APPEND:
                    // Append token to existing response
                    String current = responses.getOrDefault(serial, "");
                    String token = message.data != null ? message.data.toString() : "";
                    responses.put(serial, current + token);
                    // Display token as it arrives
                    System.out.print(token);
                    break;
                case MESSAGE_UPDATE:
                    // Replace entire response content
                    responses.put(serial, message.data != null ? message.data.toString() : "");
                    System.out.println("\n[Response updated with full content]");
                    break;
            }
        });

        System.out.println("Subscriber ready, waiting for tokens...");
    }
}
```
Subscribers receive different message actions depending on when they join and how they're retrieving messages:
- `message.create`: Indicates a new response has started (i.e. a new message was created). The message `data` contains the initial content (often empty or the first token). Store this as the beginning of a new response using `serial` as the identifier.
- `message.append`: Contains a single token fragment to append. The message `data` contains only the new token, not the full concatenated response. Append this token to the existing response identified by `serial`.
- `message.update`: Contains the whole response up to that point. The message `data` contains the full concatenated text so far. Replace the entire response content with this data for the message identified by `serial`. This action occurs when the channel needs to resynchronize the full message state, such as after a client [resumes](https://ably.com/docs/connect/states.md#resume) from a transient disconnection.
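The action-handling logic above can be distilled into a small, transport-free reducer. This is an illustrative sketch only: the plain objects stand in for Ably messages, and `applyAction` is a hypothetical helper name, not part of the Ably SDK.

#### Javascript

```
// Hypothetical reducer: applies one message action to a map of responses keyed by serial
function applyAction(responses, message) {
  switch (message.action) {
    case 'message.create':
      // A new response begins with the initial (often empty) data
      responses.set(message.serial, message.data);
      break;
    case 'message.append':
      // Append only the new token to the existing response
      responses.set(message.serial, (responses.get(message.serial) || '') + message.data);
      break;
    case 'message.update':
      // Replace the whole response with the full concatenated text
      responses.set(message.serial, message.data);
      break;
  }
  return responses;
}

// Example event sequence for a single response (serial made up for illustration)
const responses = new Map();
[
  { action: 'message.create', serial: 'abc', data: '' },
  { action: 'message.append', serial: 'abc', data: 'Hello' },
  { action: 'message.append', serial: 'abc', data: ', world' },
  { action: 'message.update', serial: 'abc', data: 'Hello, world!' },
].forEach((m) => applyAction(responses, m));

console.log(responses.get('abc')); // 'Hello, world!'
```

Note that the reducer is idempotent with respect to `message.update`: a resynchronization can arrive at any point without corrupting the accumulated text.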
Run the subscriber in a separate terminal:
### Shell
```
cd ably-anthropic-client
node client.mjs
```
Build and run your iOS or macOS app in Xcode.
### Shell
```
mvn compile exec:java -Dexec.mainClass="Client"
```
With the subscriber running, run the publisher in another terminal. The tokens stream in realtime as the Anthropic model generates them.
## Step 5: Stream with multiple publishers and subscribers
Ably's [channel-oriented sessions](https://ably.com/docs/ai-transport/sessions-identity.md#connection-oriented-vs-channel-oriented-sessions) enable multiple AI agents to publish responses and multiple users to receive them on a single channel simultaneously. Ably handles message delivery to all participants, eliminating the need to implement routing logic or manage state synchronization across connections.
### Broadcasting to multiple subscribers
Each subscriber receives the complete stream of tokens independently, enabling you to build collaborative experiences or multi-device applications.
Run a subscriber in multiple separate terminals:
#### Shell
```
# Terminal 1
cd ably-anthropic-client && node client.mjs
# Terminal 2
cd ably-anthropic-client && node client.mjs
# Terminal 3
cd ably-anthropic-client && node client.mjs
```
#### Shell
```
# Terminal 1
mvn compile exec:java -Dexec.mainClass="Client"
# Terminal 2
mvn compile exec:java -Dexec.mainClass="Client"
# Terminal 3
mvn compile exec:java -Dexec.mainClass="Client"
```
Run multiple instances of your iOS or macOS app, or run on multiple devices/simulators.
All subscribers receive the same stream of tokens in realtime.
### Publishing concurrent responses
Multiple publishers can stream different responses concurrently on the same [channel](https://ably.com/docs/channels.md). Each response is a distinct message with its own unique `serial` identifier, so tokens from different responses are isolated to distinct messages and don't interfere with each other.
To demonstrate this, run a publisher in multiple separate terminals:
#### Shell
```
# Terminal 1
cd ably-anthropic-agent && node agent.mjs
# Terminal 2
cd ably-anthropic-agent && node agent.mjs
# Terminal 3
cd ably-anthropic-agent && node agent.mjs
```
#### Shell
```
# Terminal 1
cd ably-anthropic-agent && python agent.py
# Terminal 2
cd ably-anthropic-agent && python agent.py
# Terminal 3
cd ably-anthropic-agent && python agent.py
```
#### Shell
```
# Terminal 1
mvn compile exec:java -Dexec.mainClass="Agent"
# Terminal 2
mvn compile exec:java -Dexec.mainClass="Agent"
# Terminal 3
mvn compile exec:java -Dexec.mainClass="Agent"
```
All running subscribers receive tokens from all responses concurrently. Each subscriber correctly reconstructs each response separately using the `serial` to correlate tokens.
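The serial-based correlation described above can be sketched without a live connection. This standalone illustration interleaves tokens from two hypothetical responses (the serials and message objects are made up for the example) and shows that each response reassembles independently:

#### Javascript

```
// Track each in-flight response separately, keyed by its message serial
const responses = new Map();

function handle(message) {
  if (message.action === 'message.create') {
    responses.set(message.serial, message.data);
  } else if (message.action === 'message.append') {
    responses.set(message.serial, (responses.get(message.serial) || '') + message.data);
  }
}

// Tokens from two concurrent responses arrive interleaved on the same channel
[
  { action: 'message.create', serial: 'resp-1', data: '' },
  { action: 'message.create', serial: 'resp-2', data: '' },
  { action: 'message.append', serial: 'resp-1', data: 'First' },
  { action: 'message.append', serial: 'resp-2', data: 'Second' },
  { action: 'message.append', serial: 'resp-1', data: ' answer' },
  { action: 'message.append', serial: 'resp-2', data: ' answer' },
].forEach(handle);

console.log(responses.get('resp-1')); // 'First answer'
console.log(responses.get('resp-2')); // 'Second answer'
```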
## Step 6: Retrieve complete responses from history
One key advantage of the message-per-response pattern is that each complete AI response is stored as a single message in channel history. This makes it efficient to retrieve conversation history without processing thousands of individual token messages.
Use Ably's [rewind](https://ably.com/docs/channels/options/rewind.md) channel option to attach to the channel at some point in the recent past and automatically receive complete responses from history. Historical messages are delivered as `message.update` events containing the complete concatenated response, which then seamlessly transition to live `message.append` events for any ongoing responses.
Update your `client.mjs` file in the `ably-anthropic-client` directory to use the `rewind` option when getting the channel:
Update your subscriber code to use the `rewind` option when getting the channel:
Update your `Client.java` file to use the `rewind` option when getting the channel:
### Client Javascript
```
// Use rewind to receive recent historical messages
const channel = realtime.channels.get('ai:your-channel-name', {
  params: { rewind: '2m' } // Retrieve messages from the last 2 minutes
});

const responses = new Map();

await channel.subscribe((message) => {
  switch (message.action) {
    case 'message.create':
      responses.set(message.serial, message.data);
      break;
    case 'message.append': {
      const current = responses.get(message.serial) || '';
      responses.set(message.serial, current + message.data);
      process.stdout.write(message.data);
      break;
    }
    case 'message.update':
      // Historical messages contain full concatenated response
      responses.set(message.serial, message.data);
      console.log('\n[Historical response]:', message.data);
      break;
  }
});
```
### Client Swift
```
// Use rewind to receive recent historical messages
let channelOptions = ARTRealtimeChannelOptions()
channelOptions.params = ["rewind": "2m"] // Retrieve messages from the last 2 minutes
let channel = realtime.channels.get("ai:your-channel-name", options: channelOptions)

var responses: [String: String] = [:]

// Subscribe and wait for the channel to attach
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
    channel.subscribe(attachCallback: { error in
        if let error {
            continuation.resume(throwing: error)
        } else {
            continuation.resume()
        }
    }, callback: { message in
        MainActor.assumeIsolated {
            guard let serial = message.serial else { return }
            guard let data = message.data as? String else { return }

            switch message.action {
            case .create:
                responses[serial] = data
            case .append:
                let current = responses[serial] ?? ""
                responses[serial] = current + data
                print(data, terminator: "")
            case .update:
                // Historical messages contain full concatenated response
                responses[serial] = data
                print("\n[Historical response]: \(responses[serial] ?? "")")
            default:
                break
            }
        }
    })
}
```
### Client Java
```
import io.ably.lib.realtime.AblyRealtime;
import io.ably.lib.realtime.Channel;
import io.ably.lib.types.ClientOptions;
import io.ably.lib.types.ChannelOptions;
import java.util.HashMap;
import java.util.Map;

// Use rewind to receive recent historical messages
ClientOptions clientOptions = new ClientOptions("your-api-key");
AblyRealtime realtime = new AblyRealtime(clientOptions);

ChannelOptions channelOptions = new ChannelOptions();
Map<String, String> params = new HashMap<>();
params.put("rewind", "2m"); // Retrieve messages from the last 2 minutes
channelOptions.params = params;
Channel channel = realtime.channels.get("ai:your-channel-name", channelOptions);

Map<String, String> responses = new HashMap<>();

channel.subscribe(message -> {
    String serial = message.serial;
    if (serial == null) return;

    switch (message.action) {
        case MESSAGE_CREATE:
            responses.put(serial, message.data != null ? message.data.toString() : "");
            break;
        case MESSAGE_APPEND:
            String current = responses.getOrDefault(serial, "");
            String token = message.data != null ? message.data.toString() : "";
            responses.put(serial, current + token);
            System.out.print(token);
            break;
        case MESSAGE_UPDATE:
            // Historical messages contain full concatenated response
            responses.put(serial, message.data != null ? message.data.toString() : "");
            System.out.println("\n[Historical response]: " + responses.get(serial));
            break;
    }
});
```
## Next steps
- Learn more about the [message-per-response pattern](https://ably.com/docs/ai-transport/token-streaming/message-per-response.md) used in this guide
- Learn about [client hydration strategies](https://ably.com/docs/ai-transport/token-streaming/message-per-response.md#hydration) for handling late joiners and reconnections
- Understand [sessions and identity](https://ably.com/docs/ai-transport/sessions-identity.md) in AI enabled applications
- Explore the [message-per-token pattern](https://ably.com/docs/ai-transport/token-streaming/message-per-token.md) for explicit control over individual token messages
## Related Topics
- [Message per token](https://ably.com/docs/ai-transport/guides/anthropic/anthropic-message-per-token.md): Stream tokens from the Anthropic Messages API over Ably in realtime.
- [Human-in-the-loop](https://ably.com/docs/ai-transport/guides/anthropic/anthropic-human-in-the-loop.md): Implement human approval workflows for AI agent tool calls using Anthropic and Ably with role-based access control.
- [Citations](https://ably.com/docs/ai-transport/guides/anthropic/anthropic-citations.md): Attach source citations to AI responses from the Anthropic Messages API using Ably message annotations.
## Documentation Index
To discover additional Ably documentation:
1. Fetch [llms.txt](https://ably.com/llms.txt) for the canonical list of available pages.
2. Identify relevant URLs from that index.
3. Fetch target pages as needed.
Avoid using assumed or outdated documentation paths.