# Message per token
Token streaming with message-per-token is a pattern where every token generated by your model is published as an independent Ably message. Each token then appears as one message in the channel history. This uses [Ably Pub/Sub](https://ably.com/docs/basics.md) for realtime communication between agents and clients.
This pattern is useful when clients only care about the most recent part of a response and you are happy to treat the channel history as a short sliding window rather than a full conversation log. For example:
- Backend-stored responses: The backend writes complete responses to a database and clients load those full responses from there, while Ably is used only to deliver live tokens for the current in-progress response.
- Live transcription, captioning, or translation: A viewer who joins a live stream only needs sufficient tokens for the current "frame" of subtitles, not the entire transcript so far.
- Code assistance in an editor: Streamed tokens become part of the file on disk as they are accepted, so past tokens do not need to be replayed from Ably.
- Autocomplete: A fresh response is streamed for each change a user makes to a document, with only the latest suggestion being relevant.
## Publishing tokens
Publish tokens from a [Realtime](https://ably.com/docs/api/realtime-sdk.md) client, which maintains a persistent connection to the Ably service. This allows you to publish at very high message rates with the lowest possible latencies, while preserving guarantees around message delivery order. For more information, see [Realtime and REST](https://ably.com/docs/basics.md#realtime-and-rest).
[Channels](https://ably.com/docs/channels.md) separate message traffic into different topics. For token streaming, each conversation or session typically has its own channel.
Use the [`get()`](https://ably.com/docs/api/realtime-sdk/channels.md#get) method to create or retrieve a channel instance:
```javascript
const channel = realtime.channels.get('your-channel-name');
```
```python
channel = realtime.channels.get('your-channel-name')
```
```java
Channel channel = realtime.channels.get("your-channel-name");
```
When publishing tokens, don't await the `channel.publish()` call. Ably rolls up acknowledgments and debounces them for efficiency, which means awaiting each publish would unnecessarily slow down your token stream. Messages are still published in the order that `publish()` is called, so delivery order is not affected.
```javascript
// ✅ Do this - publish without await for maximum throughput
for await (const event of stream) {
if (event.type === 'token') {
channel.publish('token', event.text);
}
}
// ❌ Don't do this - awaiting each publish reduces throughput
for await (const event of stream) {
if (event.type === 'token') {
await channel.publish('token', event.text);
}
}
```
```python
# ✅ Do this - publish without await for maximum throughput
async for event in stream:
if event['type'] == 'token':
message = Message(name='token', data=event['text'])
channel.publish(message)
# ❌ Don't do this - awaiting each publish reduces throughput
async for event in stream:
if event['type'] == 'token':
message = Message(name='token', data=event['text'])
await channel.publish(message)
```
```java
// ✅ Do this - publish without blocking for maximum throughput
for (Event event : stream) {
if (event.getType().equals("token")) {
channel.publish("token", event.getText());
}
}
// ❌ Don't do this - blocking on each publish reduces throughput
for (Event event : stream) {
if (event.getType().equals("token")) {
channel.publish("token", event.getText()).get(); // blocking call
}
}
```
This approach maximizes throughput while maintaining ordering guarantees, allowing you to stream tokens as fast as your AI model generates them.
## Streaming patterns
Ably is a pub/sub messaging platform, so you can structure your messages however works best for your application. Below are common patterns for streaming tokens, each showing both agent-side publishing and client-side subscription. Choose the approach that fits your use case, or create your own variation.
### Continuous token stream
For simple streaming scenarios such as live transcription, where all tokens are part of a continuous stream, simply publish each token as a message.
#### Publish tokens
```javascript
const channel = realtime.channels.get('your-channel-name');
// Example: stream returns events like { type: 'token', text: 'Hello' }
for await (const event of stream) {
if (event.type === 'token') {
channel.publish('token', event.text);
}
}
```
```python
channel = realtime.channels.get('your-channel-name')
# Example: stream returns events like { 'type': 'token', 'text': 'Hello' }
async for event in stream:
if event['type'] == 'token':
channel.publish('token', event['text'])
```
```java
Channel channel = realtime.channels.get("your-channel-name");
// Example: stream returns events like { type: 'token', text: 'Hello' }
for (Event event : stream) {
if (event.getType().equals("token")) {
channel.publish("token", event.getText());
}
}
```
#### Subscribe to tokens
```javascript
const channel = realtime.channels.get('your-channel-name');
// Subscribe to token messages
await channel.subscribe('token', (message) => {
const token = message.data;
console.log(token); // log each token as it arrives
});
```
```python
channel = realtime.channels.get('your-channel-name')
# Subscribe to token messages
def on_token(message):
token = message.data
print(token) # log each token as it arrives
await channel.subscribe('token', on_token)
```
```java
Channel channel = realtime.channels.get("your-channel-name");
// Subscribe to token messages
channel.subscribe("token", message -> {
String token = (String) message.data;
System.out.println(token); // log each token as it arrives
});
```
This pattern is simple and works well when you're displaying a single, continuous stream of tokens.
### Token stream with multiple responses
For applications with multiple responses, such as chat conversations, include a `responseId` in message [`extras`](https://ably.com/docs/messages.md#properties) to correlate tokens together that belong to the same response.
#### Publish tokens
```javascript
const channel = realtime.channels.get('your-channel-name');
// Example: stream returns events like { type: 'token', text: 'Hello', responseId: 'resp_abc123' }
for await (const event of stream) {
if (event.type === 'token') {
channel.publish({
name: 'token',
data: event.text,
extras: {
headers: {
responseId: event.responseId
}
}
});
}
}
```
```python
channel = realtime.channels.get('your-channel-name')
# Example: stream returns events like { 'type': 'token', 'text': 'Hello', 'responseId': 'resp_abc123' }
async for event in stream:
if event['type'] == 'token':
channel.publish(Message(
name='token',
data=event['text'],
extras={
'headers': {
'responseId': event['responseId']
}
}
))
```
```java
Channel channel = realtime.channels.get("your-channel-name");
// Example: stream returns events like { type: 'token', text: 'Hello', responseId: 'resp_abc123' }
for (Event event : stream) {
if (event.getType().equals("token")) {
JsonObject extras = new JsonObject();
JsonObject headers = new JsonObject();
headers.addProperty("responseId", event.getResponseId());
extras.add("headers", headers);
channel.publish(new Message("token", event.getText(), new MessageExtras(extras)));
}
}
```
#### Subscribe to tokens
Use the `responseId` header in message extras to correlate tokens. The `responseId` allows you to group tokens belonging to the same response and correctly handle token delivery for distinct responses, even when delivered concurrently.
```javascript
const channel = realtime.channels.get('your-channel-name');
// Track responses by ID
const responses = new Map();
await channel.subscribe('token', (message) => {
const token = message.data;
const responseId = message.extras?.headers?.responseId;
if (!responseId) {
console.warn('Token missing responseId');
return;
}
// Create an empty response
if (!responses.has(responseId)) {
responses.set(responseId, '');
}
// Append token to response
responses.set(responseId, responses.get(responseId) + token);
});
```
```python
channel = realtime.channels.get('your-channel-name')
# Track responses by ID
responses = {}
def on_token(message):
token = message.data
response_id = message.extras.get('headers', {}).get('responseId')
if not response_id:
print('Token missing responseId')
return
# Create an empty response
if response_id not in responses:
responses[response_id] = ''
# Append token to response
responses[response_id] += token
await channel.subscribe('token', on_token)
```
```java
Channel channel = realtime.channels.get("your-channel-name");
// Track responses by ID
Map responses = new ConcurrentHashMap<>();
channel.subscribe("token", message -> {
String token = (String) message.data;
JsonObject headers = message.extras.asJsonObject().get("headers").getAsJsonObject();
String responseId = headers != null ? headers.get("responseId").getAsString() : null;
if (responseId == null) {
System.err.println("Token missing responseId");
return;
}
// Create an empty response
responses.putIfAbsent(responseId, "");
// Append token to response
responses.put(responseId, responses.get(responseId) + token);
});
```
### Token stream with explicit start/stop events
In some cases, your AI model response stream may include explicit events to mark response boundaries. You can indicate the event type, such as a response start/stop event, using the Ably message [`name`](https://ably.com/docs/messages.md#properties).
#### Publish tokens
```javascript
const channel = realtime.channels.get('your-channel-name');
// Example: stream returns events like:
// { type: 'message_start', responseId: 'resp_abc123' }
// { type: 'message_delta', responseId: 'resp_abc123', text: 'Hello' }
// { type: 'message_stop', responseId: 'resp_abc123' }
for await (const event of stream) {
if (event.type === 'message_start') {
// Publish response start
channel.publish({
name: 'start',
extras: {
headers: {
responseId: event.responseId
}
}
});
} else if (event.type === 'message_delta') {
// Publish tokens
channel.publish({
name: 'token',
data: event.text,
extras: {
headers: {
responseId: event.responseId
}
}
});
} else if (event.type === 'message_stop') {
// Publish response stop
channel.publish({
name: 'stop',
extras: {
headers: {
responseId: event.responseId
}
}
});
}
}
```
```python
channel = realtime.channels.get('your-channel-name')
# Example: stream returns events like:
# { 'type': 'message_start', 'responseId': 'resp_abc123' }
# { 'type': 'message_delta', 'responseId': 'resp_abc123', 'text': 'Hello' }
# { 'type': 'message_stop', 'responseId': 'resp_abc123' }
async for event in stream:
if event['type'] == 'message_start':
# Publish response start
channel.publish(Message(
name='start',
extras={
'headers': {
'responseId': event['responseId']
}
}
))
elif event['type'] == 'message_delta':
# Publish tokens
channel.publish(Message(
name='token',
data=event['text'],
extras={
'headers': {
'responseId': event['responseId']
}
}
))
elif event['type'] == 'message_stop':
# Publish response stop
channel.publish(Message(
name='stop',
extras={
'headers': {
'responseId': event['responseId']
}
}
))
```
```java
Channel channel = realtime.channels.get("your-channel-name");
// Example: stream returns events like:
// { type: 'message_start', responseId: 'resp_abc123' }
// { type: 'message_delta', responseId: 'resp_abc123', text: 'Hello' }
// { type: 'message_stop', responseId: 'resp_abc123' }
for (Event event : stream) {
JsonObject extras = new JsonObject();
JsonObject headers = new JsonObject();
headers.addProperty("responseId", event.getResponseId());
extras.add("headers", headers);
if (event.getType().equals("message_start")) {
// Publish response start
channel.publish(new Message("start", null, new MessageExtras(extras)));
} else if (event.getType().equals("message_delta")) {
// Publish tokens
channel.publish(new Message("token", event.getText(), new MessageExtras(extras)));
} else if (event.getType().equals("message_stop")) {
// Publish response stop
channel.publish(new Message("stop", null, new MessageExtras(extras)));
}
}
```
#### Subscribe to tokens
Handle each event type to manage response lifecycle:
```javascript
const channel = realtime.channels.get('your-channel-name');
const responses = new Map();
// Handle response start
await channel.subscribe('start', (message) => {
const responseId = message.extras?.headers?.responseId;
responses.set(responseId, '');
});
// Handle tokens
await channel.subscribe('token', (message) => {
const responseId = message.extras?.headers?.responseId;
const token = message.data;
const currentText = responses.get(responseId) || '';
responses.set(responseId, currentText + token);
});
// Handle response stop
await channel.subscribe('stop', (message) => {
const responseId = message.extras?.headers?.responseId;
const finalText = responses.get(responseId);
console.log('Response complete:', finalText);
});
```
```python
channel = realtime.channels.get('your-channel-name')
responses = {}
# Handle response start
def on_start(message):
response_id = message.extras.get('headers', {}).get('responseId')
responses[response_id] = ''
await channel.subscribe('start', on_start)
# Handle tokens
def on_token(message):
response_id = message.extras.get('headers', {}).get('responseId')
token = message.data
current_text = responses.get(response_id, '')
responses[response_id] = current_text + token
await channel.subscribe('token', on_token)
# Handle response stop
def on_stop(message):
response_id = message.extras.get('headers', {}).get('responseId')
final_text = responses.get(response_id)
print('Response complete:', final_text)
await channel.subscribe('stop', on_stop)
```
```java
Channel channel = realtime.channels.get("your-channel-name");
Map responses = new ConcurrentHashMap<>();
// Handle response start
channel.subscribe("start", message -> {
JsonObject headers = message.extras.asJsonObject().get("headers").getAsJsonObject();
String responseId = headers.get("responseId").getAsString();
responses.put(responseId, "");
});
// Handle tokens
channel.subscribe("token", message -> {
JsonObject headers = message.extras.asJsonObject().get("headers").getAsJsonObject();
String responseId = headers.get("responseId").getAsString();
String token = (String) message.data;
String currentText = responses.getOrDefault(responseId, "");
responses.put(responseId, currentText + token);
});
// Handle response stop
channel.subscribe("stop", message -> {
JsonObject headers = message.extras.asJsonObject().get("headers").getAsJsonObject();
String responseId = headers.get("responseId").getAsString();
String finalText = responses.get(responseId);
System.out.println("Response complete: " + finalText);
});
```
## Client hydration
When clients connect or reconnect, such as after a page refresh, they often need to catch up on tokens that were published while they were offline or before they joined. Ably provides several approaches to hydrate client state depending on your application's requirements.
### Using rewind for recent history
The simplest approach is to use Ably's [rewind](https://ably.com/docs/channels/options/rewind.md) channel option to attach to the channel at some point in the recent past, and automatically receive all tokens since that point:
```javascript
// Use rewind to receive recent historical messages
const channel = realtime.channels.get('your-channel-name', {
params: { rewind: '2m' } // or rewind: 100 for message count
});
// Subscribe to receive both recent historical and live messages,
// which are delivered in order to the subscription
await channel.subscribe('token', (message) => {
const token = message.data;
// Process tokens from both recent history and live stream
console.log('Token received:', token);
});
```
```python
# Use rewind to receive recent historical messages
channel = realtime.channels.get('your-channel-name', params={'rewind': '2m'}) # or rewind: 100 for message count
# Subscribe to receive both recent historical and live messages,
# which are delivered in order to the subscription
def on_token(message):
token = message.data
# Process tokens from both recent history and live stream
print('Token received:', token)
await channel.subscribe('token', on_token)
```
```java
// Use rewind to receive recent historical messages
ChannelOptions options = new ChannelOptions();
options.params = Map.of("rewind", "2m"); // or rewind: 100 for message count
Channel channel = realtime.channels.get("your-channel-name", options);
// Subscribe to receive both recent historical and live messages,
// which are delivered in order to the subscription
channel.subscribe("token", message -> {
String token = (String) message.data;
// Process tokens from both recent history and live stream
System.out.println("Token received: " + token);
});
```
Rewind supports two formats:
- **Time-based**: Use a time interval like `'30s'` or `'2m'` to retrieve messages from that time period
- **Count-based**: Use a number like `50` or `100` to retrieve the most recent N messages (maximum 100)
By default, rewind is limited to the last 2 minutes of messages. This is usually sufficient for scenarios where clients need only recent context, such as for continuous token streaming, or when the response stream from a given model request does not exceed 2 minutes. If you need more than 2 minutes of history, see [Using history for longer persistence](#history).
### Using history for older messages
For applications that need to retrieve tokens beyond the 2-minute rewind window, enable [persistence](https://ably.com/docs/storage-history/storage.md#all-message-persistence) on your channel. Use [channel history](https://ably.com/docs/storage-history/history.md) with the [`untilAttach` option](https://ably.com/docs/storage-history/history.md#continuous-history) to paginate back through history to obtain historical tokens, while preserving continuity with the delivery of live tokens:
```javascript
// Use a channel in a namespace called 'persisted', which has persistence enabled
const channel = realtime.channels.get('persisted:your-channel-name');
let response = '';
// Subscribe to live messages (implicitly attaches the channel)
await channel.subscribe('token', (message) => {
// Append the token to the end of the response
response += message.data;
});
// Fetch history up until the point of attachment
let page = await channel.history({ untilAttach: true });
// Paginate backwards through history
while (page) {
// Messages are newest-first, so prepend them to response
for (const message of page.items) {
response = message.data + response;
}
// Move to next page if available
page = page.hasNext() ? await page.next() : null;
}
```
```python
# Use a channel in a namespace called 'persisted', which has persistence enabled
channel = realtime.channels.get('persisted:your-channel-name')
response = ''
# Subscribe to live messages (implicitly attaches the channel)
def on_token(message):
global response
# Append the token to the end of the response
response += message.data
await channel.subscribe('token', on_token)
# Fetch history up until the point of attachment
page = await channel.history(until_attach=True)
# Paginate backwards through history
while page:
# Messages are newest-first, so prepend them to response
for message in page.items:
response = message.data + response
# Move to next page if available
page = await page.next() if page.has_next() else None
```
```java
// Use a channel in a namespace called 'persisted', which has persistence enabled
Channel channel = realtime.channels.get("persisted:your-channel-name");
StringBuilder response = new StringBuilder();
// Subscribe to live messages (implicitly attaches the channel)
channel.subscribe("token", message -> {
// Append the token to the end of the response
response.append((String) message.data);
});
// Fetch history up until the point of attachment
PaginatedResult page = channel.history(new Param("untilAttach", "true"));
// Paginate backwards through history
while (page != null) {
// Messages are newest-first, so prepend them to response
for (Message message : page.items()) {
response.insert(0, (String) message.data);
}
// Move to next page if available
page = page.hasNext() ? page.next() : null;
}
```
### Hydrating an in-progress response
A common pattern is to persist complete model responses in your database while using Ably for live token delivery of the in-progress response.
The client loads completed responses from your database, then reaches back into Ably channel history until it encounters a token for a response it's already loaded.
You can retrieve partial history using either the [rewind](#rewind) or [history](#history) pattern.
#### Hydrate using rewind
Load completed responses from your database, then use rewind to catch up on any in-progress responses, skipping any tokens that belong to a response that was already loaded:
```javascript
// Load completed responses from database
const completedResponses = await loadResponsesFromDatabase();
// Use rewind to receive recent historical messages
const channel = realtime.channels.get('your-channel-name', {
params: { rewind: '2m' }
});
// Track in progress responses by ID
const inProgressResponses = new Map();
// Subscribe to receive both recent historical and live messages,
// which are delivered in order to the subscription
await channel.subscribe('token', (message) => {
const token = message.data;
const responseId = message.extras?.headers?.responseId;
if (!responseId) {
console.warn('Token missing responseId');
return;
}
// Skip tokens for responses already hydrated from database
if (completedResponses.has(responseId)) {
return;
}
// Create an empty in-progress response
if (!inProgressResponses.has(responseId)) {
inProgressResponses.set(responseId, '');
}
// Append tokens for new responses
inProgressResponses.set(responseId, inProgressResponses.get(responseId) + token);
});
```
```python
# Load completed responses from database
completed_responses = await load_responses_from_database()
# Use rewind to receive recent historical messages
channel = realtime.channels.get('your-channel-name', params={'rewind': '2m'})
# Track in progress responses by ID
in_progress_responses = {}
# Subscribe to receive both recent historical and live messages,
# which are delivered in order to the subscription
def on_token(message):
token = message.data
response_id = message.extras.get('headers', {}).get('responseId')
if not response_id:
print('Token missing responseId')
return
# Skip tokens for responses already hydrated from database
if response_id in completed_responses:
return
# Create an empty in-progress response
if response_id not in in_progress_responses:
in_progress_responses[response_id] = ''
# Append tokens for new responses
in_progress_responses[response_id] += token
await channel.subscribe('token', on_token)
```
```java
// Load completed responses from database
Set completedResponses = loadResponsesFromDatabase();
// Use rewind to receive recent historical messages
ChannelOptions options = new ChannelOptions();
options.params = Map.of("rewind", "2m");
Channel channel = realtime.channels.get("your-channel-name", options);
// Track in progress responses by ID
Map inProgressResponses = new ConcurrentHashMap<>();
// Subscribe to receive both recent historical and live messages,
// which are delivered in order to the subscription
channel.subscribe("token", message -> {
String token = (String) message.data;
JsonObject headers = message.extras.asJsonObject().get("headers").getAsJsonObject();
String responseId = headers != null ? headers.get("responseId").getAsString() : null;
if (responseId == null) {
System.err.println("Token missing responseId");
return;
}
// Skip tokens for responses already hydrated from database
if (completedResponses.contains(responseId)) {
return;
}
// Create an empty in-progress response
inProgressResponses.putIfAbsent(responseId, "");
// Append tokens for new responses
inProgressResponses.put(responseId, inProgressResponses.get(responseId) + token);
});
```
#### Hydrate using history
Load completed responses from your database, then paginate backwards through history to catch up on in-progress responses until you reach a token that belongs to a response you've already loaded:
```javascript
// Load completed responses from database
const completedResponses = await loadResponsesFromDatabase();
// Use a channel in a namespace called 'persisted', which has persistence enabled
const channel = realtime.channels.get('persisted:your-channel-name');
// Track in progress responses by ID
const inProgressResponses = new Map();
// Subscribe to live tokens (implicitly attaches)
await channel.subscribe('token', (message) => {
const token = message.data;
const responseId = message.extras?.headers?.responseId;
if (!responseId) {
console.warn('Token missing responseId');
return;
}
// Skip tokens for responses already hydrated from database
if (completedResponses.has(responseId)) {
return;
}
// Create an empty in-progress response
if (!inProgressResponses.has(responseId)) {
inProgressResponses.set(responseId, '');
}
// Append live tokens for in-progress responses
inProgressResponses.set(responseId, inProgressResponses.get(responseId) + token);
});
// Paginate backwards through history until we encounter a hydrated response
let page = await channel.history({ untilAttach: true });
// Paginate backwards through history
let done = false;
while (page && !done) {
// Messages are newest-first, so prepend them to response
for (const message of page.items) {
const token = message.data;
const responseId = message.extras?.headers?.responseId;
// Stop when we reach a response already loaded from database
if (completedResponses.has(responseId)) {
done = true;
break;
}
// Create an empty in-progress response
if (!inProgressResponses.has(responseId)) {
inProgressResponses.set(responseId, '');
}
// Prepend historical tokens for in-progress responses
inProgressResponses.set(responseId, token + inProgressResponses.get(responseId));
}
// Move to next page if available
page = page.hasNext() ? await page.next() : null;
}
```
```python
# Load completed responses from database
completed_responses = await load_responses_from_database()
# Use a channel in a namespace called 'persisted', which has persistence enabled
channel = realtime.channels.get('persisted:your-channel-name')
# Track in progress responses by ID
in_progress_responses = {}
# Subscribe to live tokens (implicitly attaches)
def on_token(message):
token = message.data
response_id = message.extras.get('headers', {}).get('responseId')
if not response_id:
print('Token missing responseId')
return
# Skip tokens for responses already hydrated from database
if response_id in completed_responses:
return
# Create an empty in-progress response
if response_id not in in_progress_responses:
in_progress_responses[response_id] = ''
# Append live tokens for in-progress responses
in_progress_responses[response_id] += token
await channel.subscribe('token', on_token)
# Paginate backwards through history until we encounter a hydrated response
page = await channel.history(until_attach=True)
# Paginate backwards through history
done = False
while page and not done:
# Messages are newest-first, so prepend them to response
for message in page.items:
token = message.data
response_id = message.extras.get('headers', {}).get('responseId')
# Stop when we reach a response already loaded from database
if response_id in completed_responses:
done = True
break
# Create an empty in-progress response
if response_id not in in_progress_responses:
in_progress_responses[response_id] = ''
# Prepend historical tokens for in-progress responses
in_progress_responses[response_id] = token + in_progress_responses[response_id]
# Move to next page if available
page = await page.next() if page.has_next() else None
```
```java
// Load completed responses from database
Set completedResponses = loadResponsesFromDatabase();
// Use a channel in a namespace called 'persisted', which has persistence enabled
Channel channel = realtime.channels.get("persisted:your-channel-name");
// Track in progress responses by ID
Map inProgressResponses = new ConcurrentHashMap<>();
// Subscribe to live tokens (implicitly attaches)
channel.subscribe("token", message -> {
String token = (String) message.data;
JsonObject headers = message.extras.asJsonObject().get("headers").getAsJsonObject();
String responseId = headers != null ? headers.get("responseId").getAsString() : null;
if (responseId == null) {
System.err.println("Token missing responseId");
return;
}
// Skip tokens for responses already hydrated from database
if (completedResponses.contains(responseId)) {
return;
}
// Create an empty in-progress response
inProgressResponses.putIfAbsent(responseId, "");
// Append live tokens for in-progress responses
inProgressResponses.put(responseId, inProgressResponses.get(responseId) + token);
});
// Paginate backwards through history until we encounter a hydrated response
PaginatedResult page = channel.history(new Param("untilAttach", "true"));
// Paginate backwards through history
boolean done = false;
while (page != null && !done) {
// Messages are newest-first, so prepend them to response
for (Message message : page.items()) {
String token = (String) message.data;
JsonObject headers = message.extras.asJsonObject().get("headers").getAsJsonObject();
String responseId = headers != null ? headers.get("responseId").getAsString() : null;
// Stop when we reach a response already loaded from database
if (completedResponses.contains(responseId)) {
done = true;
break;
}
// Create an empty in-progress response
inProgressResponses.putIfAbsent(responseId, "");
// Prepend historical tokens for in-progress responses
inProgressResponses.put(responseId, token + inProgressResponses.get(responseId));
}
// Move to next page if available
page = page.hasNext() ? page.next() : null;
}
```