This guide shows you how to stream AI responses from LangGraph over Ably using the message-per-response pattern. Specifically, it appends each response token to a single Ably message, creating a complete AI response that grows incrementally while delivering tokens in realtime.
Using Ably to distribute tokens from LangGraph enables you to broadcast AI responses to thousands of concurrent subscribers with reliable message delivery and ordering guarantees. This approach stores each complete response as a single message in channel history, making it easy to retrieve conversation history without processing thousands of individual token messages.
Prerequisites
To follow this guide, you need:
- Node.js 20 or higher
- An Anthropic API key
- An Ably API key
Create a new NPM package, which will contain the publisher and subscriber code:
mkdir ably-langgraph-example-per-response && cd ably-langgraph-example-per-response
npm init -y

Install the required packages using NPM:

npm install @langchain/langgraph@^0.2 @langchain/anthropic@^0.3 @langchain/core@^0.3 ably@^2

Export your Anthropic API key to the environment, which will be used later in the guide by the Anthropic SDK:

export ANTHROPIC_API_KEY="your_api_key_here"

Step 1: Enable message appends
Message append functionality requires "Message annotations, updates, deletes and appends" to be enabled in a channel rule associated with the channel.
To enable the channel rule:
- Go to the Ably dashboard and select your app.
- Navigate to the "Configuration" > "Rules" section from the left-hand navigation bar.
- Choose "Add new rule".
- Enter a channel name or namespace pattern (e.g. ai for all channels starting with ai:).
- Select the "Message annotations, updates, deletes and appends" option from the list.
- Click "Create channel rule".
The examples in this guide use the ai: namespace prefix, which assumes you have configured the rule for ai:*.
Step 2: Get a streamed response from LangGraph
Initialize LangGraph with a simple graph that uses Claude to respond to prompts, and use stream with streamMode: "messages" to stream model tokens.
Create a new file publisher.mjs with the following contents:
import { ChatAnthropic } from "@langchain/anthropic";
import { StateGraph, Annotation, START, END } from "@langchain/langgraph";

// Initialize the model
const model = new ChatAnthropic({ model: "claude-sonnet-4-5" });

// Define state with message history
const StateAnnotation = Annotation.Root({
  messages: Annotation({
    reducer: (x, y) => x.concat(y),
    default: () => [],
  }),
});

// Build and compile a simple graph
const graph = new StateGraph(StateAnnotation)
  .addNode("agent", async (state) => {
    const response = await model.invoke(state.messages);
    return { messages: [response] };
  })
  .addEdge(START, "agent")
  .addEdge("agent", END);

const app = graph.compile();

// Stream response tokens
async function streamLangGraphResponse(prompt) {
  const stream = await app.stream(
    { messages: [{ role: "user", content: prompt }] },
    { streamMode: "messages" }
  );

  for await (const [messageChunk, metadata] of stream) {
    console.log(messageChunk.content || "(empty)");
  }
}

// Usage example
streamLangGraphResponse("Tell me a short joke");

Understand LangGraph streaming
LangGraph's stream method with streamMode: "messages" streams LLM tokens from your graph. The stream returns tuples of [messageChunk, metadata] where:
- messageChunk: Contains the token content in the content field. These represent incremental text chunks as the model generates them. The message chunk also includes an id field that uniquely identifies the response.
- metadata: Contains metadata about the stream, including the langgraph_node where the LLM is invoked and any associated tags (a filtering sketch follows this list).
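For example, if your graph later contains more than one node that calls a model, you can use this metadata to forward only the tokens you care about. A minimal sketch of the streaming loop, assuming the single "agent" node from the graph above:

for await (const [messageChunk, metadata] of stream) {
  // Skip chunks emitted by any node other than "agent"
  if (metadata.langgraph_node !== "agent") continue;

  // Print only non-empty token content
  if (messageChunk.content) {
    process.stdout.write(messageChunk.content);
  }
}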
The following example shows the message chunks received when streaming a response. Each event is a tuple of [messageChunk, metadata]:
// 1. Stream initialization (empty content with model metadata)
[{"lc":1,"type":"constructor","id":["langchain_core","messages","AIMessageChunk"],"kwargs":{"content":"","additional_kwargs":{"model":"claude-sonnet-4-5-20250929","id":"msg_01SPbpi5P7CkNqgxPT2Ne9u5","type":"message","role":"assistant"},"tool_call_chunks":[],"usage_metadata":{"input_tokens":12,"output_tokens":1,"total_tokens":13},"id":"msg_01SPbpi5P7CkNqgxPT2Ne9u5"}},{"langgraph_step":1,"langgraph_node":"agent","langgraph_triggers":["branch:to:agent"]}]
// 2. Empty content chunk
[{"lc":1,"type":"constructor","id":["langchain_core","messages","AIMessageChunk"],"kwargs":{"content":"","additional_kwargs":{},"tool_call_chunks":[],"id":"msg_01SPbpi5P7CkNqgxPT2Ne9u5"}},{"langgraph_step":1,"langgraph_node":"agent"}]
// 3. Text tokens stream in
[{"lc":1,"type":"constructor","id":["langchain_core","messages","AIMessageChunk"],"kwargs":{"content":"Why","additional_kwargs":{},"tool_call_chunks":[],"id":"msg_01SPbpi5P7CkNqgxPT2Ne9u5"}},{"langgraph_step":1,"langgraph_node":"agent"}]
[{"lc":1,"type":"constructor","id":["langchain_core","messages","AIMessageChunk"],"kwargs":{"content":" don't scientists trust atoms?\n\nBecause","additional_kwargs":{},"tool_call_chunks":[],"id":"msg_01SPbpi5P7CkNqgxPT2Ne9u5"}},{"langgraph_step":1,"langgraph_node":"agent"}]
[{"lc":1,"type":"constructor","id":["langchain_core","messages","AIMessageChunk"],"kwargs":{"content":" they make up everything!","additional_kwargs":{},"tool_call_chunks":[],"id":"msg_01SPbpi5P7CkNqgxPT2Ne9u5"}},{"langgraph_step":1,"langgraph_node":"agent"}]
// 4. Stream completion (empty content with stop reason and final usage)
[{"lc":1,"type":"constructor","id":["langchain_core","messages","AIMessageChunk"],"kwargs":{"content":"","additional_kwargs":{"stop_reason":"end_turn","stop_sequence":null},"usage_metadata":{"input_tokens":0,"output_tokens":17,"total_tokens":17},"id":"msg_01SPbpi5P7CkNqgxPT2Ne9u5"}},{"langgraph_step":1,"langgraph_node":"agent"}]Step 3: Publish streaming tokens to Ably
Publish LangGraph streaming tokens to Ably using message appends to reliably and scalably distribute them to subscribers.
Each AI response is stored as a single Ably message that grows as tokens are appended.
Initialize the Ably client
Add the Ably client initialization to your publisher.mjs file:
import Ably from 'ably';

// Initialize Ably Realtime client
const realtime = new Ably.Realtime({
  key: 'demokey:*****',
  echoMessages: false
});

// Create a channel for publishing streamed AI responses
const channel = realtime.channels.get('ai:map-cod-cog');

The Ably Realtime client maintains a persistent connection to the Ably service, which allows you to publish tokens at high message rates with low latency.
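If you want visibility into that connection while developing, the ably-js client emits connection state events you can listen for. An optional, minimal sketch:

// Optional: log connection state changes while developing
realtime.connection.on('connected', () => {
  console.log('Connected to Ably');
});

realtime.connection.on('disconnected', () => {
  console.log('Connection to Ably interrupted, retrying...');
});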
Publish initial message and append tokens
When a new response begins, publish an initial message to create it. Ably assigns a serial identifier to the message. Use this serial to append each token to the message as it arrives from the LangGraph stream.
Update your publisher.mjs file to publish the initial message and append tokens:
// Track state across chunks
let msgSerial = null;

// Stream response tokens
async function streamLangGraphResponse(prompt) {
  const stream = await app.stream(
    { messages: [{ role: "user", content: prompt }] },
    { streamMode: "messages" }
  );

  for await (const [messageChunk, metadata] of stream) {
    const content = messageChunk?.content;

    // Publish initial empty message on first chunk
    if (!msgSerial && messageChunk?.id) {
      const result = await channel.publish({
        name: 'response',
        data: ''
      });

      // Capture the message serial for appending tokens
      msgSerial = result.serials[0];
    }

    // Append token content to the message
    if (content && msgSerial) {
      channel.appendMessage({
        serial: msgSerial,
        data: content
      });
    }
  }

  console.log('Stream completed!');
}

This implementation:
- Publishes an initial empty message when the first chunk arrives and captures the serial
- Filters for chunks with non-empty content
- Appends each token to the original message using the captured serial
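One practical note: the Realtime client keeps its connection open, so the publisher process typically stays alive after the stream finishes. A minimal sketch of closing the connection once the response has completed (if you need delivery confirmation first, consider awaiting the append calls, assuming appendMessage returns a promise like publish):

// Usage example: await the stream, then close the Ably connection
// so the Node.js process can exit
await streamLangGraphResponse("Tell me a short joke");
realtime.close();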
Run the publisher to see tokens streaming to Ably:
node publisher.mjs

Step 4: Subscribe to streaming tokens
Create a subscriber that receives the streaming tokens from Ably and reconstructs the response in realtime.
Create a new file subscriber.mjs with the following contents:
import Ably from 'ably';

// Initialize Ably Realtime client
const realtime = new Ably.Realtime({ key: 'demokey:*****' });

// Get the same channel used by the publisher
const channel = realtime.channels.get('ai:map-cod-cog');

// Track responses by message serial
const responses = new Map();

// Subscribe to receive messages
await channel.subscribe((message) => {
  switch (message.action) {
    case 'message.create':
      // New response started
      console.log('\n[Response started]', message.serial);
      responses.set(message.serial, message.data);
      break;

    case 'message.append':
      // Append token to existing response
      const current = responses.get(message.serial) || '';
      responses.set(message.serial, current + message.data);

      // Display token as it arrives
      process.stdout.write(message.data);
      break;

    case 'message.update':
      // Replace entire response content
      responses.set(message.serial, message.data);
      console.log('\n[Response updated with full content]');
      break;
  }
});

console.log('Subscriber ready, waiting for tokens...');

Subscribers receive different message actions depending on when they join and how they're retrieving messages:
- message.create: Indicates a new response has started (i.e. a new message was created). The message data contains the initial content (often empty or the first token). Store this as the beginning of a new response using serial as the identifier.
- message.append: Contains a single token fragment to append. The message data contains only the new token, not the full concatenated response. Append this token to the existing response identified by serial.
- message.update: Contains the whole response up to that point. The message data contains the full concatenated text so far. Replace the entire response content with this data for the message identified by serial. This action occurs when the channel needs to resynchronize the full message state, such as after a client resumes from a transient disconnection.
Run the subscriber in a separate terminal:
node subscriber.mjs

With the subscriber running, run the publisher in another terminal. The tokens stream in realtime as the AI model generates them.
Step 5: Stream with multiple publishers and subscribers
Ably's channel-oriented sessions enable multiple AI agents to publish responses and multiple users to receive them on a single channel simultaneously. Ably handles message delivery to all participants, eliminating the need to implement routing logic or manage state synchronization across connections.
Broadcasting to multiple subscribers
Each subscriber receives the complete stream of tokens independently, enabling you to build collaborative experiences or multi-device applications.
Run a subscriber in multiple separate terminals:
# Terminal 1
node subscriber.mjs
# Terminal 2
node subscriber.mjs
# Terminal 3
node subscriber.mjs

All subscribers receive the same stream of tokens in realtime.
Publishing concurrent responses
Multiple publishers can stream different responses concurrently on the same channel. Each response is stored as a separate message with its own serial, allowing subscribers to track all responses independently.
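If you watch several concurrent responses in a single terminal, it helps to label each token with the response it belongs to. A minimal sketch of a helper you could call from the subscriber's message.append case in place of the plain process.stdout.write call (the shortened serial used as a label is just an illustration):

// Write a token prefixed with a short identifier derived from its message
// serial, so tokens from interleaved responses are distinguishable
function printLabelledToken(message) {
  const shortId = message.serial.slice(-6);
  process.stdout.write(`\n[${shortId}] ${message.data}`);
}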
To demonstrate this, run a publisher in multiple separate terminals:
# Terminal 1
node publisher.mjs
# Terminal 2
node publisher.mjs
# Terminal 3
node publisher.mjs

All running subscribers receive tokens from all responses concurrently. Each subscriber correctly reconstructs each response separately using the serial to correlate tokens.
Step 6: Retrieve complete responses from history
One key advantage of the message-per-response pattern is that each complete AI response is stored as a single message in channel history. This makes it efficient to retrieve conversation history without processing thousands of individual token messages.
Use Ably's rewind channel option to attach to the channel at some point in the recent past and automatically receive complete responses from history. Historical messages are delivered as message.update events containing the complete concatenated response, which then seamlessly transition to live message.append events for any ongoing responses:
// Use rewind to receive recent historical messages
const channel = realtime.channels.get('ai:map-cod-cog', {
  params: { rewind: '2m' } // Retrieve messages from the last 2 minutes
});

const responses = new Map();

await channel.subscribe((message) => {
  switch (message.action) {
    case 'message.create':
      responses.set(message.serial, message.data);
      break;

    case 'message.append':
      const current = responses.get(message.serial) || '';
      responses.set(message.serial, current + message.data);
      process.stdout.write(message.data);
      break;

    case 'message.update':
      // Historical messages contain full concatenated response
      responses.set(message.serial, message.data);
      console.log('\n[Historical response]:', message.data);
      break;
  }
});
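If you only need past responses and not the live stream, you can also fetch them directly with the channel history API instead of attaching with rewind. A minimal sketch, assuming the same channel as above:

// Fetch the most recent messages from channel history; with the
// message-per-response pattern each item holds one complete AI response
const historyPage = await channel.history({ limit: 10 });

for (const message of historyPage.items) {
  console.log(`Response ${message.serial}:\n${message.data}\n`);
}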
Next steps

- Learn more about the message-per-response pattern used in this guide
- Learn about client hydration strategies for handling late joiners and reconnections
- Understand sessions and identity in AI enabled applications
- Explore the message-per-token pattern for streaming individual tokens as separate messages