# Message per response
Token streaming with message-per-response is a pattern where every token generated by your model for a given response is appended to a single Ably message. Each complete AI response then appears as one message in the channel history while delivering live tokens in realtime. This uses [Ably Pub/Sub](https://ably.com/docs/basics.md) for realtime communication between agents and clients.
This pattern is useful for chat-style applications where you want each complete AI response stored as a single message in history, making it easy to retrieve and display conversations that span many responses. Each response is a single message that grows as tokens are appended, so clients joining mid-stream can catch up efficiently without processing thousands of individual tokens.
The message-per-response pattern includes [automatic rate limit protection](https://ably.com/docs/ai-transport/token-rate-limits.md#per-response) through rollups, making it the recommended approach for most token streaming use cases.
## How it works
1. **Initial message**: When an agent response begins, publish an initial message with `message.create` action to the Ably channel that is either empty or contains the first token as content.
2. **Token streaming**: Append subsequent tokens to the original message by publishing those tokens with the `message.append` action.
3. **Live delivery**: Clients subscribed to the channel receive each appended token in realtime, allowing them to progressively render the response.
4. **Compacted history**: The channel history contains only one message per agent response, which includes all appended tokens concatenated as contiguous text.
You do not need to mark the message or token stream as completed; the final message content automatically includes the full response constructed from all appended tokens.
## Enable appends
Message append functionality requires "Message annotations, updates, deletes and appends" to be enabled in a [channel rule](https://ably.com/docs/channels.md#rules) associated with the channel.
To enable the channel rule:
1. Go to the [Ably dashboard](https://www.ably.com/dashboard) and select your app.
2. Navigate to the "Configuration" > "Rules" section from the left-hand navigation bar.
3. Choose "Add new rule".
4. Enter a channel name or namespace pattern (e.g. `ai` for all channels starting with `ai:`).
5. Select the "Message annotations, updates, deletes and appends" option from the list.
6. Click "Create channel rule".
The examples on this page use the `ai:` namespace prefix, which assumes you have configured the rule for `ai`.
## Publishing tokens
Publish tokens from a [Realtime](https://ably.com/docs/api/realtime-sdk.md) client, which maintains a persistent connection to the Ably service. This allows you to publish at very high message rates with the lowest possible latencies, while preserving guarantees around message delivery order. For more information, see [Realtime and REST](https://ably.com/docs/basics.md#realtime-and-rest).
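For example, a minimal Realtime client instantiation in JavaScript (substitute your own API key; in production you would typically use token authentication rather than embedding a key in a client):
```javascript
import * as Ably from 'ably';

// Create a Realtime client that maintains a persistent connection to Ably
const realtime = new Ably.Realtime({ key: 'your-api-key' });
```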
[Channels](https://ably.com/docs/channels.md) separate message traffic into different topics. For token streaming, each conversation or session typically has its own channel.
Use the [`get()`](https://ably.com/docs/api/realtime-sdk/channels.md#get) method to create or retrieve a channel instance:
```javascript
const channel = realtime.channels.get('ai:your-channel-name');
```
```python
channel = realtime.channels.get('ai:your-channel-name')
```
```java
Channel channel = realtime.channels.get("ai:your-channel-name");
```
To start streaming an AI response, publish the initial message. The message is identified by a server-assigned identifier called a [`serial`](https://ably.com/docs/messages.md#properties). Use the `serial` to append each subsequent token to the message as it arrives from the AI model:
```javascript
// Publish initial message and capture the serial for appending tokens
const { serials: [msgSerial] } = await channel.publish({ name: 'response', data: '' });
// Example: stream returns events like { type: 'token', text: 'Hello' }
for await (const event of stream) {
  // Append each token as it arrives
  if (event.type === 'token') {
    channel.appendMessage({ serial: msgSerial, data: event.text });
  }
}
```
```python
import asyncio

# Publish initial message and capture the serial for appending tokens
message = Message(name='response', data='')
result = await channel.publish(message)
msg_serial = result.serials[0]

# Example: stream returns events like { 'type': 'token', 'text': 'Hello' }
async for event in stream:
    if event['type'] == 'token':
        # Append each token as it arrives; schedule the coroutine with
        # create_task rather than awaiting it, so publishing never blocks
        asyncio.create_task(channel.append_message(serial=msg_serial, data=event['text']))
```
```java
// Publish initial message and capture the serial for appending tokens
CompletableFuture<PublishResult> publishFuture = new CompletableFuture<>();
channel.publish("response", "", new Callback<PublishResult>() {
    @Override
    public void onSuccess(PublishResult result) {
        publishFuture.complete(result);
    }

    @Override
    public void onError(ErrorInfo reason) {
        publishFuture.completeExceptionally(AblyException.fromErrorInfo(reason));
    }
});
String msgSerial = publishFuture.get().serials[0];

// Example: stream returns events like { type: 'token', text: 'Hello' }
for (Event event : stream) {
    // Append each token as it arrives
    if (event.getType().equals("token")) {
        channel.appendMessage(msgSerial, event.getText());
    }
}
```
When publishing tokens, don't await the `channel.appendMessage()` call. Ably rolls up acknowledgments and debounces them for efficiency, which means awaiting each append would unnecessarily slow down your token stream. (In Python, pass the coroutine to `asyncio.create_task()` rather than awaiting it, so the append is still scheduled without blocking the stream.) Messages are still published in the order that `appendMessage()` is called, so delivery order is not affected.
```javascript
// ✅ Do this - append without await for maximum throughput
for await (const event of stream) {
  if (event.type === 'token') {
    channel.appendMessage({ serial: msgSerial, data: event.text });
  }
}

// ❌ Don't do this - awaiting each append reduces throughput
for await (const event of stream) {
  if (event.type === 'token') {
    await channel.appendMessage({ serial: msgSerial, data: event.text });
  }
}
```
```python
# ✅ Do this - schedule appends without awaiting for maximum throughput
async for event in stream:
    if event['type'] == 'token':
        asyncio.create_task(channel.append_message(serial=msg_serial, data=event['text']))

# ❌ Don't do this - awaiting each append reduces throughput
async for event in stream:
    if event['type'] == 'token':
        await channel.append_message(serial=msg_serial, data=event['text'])
```
```java
// ✅ Do this - append without blocking for maximum throughput
for (Event event : stream) {
    if (event.getType().equals("token")) {
        channel.appendMessage(msgSerial, event.getText());
    }
}

// ❌ Don't do this - blocking on each append reduces throughput
for (Event event : stream) {
    if (event.getType().equals("token")) {
        CompletableFuture<UpdateDeleteResult> appendFuture = new CompletableFuture<>();
        channel.appendMessage(msgSerial, event.getText(), new Callback<UpdateDeleteResult>() {
            @Override
            public void onSuccess(UpdateDeleteResult result) {
                appendFuture.complete(result);
            }

            @Override
            public void onError(ErrorInfo reason) {
                appendFuture.completeExceptionally(AblyException.fromErrorInfo(reason));
            }
        });
        appendFuture.get(); // blocking call
    }
}
```
This pattern also supports appending tokens for multiple concurrent model responses on the same channel, as shown in the sketch below. As long as you append to the correct message serial, tokens from different responses do not interfere with one another, and the final concatenated message for each response contains only the tokens from that response.
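For example, a minimal sketch of two responses streaming concurrently over one channel, assuming `streamA` and `streamB` are token streams from two separate model invocations and `pump` is a hypothetical helper:
```javascript
// Create one message per in-flight response and capture each serial
const { serials: [serialA] } = await channel.publish({ name: 'response', data: '' });
const { serials: [serialB] } = await channel.publish({ name: 'response', data: '' });

// Append every token from a stream to its own message serial
async function pump(stream, serial) {
  for await (const event of stream) {
    if (event.type === 'token') {
      channel.appendMessage({ serial, data: event.text });
    }
  }
}

// Run both responses concurrently; appends interleave on the channel, but
// each concatenated message contains only its own response's tokens
await Promise.all([pump(streamA, serialA), pump(streamB, serialB)]);
```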
### Configuring rollup behaviour
By default, AI Transport automatically rolls up tokens into messages at a rate of 25 messages per second (using a 40ms rollup window). This protects you from hitting connection rate limits while maintaining a smooth user experience. You can tune this behaviour when establishing your connection to balance message costs against delivery speed:
```javascript
const realtime = new Ably.Realtime({
  key: 'your-api-key',
  transportParams: { appendRollupWindow: 100 } // 10 messages/s
});
```
```python
realtime = AblyRealtime(
    key='your-api-key',
    transport_params={'appendRollupWindow': 100}  # 10 messages/s
)
```
```java
ClientOptions options = new ClientOptions();
options.key = "your-api-key";
options.transportParams = Map.of("appendRollupWindow", "100"); // 10 messages/s
AblyRealtime realtime = new AblyRealtime(options);
```
The `appendRollupWindow` parameter controls how many tokens are combined into each published message for a given model output rate. This creates a trade-off between delivery smoothness and the number of concurrent model responses you can stream on a single connection:
- With a shorter rollup window, tokens are published more frequently, creating a smoother, more fluid experience for users as they see the response appear in more fine-grained chunks. However, this consumes more of your [connection's message rate capacity](https://ably.com/docs/platform/pricing/limits.md#connection), limiting how many simultaneous model responses you can stream.
- With a longer rollup window, multiple tokens are batched together into fewer messages, allowing you to run more concurrent response streams on the same connection, but users will notice tokens arriving in larger chunks.
The default 40ms window strikes a balance, delivering tokens at 25 messages per second - smooth enough for a great user experience while allowing you to run two simultaneous response streams on a single connection. If you need to support more concurrent streams, increase the rollup window (up to 500ms), accepting that tokens will arrive in more noticeable batches. Alternatively, instantiate a separate Ably client which uses its own connection, giving you access to additional message rate capacity.
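For example, assuming the per-connection capacity implied above (two streams at 25 messages per second, or roughly 50 messages per second in total), setting `appendRollupWindow: 200` publishes each stream at 5 messages per second, allowing around ten concurrent response streams on a single connection.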
## Subscribing to token streams
Subscribers receive different message actions depending on when they join and how they're retrieving messages. Each message has an `action` field that indicates how to process it, and a `serial` field that identifies which message the action relates to:
- `message.create`: Indicates a new response has started (i.e. a new message was created). The message `data` contains the initial content (often empty or the first token). Store this as the beginning of a new response using `serial` as the identifier.
- `message.append`: Contains a single token fragment to append. The message `data` contains only the new token, not the full concatenated response. Append this token to the existing response identified by `serial`.
- `message.update`: Contains the whole response up to that point. The message `data` contains the full concatenated text so far. Replace the entire response content with this data for the message identified by `serial`. This action occurs when the channel needs to resynchronize the full message state, such as after a client [resumes](https://ably.com/docs/connect/states.md#resume) from a transient disconnection.
```javascript
const channel = realtime.channels.get('ai:your-channel-name');

// Track responses by message serial
const responses = new Map();

// Subscribe to live messages (implicitly attaches the channel)
await channel.subscribe((message) => {
  switch (message.action) {
    case 'message.create':
      // New response started
      responses.set(message.serial, message.data);
      break;
    case 'message.append': {
      // Append token to existing response
      const current = responses.get(message.serial) || '';
      responses.set(message.serial, current + message.data);
      break;
    }
    case 'message.update':
      // Replace entire response content
      responses.set(message.serial, message.data);
      break;
  }
});
```
```python
channel = realtime.channels.get('ai:your-channel-name')

# Track responses by message serial
responses = {}

# Subscribe to live messages (implicitly attaches the channel)
def on_message(message):
    action = message.action
    if action == MessageAction.MESSAGE_CREATE:
        # New response started
        responses[message.serial] = message.data
    elif action == MessageAction.MESSAGE_APPEND:
        # Append token to existing response
        current = responses.get(message.serial, '')
        responses[message.serial] = current + message.data
    elif action == MessageAction.MESSAGE_UPDATE:
        # Replace entire response content
        responses[message.serial] = message.data

await channel.subscribe(on_message)
```
```java
Channel channel = realtime.channels.get("ai:your-channel-name");
// Track responses by message serial
Map responses = new ConcurrentHashMap<>();
// Subscribe to live messages (implicitly attaches the channel)
channel.subscribe(message -> {
switch (message.action) {
case MessageAction.MESSAGE_CREATE:
// New response started
responses.put(message.serial, (String) message.data);
break;
case MessageAction.MESSAGE_APPEND:
// Append token to existing response
String current = responses.getOrDefault(message.serial, "");
responses.put(message.serial, current + (String) message.data);
break;
case MessageAction.MESSAGE_UPDATE:
// Replace entire response content
responses.put(message.serial, (String) message.data);
break;
}
});
```
## Client hydration
When clients connect or reconnect, such as after a page refresh, they often need to catch up on complete responses and individual tokens that were published while they were offline or before they joined.
The message-per-response pattern enables efficient client-state hydration without the need to process every individual token, and supports a seamless transition from historical responses to live tokens.
### Using rewind for recent history
The simplest approach is to use Ably's [rewind](https://ably.com/docs/channels/options/rewind.md) channel option to attach to the channel at some point in the recent past, and automatically receive all messages since that point. Historical messages are delivered as `message.update` events containing the complete concatenated response, which then seamlessly transition to live `message.append` events for any ongoing responses:
```javascript
// Use rewind to receive recent historical messages
const channel = realtime.channels.get('ai:your-channel-name', {
  params: { rewind: '2m' } // or rewind: '10' for message count
});

// Track responses by message serial
const responses = new Map();

// Subscribe to receive both recent historical and live messages,
// which are delivered in order to the subscription
await channel.subscribe((message) => {
  switch (message.action) {
    case 'message.create':
      // New response started
      responses.set(message.serial, message.data);
      break;
    case 'message.append': {
      // Append token to existing response
      const current = responses.get(message.serial) || '';
      responses.set(message.serial, current + message.data);
      break;
    }
    case 'message.update':
      // Replace entire response content
      responses.set(message.serial, message.data);
      break;
  }
});
```
```python
# Use rewind to receive recent historical messages
channel = realtime.channels.get('ai:your-channel-name', params={'rewind': '2m'})  # or rewind: '10' for message count

# Track responses by message serial
responses = {}

# Subscribe to receive both recent historical and live messages,
# which are delivered in order to the subscription
def on_message(message):
    action = message.action
    if action == MessageAction.MESSAGE_CREATE:
        # New response started
        responses[message.serial] = message.data
    elif action == MessageAction.MESSAGE_APPEND:
        # Append token to existing response
        current = responses.get(message.serial, '')
        responses[message.serial] = current + message.data
    elif action == MessageAction.MESSAGE_UPDATE:
        # Replace entire response content
        responses[message.serial] = message.data

await channel.subscribe(on_message)
```
```java
// Use rewind to receive recent historical messages
ChannelOptions options = new ChannelOptions();
options.params = Map.of("rewind", "2m"); // or rewind: "10" for message count
Channel channel = realtime.channels.get("ai:your-channel-name", options);

// Track responses by message serial
Map<String, String> responses = new ConcurrentHashMap<>();

// Subscribe to receive both recent historical and live messages,
// which are delivered in order to the subscription
channel.subscribe(message -> {
    switch (message.action) {
        case MESSAGE_CREATE:
            // New response started
            responses.put(message.serial, (String) message.data);
            break;
        case MESSAGE_APPEND:
            // Append token to existing response
            String current = responses.getOrDefault(message.serial, "");
            responses.put(message.serial, current + (String) message.data);
            break;
        case MESSAGE_UPDATE:
            // Replace entire response content
            responses.put(message.serial, (String) message.data);
            break;
    }
});
```
Rewind supports two formats:
- **Time-based**: Use a time interval like `'30s'` or `'2m'` to retrieve messages from that time period
- **Count-based**: Use a number like `10` or `50` to retrieve the most recent N messages (maximum 100)
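For example, a count-based rewind in JavaScript that replays up to the 50 most recent messages before continuing with live delivery:
```javascript
// Attach with count-based rewind: up to the 50 most recent messages are
// delivered first, followed by live messages in order
const channel = realtime.channels.get('ai:your-channel-name', {
  params: { rewind: '50' }
});
```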
### Using history for older messages
Use [channel history](https://ably.com/docs/storage-history/history.md) with the [`untilAttach` option](https://ably.com/docs/storage-history/history.md#continuous-history) to paginate back through history to obtain historical responses, while preserving continuity with the delivery of live tokens:
```javascript
const channel = realtime.channels.get('ai:your-channel-name');

// Track responses by message serial
const responses = new Map();

// Subscribe to live messages (implicitly attaches the channel)
await channel.subscribe((message) => {
  switch (message.action) {
    case 'message.create':
      // New response started
      responses.set(message.serial, message.data);
      break;
    case 'message.append': {
      // Append token to existing response
      const current = responses.get(message.serial) || '';
      responses.set(message.serial, current + message.data);
      break;
    }
    case 'message.update':
      // Replace entire response content
      responses.set(message.serial, message.data);
      break;
  }
});

// Fetch history up until the point of attachment
let page = await channel.history({ untilAttach: true });

// Paginate backwards through history
while (page) {
  // Messages are newest-first
  for (const message of page.items) {
    // message.data contains the full concatenated text
    responses.set(message.serial, message.data);
  }
  // Move to next page if available
  page = page.hasNext() ? await page.next() : null;
}
```
```python
channel = realtime.channels.get('ai:your-channel-name')

# Track responses by message serial
responses = {}

# Subscribe to live messages (implicitly attaches the channel)
def on_message(message):
    action = message.action
    if action == MessageAction.MESSAGE_CREATE:
        # New response started
        responses[message.serial] = message.data
    elif action == MessageAction.MESSAGE_APPEND:
        # Append token to existing response
        current = responses.get(message.serial, '')
        responses[message.serial] = current + message.data
    elif action == MessageAction.MESSAGE_UPDATE:
        # Replace entire response content
        responses[message.serial] = message.data

await channel.subscribe(on_message)

# Fetch history up until the point of attachment
page = await channel.history(until_attach=True)

# Paginate backwards through history
while page:
    # Messages are newest-first
    for message in page.items:
        # message.data contains the full concatenated text
        responses[message.serial] = message.data
    # Move to next page if available
    page = await page.next() if page.has_next() else None
```
```java
Channel channel = realtime.channels.get("ai:your-channel-name");
// Track responses by message serial
Map responses = new ConcurrentHashMap<>();
// Subscribe to live messages (implicitly attaches the channel)
channel.subscribe(message -> {
switch (message.action) {
case MessageAction.MESSAGE_CREATE:
// New response started
responses.put(message.serial, (String) message.data);
break;
case MessageAction.MESSAGE_APPEND:
// Append token to existing response
String current = responses.getOrDefault(message.serial, "");
responses.put(message.serial, current + (String) message.data);
break;
case MessageAction.MESSAGE_UPDATE:
// Replace entire response content
responses.put(message.serial, (String) message.data);
break;
}
});
// Fetch history up until the point of attachment
PaginatedResult page = channel.history(new Param("untilAttach", "true"));
// Paginate backwards through history
while (page != null) {
// Messages are newest-first
for (Message message : page.items()) {
// message.data contains the full concatenated text
responses.put(message.serial, (String) message.data);
}
// Move to next page if available
page = page.hasNext() ? page.next() : null;
}
```
### Hydrating an in-progress response
A common pattern is to persist complete model responses in your database while using Ably for streaming in-progress responses.
The client loads completed responses from your database, then uses Ably to catch up on any response that was still in progress.
You can hydrate in-progress responses using either the [rewind](#rewind) or [history](#history) pattern.
#### Publishing with correlation metadata
To correlate Ably messages with your database records, include the `responseId` in the message [`extras`](https://ably.com/docs/messages.md#properties) when publishing:
```javascript
// Publish initial message with responseId in extras
const { serials: [msgSerial] } = await channel.publish({
  name: 'response',
  data: '',
  extras: {
    headers: {
      responseId: 'resp_abc123' // Your database response ID
    }
  }
});

// Append tokens, including extras to preserve headers
for await (const event of stream) {
  if (event.type === 'token') {
    channel.appendMessage({
      serial: msgSerial,
      data: event.text,
      extras: {
        headers: {
          responseId: 'resp_abc123'
        }
      }
    });
  }
}
```
```python
import asyncio

# Publish initial message with responseId in extras
message = Message(
    name='response',
    data='',
    extras={
        'headers': {
            'responseId': 'resp_abc123'  # Your database response ID
        }
    }
)
result = await channel.publish(message)
msg_serial = result.serials[0]

# Append tokens, including extras to preserve headers;
# schedule each append without awaiting it
async for event in stream:
    if event['type'] == 'token':
        asyncio.create_task(channel.append_message(
            serial=msg_serial,
            data=event['text'],
            extras={
                'headers': {
                    'responseId': 'resp_abc123'
                }
            }
        ))
```
```java
// Publish initial message with responseId in extras
JsonObject extras = new JsonObject();
JsonObject headers = new JsonObject();
headers.addProperty("responseId", "resp_abc123"); // Your database response ID
extras.add("headers", headers);

CompletableFuture<PublishResult> publishFuture = new CompletableFuture<>();
channel.publish("response", "", extras, new Callback<PublishResult>() {
    @Override
    public void onSuccess(PublishResult result) {
        publishFuture.complete(result);
    }

    @Override
    public void onError(ErrorInfo reason) {
        publishFuture.completeExceptionally(AblyException.fromErrorInfo(reason));
    }
});
String msgSerial = publishFuture.get().serials[0];

// Append tokens, including extras to preserve headers
for (Event event : stream) {
    if (event.getType().equals("token")) {
        JsonObject appendExtras = new JsonObject();
        JsonObject appendHeaders = new JsonObject();
        appendHeaders.addProperty("responseId", "resp_abc123");
        appendExtras.add("headers", appendHeaders);
        channel.appendMessage(msgSerial, event.getText(), appendExtras);
    }
}
```
#### Hydrate using rewind
When hydrating, load completed responses from your database, then use [rewind](https://ably.com/docs/channels/options/rewind.md) to catch up on any in-progress response. Check the `responseId` from message extras to skip responses already loaded from your database:
```javascript
// Load completed responses from your database
// completedResponses is a Set of responseIds
const completedResponses = await loadResponsesFromDatabase();

// Use rewind to receive recent historical messages
const channel = realtime.channels.get('ai:responses', {
  params: { rewind: '2m' }
});

// Track in-progress responses by responseId
const inProgressResponses = new Map();

await channel.subscribe((message) => {
  const responseId = message.extras?.headers?.responseId;
  if (!responseId) {
    console.warn('Message missing responseId');
    return;
  }

  // Skip messages for responses already loaded from database
  if (completedResponses.has(responseId)) {
    return;
  }

  switch (message.action) {
    case 'message.create':
      // New response started
      inProgressResponses.set(responseId, message.data);
      break;
    case 'message.append': {
      // Append token to existing response
      const current = inProgressResponses.get(responseId) || '';
      inProgressResponses.set(responseId, current + message.data);
      break;
    }
    case 'message.update':
      // Replace entire response content
      inProgressResponses.set(responseId, message.data);
      break;
  }
});
```
```python
# Load completed responses from your database
# completed_responses is a set of responseIds
completed_responses = await load_responses_from_database()

# Use rewind to receive recent historical messages
channel = realtime.channels.get('ai:responses', params={'rewind': '2m'})

# Track in-progress responses by responseId
in_progress_responses = {}

def on_message(message):
    # Guard against messages published without extras
    response_id = (message.extras or {}).get('headers', {}).get('responseId')
    if not response_id:
        print('Message missing responseId')
        return

    # Skip messages for responses already loaded from database
    if response_id in completed_responses:
        return

    action = message.action
    if action == MessageAction.MESSAGE_CREATE:
        # New response started
        in_progress_responses[response_id] = message.data
    elif action == MessageAction.MESSAGE_APPEND:
        # Append token to existing response
        current = in_progress_responses.get(response_id, '')
        in_progress_responses[response_id] = current + message.data
    elif action == MessageAction.MESSAGE_UPDATE:
        # Replace entire response content
        in_progress_responses[response_id] = message.data

await channel.subscribe(on_message)
```
```java
// Load completed responses from your database
// completedResponses is a Set of responseIds
Set<String> completedResponses = loadResponsesFromDatabase();

// Use rewind to receive recent historical messages
ChannelOptions options = new ChannelOptions();
options.params = Map.of("rewind", "2m");
Channel channel = realtime.channels.get("ai:responses", options);

// Track in-progress responses by responseId
Map<String, String> inProgressResponses = new ConcurrentHashMap<>();

channel.subscribe(message -> {
    // Guard against messages published without extras or headers
    JsonObject extras = message.extras != null ? message.extras.asJsonObject() : null;
    JsonObject headers = extras != null && extras.has("headers") ? extras.getAsJsonObject("headers") : null;
    String responseId = headers != null && headers.has("responseId") ? headers.get("responseId").getAsString() : null;
    if (responseId == null) {
        System.err.println("Message missing responseId");
        return;
    }

    // Skip messages for responses already loaded from database
    if (completedResponses.contains(responseId)) {
        return;
    }

    switch (message.action) {
        case MESSAGE_CREATE:
            // New response started
            inProgressResponses.put(responseId, (String) message.data);
            break;
        case MESSAGE_APPEND:
            // Append token to existing response
            String current = inProgressResponses.getOrDefault(responseId, "");
            inProgressResponses.put(responseId, current + (String) message.data);
            break;
        case MESSAGE_UPDATE:
            // Replace entire response content
            inProgressResponses.put(responseId, (String) message.data);
            break;
    }
});
```
#### Hydrate using history
Load completed responses from your database, then use [channel history](https://ably.com/docs/storage-history/history.md) with the [`untilAttach` option](https://ably.com/docs/storage-history/history.md#continuous-history) to catch up on any in-progress responses. Use the timestamp of the last completed response to start pagination from that point forward, ensuring continuity with live message delivery.
```javascript
// Load completed responses from database (sorted by timestamp, oldest first)
const completedResponses = await loadResponsesFromDatabase();

// Get the timestamp of the latest completed response
const latestTimestamp = completedResponses.latest().timestamp;

const channel = realtime.channels.get('ai:your-channel-name');

// Track in progress responses by ID
const inProgressResponses = new Map();

// Subscribe to live messages (implicitly attaches)
await channel.subscribe((message) => {
  const responseId = message.extras?.headers?.responseId;
  if (!responseId) {
    console.warn('Message missing responseId');
    return;
  }

  // Skip messages for responses already loaded from database
  if (completedResponses.has(responseId)) {
    return;
  }

  switch (message.action) {
    case 'message.create':
      // New response started
      inProgressResponses.set(responseId, message.data);
      break;
    case 'message.append': {
      // Append token to existing response
      const current = inProgressResponses.get(responseId) || '';
      inProgressResponses.set(responseId, current + message.data);
      break;
    }
    case 'message.update':
      // Replace entire response content
      inProgressResponses.set(responseId, message.data);
      break;
  }
});

// Fetch history from the last completed response until attachment
let page = await channel.history({
  untilAttach: true,
  start: latestTimestamp,
  direction: 'forwards'
});

// Paginate through all missed messages
while (page) {
  for (const message of page.items) {
    const responseId = message.extras?.headers?.responseId;
    if (!responseId) {
      console.warn('Message missing responseId');
      continue;
    }
    // message.data contains the full concatenated text so far
    inProgressResponses.set(responseId, message.data);
  }
  // Move to next page if available
  page = page.hasNext() ? await page.next() : null;
}
```
```python
# Load completed responses from database (sorted by timestamp, oldest first)
completed_responses = await load_responses_from_database()

# Get the timestamp of the latest completed response
latest_timestamp = completed_responses.latest().timestamp

channel = realtime.channels.get('ai:your-channel-name')

# Track in progress responses by ID
in_progress_responses = {}

# Subscribe to live messages (implicitly attaches)
def on_message(message):
    # Guard against messages published without extras
    response_id = (message.extras or {}).get('headers', {}).get('responseId')
    if not response_id:
        print('Message missing responseId')
        return

    # Skip messages for responses already loaded from database
    if response_id in completed_responses:
        return

    action = message.action
    if action == MessageAction.MESSAGE_CREATE:
        # New response started
        in_progress_responses[response_id] = message.data
    elif action == MessageAction.MESSAGE_APPEND:
        # Append token to existing response
        current = in_progress_responses.get(response_id, '')
        in_progress_responses[response_id] = current + message.data
    elif action == MessageAction.MESSAGE_UPDATE:
        # Replace entire response content
        in_progress_responses[response_id] = message.data

await channel.subscribe(on_message)

# Fetch history from the last completed response until attachment
page = await channel.history(
    until_attach=True,
    start=latest_timestamp,
    direction='forwards'
)

# Paginate through all missed messages
while page:
    for message in page.items:
        response_id = (message.extras or {}).get('headers', {}).get('responseId')
        if not response_id:
            print('Message missing responseId')
            continue
        # message.data contains the full concatenated text so far
        in_progress_responses[response_id] = message.data
    # Move to next page if available
    page = await page.next() if page.has_next() else None
```
```java
// Load completed responses from database (sorted by timestamp, oldest first);
// CompletedResponse is your own record type exposing getId() and getTimestamp()
List<CompletedResponse> completedResponses = loadResponsesFromDatabase();

// Get the timestamp of the latest completed response
long latestTimestamp = completedResponses.get(completedResponses.size() - 1).getTimestamp();

Channel channel = realtime.channels.get("ai:your-channel-name");

// Track in progress responses by ID
Map<String, String> inProgressResponses = new ConcurrentHashMap<>();

// Subscribe to live messages (implicitly attaches)
channel.subscribe(message -> {
    // Guard against messages published without extras or headers
    JsonObject extras = message.extras != null ? message.extras.asJsonObject() : null;
    JsonObject headers = extras != null && extras.has("headers") ? extras.getAsJsonObject("headers") : null;
    String responseId = headers != null && headers.has("responseId") ? headers.get("responseId").getAsString() : null;
    if (responseId == null) {
        System.err.println("Message missing responseId");
        return;
    }

    // Skip messages for responses already loaded from database
    if (completedResponses.stream().anyMatch(r -> r.getId().equals(responseId))) {
        return;
    }

    switch (message.action) {
        case MESSAGE_CREATE:
            // New response started
            inProgressResponses.put(responseId, (String) message.data);
            break;
        case MESSAGE_APPEND:
            // Append token to existing response
            String current = inProgressResponses.getOrDefault(responseId, "");
            inProgressResponses.put(responseId, current + (String) message.data);
            break;
        case MESSAGE_UPDATE:
            // Replace entire response content
            inProgressResponses.put(responseId, (String) message.data);
            break;
    }
});

// Fetch history from the last completed response until attachment
PaginatedResult<Message> page = channel.history(new Param[] {
    new Param("untilAttach", "true"),
    new Param("start", String.valueOf(latestTimestamp)),
    new Param("direction", "forwards")
});

// Paginate through all missed messages
while (page != null) {
    for (Message message : page.items()) {
        JsonObject msgExtras = message.extras != null ? message.extras.asJsonObject() : null;
        JsonObject msgHeaders = msgExtras != null && msgExtras.has("headers") ? msgExtras.getAsJsonObject("headers") : null;
        String responseId = msgHeaders != null && msgHeaders.has("responseId") ? msgHeaders.get("responseId").getAsString() : null;
        if (responseId == null) {
            System.err.println("Message missing responseId");
            continue;
        }
        // message.data contains the full concatenated text so far
        inProgressResponses.put(responseId, (String) message.data);
    }
    // Move to next page if available
    page = page.hasNext() ? page.next() : null;
}
}
```