This guide shows you how to stream AI responses from LangGraph over Ably using the message-per-token pattern. Specifically, it implements the explicit start/stop events approach, which publishes each response token as an individual message, along with explicit lifecycle events to signal when responses begin and end.
Using Ably to distribute tokens from LangGraph enables you to broadcast AI responses to thousands of concurrent subscribers with reliable, ordered message delivery, ensuring that each client receives the complete response stream with every token in order. This approach also decouples AI inference from client connections, so you can scale agents independently and handle reconnections gracefully.
Prerequisites
To follow this guide, you need:
- Node.js 20 or higher
- An Anthropic API key
- An Ably API key
Create a new NPM package, which will contain the publisher and subscriber code:
mkdir ably-langgraph-example && cd ably-langgraph-example
npm init -y
Install the required packages using npm:
npm install @langchain/langgraph@^0.2 @langchain/anthropic@^0.3 @langchain/core@^0.3 ably@^2
Export your Anthropic API key to the environment, which will be used later in the guide by the Anthropic SDK:
export ANTHROPIC_API_KEY="your_api_key_here"
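You can also export your Ably API key now. The code samples later in this guide use a placeholder key ('demokey:*****') for brevity; in your own code you could read the key from the environment instead, for example via process.env.ABLY_API_KEY:
export ABLY_API_KEY="your_ably_api_key_here"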
Step 1: Get a streamed response from LangGraph
Initialize LangGraph with a simple graph that uses Claude to respond to prompts, and use stream with streamMode: "messages" to stream model tokens.
Create a new file publisher.mjs with the following contents:
import { ChatAnthropic } from "@langchain/anthropic";
import { StateGraph, Annotation, START, END } from "@langchain/langgraph";

// Initialize the model
const model = new ChatAnthropic({ model: "claude-sonnet-4-5" });

// Define state with message history
const StateAnnotation = Annotation.Root({
  messages: Annotation({
    reducer: (x, y) => x.concat(y),
    default: () => [],
  }),
});

// Build and compile a simple graph
const graph = new StateGraph(StateAnnotation)
  .addNode("agent", async (state) => {
    const response = await model.invoke(state.messages);
    return { messages: [response] };
  })
  .addEdge(START, "agent")
  .addEdge("agent", END);

const app = graph.compile();

// Stream response tokens
async function streamLangGraphResponse(prompt) {
  const stream = await app.stream(
    { messages: [{ role: "user", content: prompt }] },
    { streamMode: "messages" }
  );

  for await (const [messageChunk, metadata] of stream) {
    console.log(messageChunk.content || "(empty)");
  }
}

// Usage example
streamLangGraphResponse("Tell me a short joke");
Understand LangGraph streaming
LangGraph's stream method with streamMode: "messages" streams LLM tokens from your graph. The stream returns tuples of [messageChunk, metadata] where:
- messageChunk: Contains the token content in the content field. These represent incremental text chunks as the model generates them.
- metadata: Contains metadata about the stream, including the langgraph_node where the LLM is invoked and any associated tags (see the sketch below).
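As an illustration of how this metadata can be used, the following sketch streams only the tokens emitted by a named graph node. The streamTokensFromNode helper is hypothetical (it is not part of the guide's publisher.mjs) and assumes a compiled graph like the app created above:
// Hypothetical helper: stream only the tokens produced by a specific graph node.
// `app` is a compiled LangGraph graph, such as the one built in publisher.mjs.
async function streamTokensFromNode(app, prompt, nodeName) {
  const stream = await app.stream(
    { messages: [{ role: "user", content: prompt }] },
    { streamMode: "messages" }
  );

  for await (const [messageChunk, metadata] of stream) {
    // metadata.langgraph_node identifies the node that emitted this chunk
    if (metadata.langgraph_node === nodeName && messageChunk.content) {
      process.stdout.write(messageChunk.content);
    }
  }
}

// Example usage:
// await streamTokensFromNode(app, "Tell me a short joke", "agent");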
The following example shows the message chunks received when streaming a response. Each event is a tuple of [messageChunk, metadata]:
// 1. Stream initialization (empty content with model metadata)
[{"lc":1,"type":"constructor","id":["langchain_core","messages","AIMessageChunk"],"kwargs":{"content":"","additional_kwargs":{"model":"claude-sonnet-4-5-20250929","id":"msg_01SPbpi5P7CkNqgxPT2Ne9u5","type":"message","role":"assistant"},"tool_call_chunks":[],"usage_metadata":{"input_tokens":12,"output_tokens":1,"total_tokens":13},"id":"msg_01SPbpi5P7CkNqgxPT2Ne9u5"}},{"langgraph_step":1,"langgraph_node":"agent","langgraph_triggers":["branch:to:agent"]}]
// 2. Empty content chunk
[{"lc":1,"type":"constructor","id":["langchain_core","messages","AIMessageChunk"],"kwargs":{"content":"","additional_kwargs":{},"tool_call_chunks":[],"id":"msg_01SPbpi5P7CkNqgxPT2Ne9u5"}},{"langgraph_step":1,"langgraph_node":"agent"}]
// 3. Text tokens stream in
[{"lc":1,"type":"constructor","id":["langchain_core","messages","AIMessageChunk"],"kwargs":{"content":"Why","additional_kwargs":{},"tool_call_chunks":[],"id":"msg_01SPbpi5P7CkNqgxPT2Ne9u5"}},{"langgraph_step":1,"langgraph_node":"agent"}]
[{"lc":1,"type":"constructor","id":["langchain_core","messages","AIMessageChunk"],"kwargs":{"content":" don't scientists trust atoms?\n\nBecause","additional_kwargs":{},"tool_call_chunks":[],"id":"msg_01SPbpi5P7CkNqgxPT2Ne9u5"}},{"langgraph_step":1,"langgraph_node":"agent"}]
[{"lc":1,"type":"constructor","id":["langchain_core","messages","AIMessageChunk"],"kwargs":{"content":" they make up everything!","additional_kwargs":{},"tool_call_chunks":[],"id":"msg_01SPbpi5P7CkNqgxPT2Ne9u5"}},{"langgraph_step":1,"langgraph_node":"agent"}]
// 4. Stream completion (empty content with stop reason and final usage)
[{"lc":1,"type":"constructor","id":["langchain_core","messages","AIMessageChunk"],"kwargs":{"content":"","additional_kwargs":{"stop_reason":"end_turn","stop_sequence":null},"usage_metadata":{"input_tokens":0,"output_tokens":17,"total_tokens":17},"id":"msg_01SPbpi5P7CkNqgxPT2Ne9u5"}},{"langgraph_step":1,"langgraph_node":"agent"}]
Step 2: Publish streaming events to Ably
Publish the LangGraph streaming events to Ably to distribute them to subscribers reliably and at scale.
This implementation follows the explicit start/stop events pattern, which provides clear response boundaries.
Initialize the Ably client
Add the Ably client initialization to your publisher.mjs file:
import Ably from 'ably';

// Initialize Ably Realtime client
const realtime = new Ably.Realtime({
  key: 'demokey:*****',
  echoMessages: false
});

// Create a channel for publishing streamed AI responses
const channel = realtime.channels.get('map-cod-cog');
The Ably Realtime client maintains a persistent connection to the Ably service, which allows you to publish tokens at high message rates with low latency.
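The client manages this connection for you, so no extra code is needed for the rest of the guide. Optionally, you can listen for connection state changes on the same client, for example to log connectivity during development (this sketch is an optional addition, not something the guide's code requires):
// Optional: log connection state changes on the Realtime client created above
realtime.connection.on('connected', () => {
  console.log('Connected to Ably');
});

realtime.connection.on('disconnected', () => {
  console.log('Disconnected from Ably; the client will attempt to reconnect automatically');
});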
Map LangGraph streaming events to Ably messages
Choose how to map LangGraph streaming events to Ably messages. Any mapping strategy that suits your application's needs will work; this guide uses the following pattern as an example, illustrated below:
- start: Signals the beginning of a response
- token: Contains the incremental text content for each delta
- stop: Signals the completion of a response
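To make this mapping concrete, the sequence of Ably messages published for a single response looks roughly like the following. The responseId and token values are taken from the earlier example output and are illustrative only:
// Illustrative message sequence for one response (values are examples only)
const responseId = 'msg_01SPbpi5P7CkNqgxPT2Ne9u5';

const exampleSequence = [
  { name: 'start', extras: { headers: { responseId } } },
  { name: 'token', data: 'Why', extras: { headers: { responseId } } },
  { name: 'token', data: " don't scientists trust atoms?\n\nBecause", extras: { headers: { responseId } } },
  { name: 'token', data: ' they make up everything!', extras: { headers: { responseId } } },
  { name: 'stop', extras: { headers: { responseId } } },
];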
Now update the streamLangGraphResponse() function in your publisher.mjs file to publish the streaming tokens to Ably:
// Track response ID across events
let responseId = null;

// Create streaming response from LangGraph
async function streamLangGraphResponse(prompt) {
  const input = {
    messages: [{ role: "user", content: prompt }],
  };

  // Stream tokens using messages mode
  const stream = await app.stream(input, { streamMode: "messages" });

  for await (const [messageChunk, metadata] of stream) {
    // Capture response ID from the first message chunk
    if (!responseId && messageChunk?.id) {
      responseId = messageChunk.id;

      // Publish start event with response ID
      channel.publish({
        name: 'start',
        extras: {
          headers: { responseId }
        }
      });
    }

    // Extract token content
    const content = messageChunk?.content;
    if (content) {
      channel.publish({
        name: 'token',
        data: content,
        extras: {
          headers: { responseId }
        }
      });
    }
  }

  // Publish stop event
  channel.publish({
    name: 'stop',
    extras: {
      headers: { responseId }
    }
  });
}
This implementation:
- Captures the responseId from the first message chunk's id field
- Publishes a start event when the response ID is captured
- Streams tokens from the graph using streamMode: "messages"
- Extracts the content from each message chunk and publishes it as a token event
- Publishes a stop event when streaming completes
- All published events include the responseId in message extras to allow the client to correlate events relating to a particular response
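Note that channel.publish() returns a promise, and the open Realtime connection keeps the Node process alive after streaming completes. If you want publisher.mjs to exit once the response has been published, one optional approach (not part of the guide's code) is to await the final stop publish and then close the connection:
// Optional: await the final publish, then close the connection so the
// Node process can exit once streaming has finished.
await channel.publish({
  name: 'stop',
  extras: {
    headers: { responseId }
  }
});

realtime.close();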
Run the publisher to see tokens streaming to Ably:
node publisher.mjs
Step 3: Subscribe to streaming tokens
Create a subscriber that receives the streaming events from Ably and reconstructs the response.
Create a new file subscriber.mjs with the following contents:
import Ably from 'ably';

// Initialize Ably Realtime client
const realtime = new Ably.Realtime({ key: 'demokey:*****' });

// Get the same channel used by the publisher
const channel = realtime.channels.get('map-cod-cog');

// Track responses by ID
const responses = new Map();

// Handle response start
await channel.subscribe('start', (message) => {
  const responseId = message.extras?.headers?.responseId;
  console.log('\n[Response started]', responseId);
  responses.set(responseId, '');
});

// Handle tokens
await channel.subscribe('token', (message) => {
  const responseId = message.extras?.headers?.responseId;
  const token = message.data;

  // Append token to response
  const currentText = responses.get(responseId) || '';
  responses.set(responseId, currentText + token);

  // Display token as it arrives
  process.stdout.write(token);
});

// Handle response stop
await channel.subscribe('stop', (message) => {
  const responseId = message.extras?.headers?.responseId;
  const finalText = responses.get(responseId);
  console.log('\n[Response completed]', responseId);
});

console.log('Subscriber ready, waiting for tokens...');
Run the subscriber in a separate terminal:
node subscriber.mjs
With the subscriber running, run the publisher in another terminal. The tokens stream in realtime as the AI model generates them.
Step 4: Stream with multiple publishers and subscribers
Ably's channel-oriented sessions enable multiple AI agents to publish responses and multiple users to receive them on a single channel simultaneously. Ably handles message delivery to all participants, eliminating the need to implement routing logic or manage state synchronization across connections.
Broadcasting to multiple subscribers
Each subscriber receives the complete stream of tokens independently, enabling you to build collaborative experiences or multi-device applications.
Run a subscriber in multiple separate terminals:
# Terminal 1
node subscriber.mjs
# Terminal 2
node subscriber.mjs
# Terminal 3
node subscriber.mjs
All subscribers receive the same stream of tokens in realtime.
Publishing concurrent responses
The implementation uses responseId in message extras to correlate tokens with their originating response. This enables multiple publishers to stream different responses concurrently on the same channel, with each subscriber correctly tracking all responses independently.
To demonstrate this, run a publisher in multiple separate terminals:
# Terminal 1
node publisher.mjs
# Terminal 2
node publisher.mjs
# Terminal 3
node publisher.mjs
All running subscribers receive tokens from all responses concurrently. Each subscriber correctly reconstructs each response separately using the responseId to correlate tokens.
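As an optional refinement that the guide's code does not include, each publisher could set an Ably clientId when creating its Realtime client. Subscribers then also see which agent published each message via message.clientId (the 'agent-1' value below is illustrative, and channel refers to the subscriber's existing channel):
// Publisher side: identify this agent with a clientId
const agentRealtime = new Ably.Realtime({
  key: 'demokey:*****',
  clientId: 'agent-1',
  echoMessages: false
});
const agentChannel = agentRealtime.channels.get('map-cod-cog');

// Subscriber side: message.clientId identifies which agent published each token
await channel.subscribe('token', (message) => {
  console.log(`token from ${message.clientId}:`, message.data);
});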
Next steps
- Learn more about the message-per-token pattern used in this guide
- Learn about client hydration strategies for handling late joiners and reconnections
- Understand sessions and identity in AI-enabled applications
- Explore the message-per-response pattern for storing complete AI responses as single messages in history