# Guide: Stream Anthropic responses using the message-per-token pattern

This guide shows you how to stream AI responses from Anthropic's [Messages API](https://docs.anthropic.com/en/api/messages) over Ably using the [message-per-token pattern](https://ably.com/docs/ai-transport/token-streaming/message-per-token.md). Specifically, it implements the [explicit start/stop events approach](https://ably.com/docs/ai-transport/token-streaming/message-per-token.md#explicit-events), which publishes each response token as an individual message, along with explicit lifecycle events that signal when responses begin and end.

Using Ably to distribute tokens from the Anthropic SDK enables you to broadcast AI responses to thousands of concurrent subscribers with reliable message delivery and ordering guarantees, ensuring that each client receives the complete response stream with all tokens delivered in order. This approach decouples AI inference from client connections, enabling you to scale agents independently and handle reconnections gracefully.

## Prerequisites

Depending on the language you are following along in, you need one of:

- Node.js 20 or higher
- Python 3.8 or higher
- Java 8 or higher
- Xcode 15 or higher

You also need:

- An Anthropic API key
- An Ably API key

Useful links:

- [Anthropic API documentation](https://docs.anthropic.com/en/api)
- [Token streaming overview](https://ably.com/docs/ai-transport/token-streaming.md)
- [AI Transport overview](https://ably.com/docs/ai-transport.md)

### Agent setup

Create a new Node project for the agent code:

#### Shell

```
mkdir ably-anthropic-agent && cd ably-anthropic-agent
npm init -y
npm install @anthropic-ai/sdk ably
```

For Python, create a new directory and install the required packages:

#### Shell

```
mkdir ably-anthropic-agent && cd ably-anthropic-agent
pip install anthropic ably
```

For Java, create a new project and add the required dependencies.
For Maven, add to your `pom.xml`:

#### Xml

```
<dependency>
    <groupId>com.anthropic</groupId>
    <artifactId>anthropic-java</artifactId>
    <version>2.15.0</version>
</dependency>
<dependency>
    <groupId>io.ably</groupId>
    <artifactId>ably-java</artifactId>
    <version>1.6.1</version>
</dependency>
```

For Gradle, add to your `build.gradle`:

#### Text

```
dependencies {
    implementation 'com.anthropic:anthropic-java:2.15.0'
    implementation 'io.ably:ably-java:1.6.1'
}
```

Export your Anthropic API key to the environment:

#### Shell

```
export ANTHROPIC_API_KEY="your_api_key_here"
```

### Client setup

Create a new Node project for the client code, or use the same project as the agent if both are JavaScript:

#### Shell

```
mkdir ably-anthropic-client && cd ably-anthropic-client
npm init -y
npm install ably
```

For iOS or macOS, add the Ably SDK to your project using Swift Package Manager. In Xcode, go to File > Add Package Dependencies and add:

#### Text

```
https://github.com/ably/ably-cocoa
```

Or add it to your `Package.swift`:

#### Client Swift

```
dependencies: [
    .package(url: "https://github.com/ably/ably-cocoa", from: "1.2.0")
]
```

For Java, add the Ably Java SDK to your `pom.xml`:

#### Xml

```
<dependency>
    <groupId>io.ably</groupId>
    <artifactId>ably-java</artifactId>
    <version>1.6.1</version>
</dependency>
```

For Gradle, add to your `build.gradle`:

#### Text

```
implementation 'io.ably:ably-java:1.6.1'
```

## Step 1: Get a streamed response from Anthropic

Initialize an Anthropic client and use the [Messages API](https://docs.anthropic.com/en/api/messages) to stream model output as a series of events.
In your `ably-anthropic-agent` directory, create a new file called `agent.mjs` (JavaScript) or `agent.py` (Python) with the following contents. For Java, create a new file called `Agent.java` in your agent project:

### Agent Javascript

```
import Anthropic from '@anthropic-ai/sdk';

// Initialize Anthropic client
const anthropic = new Anthropic();

// Process each streaming event
function processEvent(event) {
  console.log(JSON.stringify(event)); // This function is updated in the next sections
}

// Create streaming response from Anthropic
async function streamAnthropicResponse(prompt) {
  const stream = await anthropic.messages.create({
    model: "claude-sonnet-4-5",
    max_tokens: 1024,
    messages: [{ role: "user", content: prompt }],
    stream: true,
  });

  // Iterate through streaming events
  for await (const event of stream) {
    processEvent(event);
  }
}

// Usage example
streamAnthropicResponse("Tell me a short joke");
```

### Agent Python

```
import asyncio
import anthropic

# Initialize Anthropic client
client = anthropic.AsyncAnthropic()

# Process each streaming event
async def process_event(event):
    print(event)  # This function is updated in the next sections

# Create streaming response from Anthropic
async def stream_anthropic_response(prompt: str):
    async with client.messages.stream(
        model="claude-sonnet-4-5",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        async for event in stream:
            await process_event(event)

# Usage example
asyncio.run(stream_anthropic_response("Tell me a short joke"))
```

### Agent Java

```
import com.anthropic.client.AnthropicClient;
import com.anthropic.client.okhttp.AnthropicOkHttpClient;
import com.anthropic.core.http.StreamResponse;
import com.anthropic.models.messages.*;

public class Agent {
    // Initialize Anthropic client
    private static final AnthropicClient client = AnthropicOkHttpClient.fromEnv();

    // Process each streaming event
    private static void processEvent(RawMessageStreamEvent event) {
        System.out.println(event); // This method is updated in the next sections
    }

    // Create streaming response from Anthropic
    public static void streamAnthropicResponse(String prompt) {
        MessageCreateParams params = MessageCreateParams.builder()
            .model(Model.CLAUDE_SONNET_4_5)
            .maxTokens(1024)
            .addUserMessage(prompt)
            .build();

        try (StreamResponse<RawMessageStreamEvent> stream = client.messages().createStreaming(params)) {
            stream.stream().forEach(Agent::processEvent);
        }
    }

    public static void main(String[] args) {
        streamAnthropicResponse("Tell me a short joke");
    }
}
```

### Understand Anthropic streaming events

Anthropic's Messages API [streams](https://docs.anthropic.com/en/api/messages-streaming) model output as a series of events when you set `stream: true`. Each streamed event includes a `type` property that describes the event type. A complete text response can be constructed from the following event types:

- [`message_start`](https://platform.claude.com/docs/en/build-with-claude/streaming#event-types): Signals the start of a response. Contains a `message` object with an `id` to correlate subsequent events.
- [`content_block_start`](https://platform.claude.com/docs/en/build-with-claude/streaming#event-types): Indicates the start of a new content block. For text responses, the `content_block` has `type: "text"`; other types may appear, such as `"thinking"` for internal reasoning tokens. The `index` indicates the position of this block in the message's `content` array.
- [`content_block_delta`](https://platform.claude.com/docs/en/build-with-claude/streaming#content-block-delta-types): Contains a single text delta in the `delta.text` field. If `delta.type === "text_delta"`, the delta contains model response text; other types may appear, such as `"thinking_delta"` for internal reasoning tokens. Use the `index` to correlate deltas relating to a specific content block.
- [`content_block_stop`](https://platform.claude.com/docs/en/build-with-claude/streaming#event-types): Signals completion of a content block.
  Contains the `index` that identifies the content block.
- [`message_delta`](https://platform.claude.com/docs/en/build-with-claude/streaming#event-types): Contains additional message-level metadata that may be streamed incrementally. Includes a [`delta.stop_reason`](https://platform.claude.com/docs/en/build-with-claude/handling-stop-reasons) that indicates why the model stopped generating its response.
- [`message_stop`](https://platform.claude.com/docs/en/build-with-claude/streaming#event-types): Signals the end of the response.

The following example shows the event sequence received when streaming a response:

#### Json

```
// 1. Message starts
{"type":"message_start","message":{"model":"claude-sonnet-4-5-20250929","id":"msg_016hhjrqVK4rCZ2uEGdyWfmt","type":"message","role":"assistant","content":[],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":12,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"cache_creation":{"ephemeral_5m_input_tokens":0,"ephemeral_1h_input_tokens":0},"output_tokens":1,"service_tier":"standard"}}}

// 2. Content block starts
{"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}

// 3. Text tokens stream in as delta events
{"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Why"}}
{"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" don't scientists trust atoms?\n\nBecause"}}
{"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" they make up everything!"}}

// 4. Content block completes
{"type":"content_block_stop","index":0}

// 5. Message delta (stop reason and usage stats)
{"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"input_tokens":12,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"output_tokens":17}}

// 6. Message completes
{"type":"message_stop"}
```

## Step 2: Publish streaming events to Ably

Publish Anthropic streaming events to Ably to distribute them to subscribers reliably and at scale. This implementation follows the [explicit start/stop events pattern](https://ably.com/docs/ai-transport/token-streaming/message-per-token.md#explicit-events), which provides clear response boundaries.

### Initialize the Ably client

Add the Ably client initialization to your agent file:

#### Agent Javascript

```
import Ably from 'ably';

// Initialize Ably Realtime client
const realtime = new Ably.Realtime({ key: 'your-api-key', echoMessages: false });

// Create a channel for publishing streamed AI responses
const channel = realtime.channels.get('your-channel-name');
```

#### Agent Python

```
from ably import AblyRealtime

# Initialize Ably Realtime client
realtime = AblyRealtime(key='your-api-key', transport_params={'echo': 'false'})

# Create a channel for publishing streamed AI responses
channel = realtime.channels.get('your-channel-name')
```

#### Agent Java

```
import io.ably.lib.realtime.AblyRealtime;
import io.ably.lib.realtime.Channel;
import io.ably.lib.types.ClientOptions;

// Initialize Ably Realtime client
ClientOptions options = new ClientOptions("your-api-key");
options.echoMessages = false;
AblyRealtime realtime = new AblyRealtime(options);

// Create a channel for publishing streamed AI responses
Channel channel = realtime.channels.get("your-channel-name");
```

The Ably Realtime client maintains a persistent connection to the Ably service, which allows you to publish tokens at high message rates with low latency.

### Map Anthropic streaming events to Ably messages

Choose how to map [Anthropic streaming events](#understand-anthropic-streaming-events) to Ably [messages](https://ably.com/docs/messages.md). You can choose any mapping strategy that suits your application's needs.
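Whatever strategy you pick, it helps to think of it as a pure function from Anthropic events to Ably message payloads. As an illustrative sketch (not the mapping this guide adopts), the simplest possible strategy forwards every event verbatim, reusing the Anthropic event type as the Ably message name:

```javascript
// Sketch: forward every Anthropic streaming event verbatim.
// The Ably message name is the Anthropic event type, and the
// serialized event is the payload. Illustrative only.
function mapEventVerbatim(event) {
  return { name: event.type, data: JSON.stringify(event) };
}

// A text delta maps to a message named 'content_block_delta'
const msg = mapEventVerbatim({
  type: 'content_block_delta',
  index: 0,
  delta: { type: 'text_delta', text: 'Hi' },
});
```

Forwarding everything is simple but pushes filtering work onto every subscriber, which is why a more selective mapping is usually preferable.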
This guide uses the following pattern as an example:

- `start`: Signals the beginning of a response
- `token`: Contains the incremental text content for each delta
- `stop`: Signals the completion of a response

Update the `processEvent()` function in your agent file to publish these events to Ably:

#### Agent Javascript

```
// Track state across events
let responseId = null;

// Process each streaming event and publish to Ably
function processEvent(event) {
  switch (event.type) {
    case 'message_start':
      // Capture message ID when response starts
      responseId = event.message.id;
      // Publish start event
      channel.publish({
        name: 'start',
        extras: { headers: { responseId } }
      });
      break;
    case 'content_block_delta':
      // Publish tokens from text deltas only
      if (event.delta.type === 'text_delta') {
        channel.publish({
          name: 'token',
          data: event.delta.text,
          extras: { headers: { responseId } }
        });
      }
      break;
    case 'message_stop':
      // Publish stop event when response completes
      channel.publish({
        name: 'stop',
        extras: { headers: { responseId } }
      });
      break;
  }
}
```

#### Agent Python

```
from ably.types.message import Message

# Track state across events
response_id = None

# Process each streaming event and publish to Ably
async def process_event(event):
    global response_id

    if event.type == 'message_start':
        # Capture message ID when response starts
        response_id = event.message.id
        # Publish start event
        await channel.publish(Message(
            name='start',
            extras={'headers': {'responseId': response_id}}
        ))
    elif event.type == 'content_block_delta':
        # Publish tokens from text deltas only
        if event.delta.type == 'text_delta':
            await channel.publish(Message(
                name='token',
                data=event.delta.text,
                extras={'headers': {'responseId': response_id}}
            ))
    elif event.type == 'message_stop':
        # Publish stop event when response completes
        await channel.publish(Message(
            name='stop',
            extras={'headers': {'responseId': response_id}}
        ))
```

#### Agent Java

```
import io.ably.lib.types.Message;
import io.ably.lib.types.MessageExtras;
import com.google.gson.JsonObject;

// Track state across events
private static String responseId = null;

// Process each streaming event and publish to Ably
private static void processEvent(RawMessageStreamEvent event) {
    if (event.isMessageStart()) {
        // Capture message ID when response starts
        responseId = event.asMessageStart().message().id();
        // Publish start event
        JsonObject headers = new JsonObject();
        headers.addProperty("responseId", responseId);
        JsonObject extras = new JsonObject();
        extras.add("headers", headers);
        channel.publish(new Message("start", null, new MessageExtras(extras)));
    } else if (event.isContentBlockDelta()) {
        // Publish tokens from text deltas only
        ContentBlockDeltaEvent delta = event.asContentBlockDelta();
        if (delta.delta().isText()) {
            String text = delta.delta().asText().text();
            JsonObject headers = new JsonObject();
            headers.addProperty("responseId", responseId);
            JsonObject extras = new JsonObject();
            extras.add("headers", headers);
            channel.publish(new Message("token", text, new MessageExtras(extras)));
        }
    } else if (event.isMessageStop()) {
        // Publish stop event when response completes
        JsonObject headers = new JsonObject();
        headers.addProperty("responseId", responseId);
        JsonObject extras = new JsonObject();
        extras.add("headers", headers);
        channel.publish(new Message("stop", null, new MessageExtras(extras)));
    }
}
```

This implementation:

- Publishes a `start` event when the response begins
- Filters for `content_block_delta` events with `text_delta` type and publishes them as `token` events
- Publishes a `stop` event when the response completes
- Includes the `responseId` in the [`extras`](https://ably.com/docs/messages.md#properties) of every published message, so that clients can correlate events relating to a particular response

Run the publisher to see tokens streaming to Ably:

#### Shell

```
cd ably-anthropic-agent
node agent.mjs
```

#### Shell

```
cd ably-anthropic-agent
python agent.py
```

#### Shell

```
mvn compile exec:java -Dexec.mainClass="Agent"
```

## Step 3: Subscribe to streaming tokens

Create a subscriber that receives the streaming events from Ably and reconstructs the response.

In your `ably-anthropic-client` directory, create a new file called `client.mjs` (JavaScript), or create `Client.java` in your client project (Java), with the following contents. For iOS or macOS, add the following code to your app:

### Client Javascript

```
import Ably from 'ably';

// Initialize Ably Realtime client
const realtime = new Ably.Realtime({ key: 'your-api-key' });

// Get the same channel used by the publisher
const channel = realtime.channels.get('your-channel-name');

// Track responses by ID
const responses = new Map();

// Handle response start
await channel.subscribe('start', (message) => {
  const responseId = message.extras?.headers?.responseId;
  console.log('\n[Response started]', responseId);
  responses.set(responseId, '');
});

// Handle tokens
await channel.subscribe('token', (message) => {
  const responseId = message.extras?.headers?.responseId;
  const token = message.data;

  // Append token to response
  const currentText = responses.get(responseId) || '';
  responses.set(responseId, currentText + token);

  // Display token as it arrives
  process.stdout.write(token);
});

// Handle response stop
await channel.subscribe('stop', (message) => {
  const responseId = message.extras?.headers?.responseId;
  const finalText = responses.get(responseId);
  console.log('\n[Response completed]', responseId);
});

console.log('Subscriber ready, waiting for tokens...');
```

### Client Swift

```
import Ably

// Initialize Ably Realtime client
let realtime = ARTRealtime(key: "your-api-key")

// Get the same channel used by the publisher
let channel = realtime.channels.get("your-channel-name")

// Track responses by ID
var responses: [String: String] = [:]

// Subscribe to all events and handle by message name
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
    channel.subscribe(attachCallback: { error in
        if let error {
            continuation.resume(throwing: error)
        } else {
            continuation.resume()
        }
    }) { message in
        MainActor.assumeIsolated {
            guard let extras = (try? message.extras?.toJSON()) as? [String: Any],
                  let headers = extras["headers"] as? [String: Any],
                  let responseID = headers["responseId"] as? String else { return }

            switch message.name {
            case "start":
                // Handle response start
                print("\n[Response started] \(responseID)")
                responses[responseID] = ""
            case "token":
                // Handle tokens
                guard let token = message.data as? String else { return }

                // Append token to response
                let currentText = responses[responseID] ?? ""
                responses[responseID] = currentText + token

                // Display token as it arrives
                print(token, terminator: "")
            case "stop":
                // Handle response stop
                let finalText = responses[responseID]
                print("\n[Response completed] \(responseID)")
            default:
                break
            }
        }
    }
}

print("Subscriber ready, waiting for tokens...")
```

### Client Java

```
import io.ably.lib.realtime.AblyRealtime;
import io.ably.lib.realtime.Channel;
import io.ably.lib.types.ClientOptions;
import com.google.gson.JsonObject;
import java.util.HashMap;
import java.util.Map;

public class Client {
    // Track responses by ID
    private static final Map<String, String> responses = new HashMap<>();

    public static void main(String[] args) throws Exception {
        // Initialize Ably Realtime client
        ClientOptions options = new ClientOptions("your-api-key");
        AblyRealtime realtime = new AblyRealtime(options);

        // Get the same channel used by the publisher
        Channel channel = realtime.channels.get("your-channel-name");

        // Handle response start
        channel.subscribe("start", message -> {
            JsonObject headers = message.extras.asJsonObject().get("headers").getAsJsonObject();
            String responseId = headers.get("responseId").getAsString();
            System.out.println("\n[Response started] " + responseId);
            responses.put(responseId, "");
        });

        // Handle tokens
        channel.subscribe("token", message -> {
            JsonObject headers = message.extras.asJsonObject().get("headers").getAsJsonObject();
            String responseId = headers.get("responseId").getAsString();
            String token = message.data != null ? message.data.toString() : "";

            // Append token to response
            String currentText = responses.getOrDefault(responseId, "");
            responses.put(responseId, currentText + token);

            // Display token as it arrives
            System.out.print(token);
        });

        // Handle response stop
        channel.subscribe("stop", message -> {
            JsonObject headers = message.extras.asJsonObject().get("headers").getAsJsonObject();
            String responseId = headers.get("responseId").getAsString();
            String finalText = responses.get(responseId);
            System.out.println("\n[Response completed] " + responseId);
        });

        System.out.println("Subscriber ready, waiting for tokens...");
    }
}
```

Run the subscriber in a separate terminal:

### Shell

```
cd ably-anthropic-client
node client.mjs
```

For iOS or macOS, build and run your app in Xcode.

### Shell

```
mvn compile exec:java -Dexec.mainClass="Client"
```

With the subscriber running, run the publisher in another terminal. The tokens stream in realtime as the Anthropic model generates them.

## Step 4: Stream with multiple publishers and subscribers

Ably's [channel-oriented sessions](https://ably.com/docs/ai-transport/sessions-identity.md#connection-oriented-vs-channel-oriented-sessions) enable multiple AI agents to publish responses and multiple users to receive them on a single channel simultaneously. Ably handles message delivery to all participants, eliminating the need to implement routing logic or manage state synchronization across connections.

### Broadcasting to multiple subscribers

Each subscriber receives the complete stream of tokens independently, enabling you to build collaborative experiences or multi-device applications.
Run a subscriber in multiple separate terminals:

#### Shell

```
# Terminal 1
cd ably-anthropic-client && node client.mjs

# Terminal 2
cd ably-anthropic-client && node client.mjs

# Terminal 3
cd ably-anthropic-client && node client.mjs
```

#### Shell

```
# Terminal 1
mvn compile exec:java -Dexec.mainClass="Client"

# Terminal 2
mvn compile exec:java -Dexec.mainClass="Client"

# Terminal 3
mvn compile exec:java -Dexec.mainClass="Client"
```

For iOS or macOS, run multiple instances of your app, or run it on multiple devices or simulators.

All subscribers receive the same stream of tokens in realtime.

### Publishing concurrent responses

The implementation uses the `responseId` in message [`extras`](https://ably.com/docs/messages.md#properties) to correlate tokens with their originating response. This enables multiple publishers to stream different responses concurrently on the same [channel](https://ably.com/docs/channels.md), with each subscriber correctly tracking all responses independently.

To demonstrate this, run a publisher in multiple separate terminals:

#### Shell

```
# Terminal 1
cd ably-anthropic-agent && node agent.mjs

# Terminal 2
cd ably-anthropic-agent && node agent.mjs

# Terminal 3
cd ably-anthropic-agent && node agent.mjs
```

#### Shell

```
# Terminal 1
cd ably-anthropic-agent && python agent.py

# Terminal 2
cd ably-anthropic-agent && python agent.py

# Terminal 3
cd ably-anthropic-agent && python agent.py
```

#### Shell

```
# Terminal 1
mvn compile exec:java -Dexec.mainClass="Agent"

# Terminal 2
mvn compile exec:java -Dexec.mainClass="Agent"

# Terminal 3
mvn compile exec:java -Dexec.mainClass="Agent"
```

All running subscribers receive tokens from all responses concurrently. Each subscriber reconstructs each response separately, using the `responseId` to correlate tokens.
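The per-`responseId` bookkeeping that lets a single subscriber track several interleaved responses can be sketched as a small pure function, independent of any SDK (names here are illustrative, mirroring the subscriber logic in Step 3):

```javascript
// Apply one start/token/stop event to a map of in-flight responses,
// keyed by responseId. 'start' resets the buffer, 'token' appends,
// and 'stop' marks the buffer complete without modifying it.
function applyEvent(responses, name, responseId, data) {
  switch (name) {
    case 'start':
      responses.set(responseId, '');
      break;
    case 'token':
      responses.set(responseId, (responses.get(responseId) || '') + data);
      break;
    case 'stop':
      // Accumulated text is now complete; persist it here if needed.
      break;
  }
  return responses;
}

// Two interleaved responses accumulate independently
const responses = new Map();
applyEvent(responses, 'start', 'msg_a');
applyEvent(responses, 'start', 'msg_b');
applyEvent(responses, 'token', 'msg_a', 'Hello');
applyEvent(responses, 'token', 'msg_b', 'Hi');
applyEvent(responses, 'token', 'msg_a', ' world');
applyEvent(responses, 'stop', 'msg_a');
// responses.get('msg_a') → 'Hello world'; responses.get('msg_b') → 'Hi'
```

Because each event carries its own `responseId`, interleaving order between responses never matters; only the per-response token order, which Ably preserves, affects the reconstructed text.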
## Next steps

- Learn more about the [message-per-token pattern](https://ably.com/docs/ai-transport/token-streaming/message-per-token.md) used in this guide
- Learn about [client hydration strategies](https://ably.com/docs/ai-transport/token-streaming/message-per-token.md#hydration) for handling late joiners and reconnections
- Understand [sessions and identity](https://ably.com/docs/ai-transport/sessions-identity.md) in AI-enabled applications
- Explore the [message-per-response pattern](https://ably.com/docs/ai-transport/token-streaming/message-per-response.md) for storing complete AI responses as single messages in history

## Related topics

- [Message per response](https://ably.com/docs/ai-transport/guides/anthropic/anthropic-message-per-response.md): Stream tokens from the Anthropic Messages API over Ably in realtime using message appends.
- [Human-in-the-loop](https://ably.com/docs/ai-transport/guides/anthropic/anthropic-human-in-the-loop.md): Implement human approval workflows for AI agent tool calls using Anthropic and Ably with role-based access control.
- [Citations](https://ably.com/docs/ai-transport/guides/anthropic/anthropic-citations.md): Attach source citations to AI responses from the Anthropic Messages API using Ably message annotations.

## Documentation index

To discover additional Ably documentation:

1. Fetch [llms.txt](https://ably.com/llms.txt) for the canonical list of available pages.
2. Identify relevant URLs from that index.
3. Fetch target pages as needed.

Avoid using assumed or outdated documentation paths.