# Guide: Stream OpenAI responses using the message-per-response pattern

This guide shows you how to stream AI responses from OpenAI's [Responses API](https://platform.openai.com/docs/api-reference/responses) over Ably using the [message-per-response pattern](https://ably.com/docs/ai-transport/token-streaming/message-per-response.md). Specifically, it appends each response token to a single Ably message, creating a complete AI response that grows incrementally while delivering tokens in realtime.

Using Ably to distribute tokens from the OpenAI SDK enables you to broadcast AI responses to thousands of concurrent subscribers with reliable message delivery and ordering guarantees. This approach stores each complete response as a single message in channel history, making it easy to retrieve conversation history without processing thousands of individual token messages.

## Prerequisites

To follow this guide, you need:

- Node.js 20 or higher
- An OpenAI API key
- An Ably API key

Useful links:

- [OpenAI developer quickstart](https://platform.openai.com/docs/quickstart)
- [Ably JavaScript SDK getting started](https://ably.com/docs/getting-started/javascript.md)

Create a new NPM package, which will contain the publisher and subscriber code:

```shell
mkdir ably-openai-example && cd ably-openai-example
npm init -y
```

Install the required packages using NPM:

```shell
npm install openai@^4 ably@^2
```

Export your OpenAI API key to the environment, which will be used later in the guide by the OpenAI SDK:

```shell
export OPENAI_API_KEY="your_api_key_here"
```

## Step 1: Enable message appends

Message append functionality requires "Message annotations, updates, deletes and appends" to be enabled in a [channel rule](https://ably.com/docs/channels.md#rules) associated with the channel.

To enable the channel rule:

1. Go to the [Ably dashboard](https://www.ably.com/dashboard) and select your app.
2. Navigate to the "Configuration" > "Rules" section from the left-hand navigation bar.
3. Choose "Add new rule".
4. Enter a channel name or namespace pattern (e.g. `ai` for all channels starting with `ai:`).
5. Select the "Message annotations, updates, deletes and appends" option from the list.
6. Click "Create channel rule".

The examples in this guide use the `ai:` namespace prefix, which assumes you have configured the rule for `ai:*`.

## Step 2: Get a streamed response from OpenAI

Initialize an OpenAI client and use the [Responses API](https://platform.openai.com/docs/api-reference/responses) to stream model output as a series of events.

Create a new file `publisher.mjs` with the following contents:

```javascript
import OpenAI from 'openai';

// Initialize OpenAI client
const openai = new OpenAI();

// Process each streaming event
async function processEvent(event) {
  console.log(JSON.stringify(event));
  // This function is updated in the next sections
}

// Create streaming response from OpenAI
async function streamOpenAIResponse(prompt) {
  const stream = await openai.responses.create({
    model: "gpt-5",
    input: prompt,
    stream: true,
  });

  // Iterate through streaming events
  for await (const event of stream) {
    await processEvent(event);
  }
}

// Usage example
streamOpenAIResponse("Tell me a short joke");
```

### Understand OpenAI streaming events

OpenAI's Responses API [streams](https://platform.openai.com/docs/guides/streaming-responses) model output as a series of events when you set `stream: true`. Each streamed event includes a `type` property which describes the [event type](https://platform.openai.com/docs/api-reference/responses-streaming). A complete text response can be constructed from the following event types:

- [`response.created`](https://platform.openai.com/docs/api-reference/responses-streaming/response/created): Signals the start of a response. Contains `response.id` to correlate subsequent events.
- [`response.output_item.added`](https://platform.openai.com/docs/api-reference/responses-streaming/response/output_item/added): Indicates a new output item. If `item.type === "message"` the item contains model response text; other types may be specified, such as `"reasoning"` for internal reasoning tokens. The `output_index` indicates the position of this item in the response's [`output`](https://platform.openai.com/docs/api-reference/responses-streaming/response/completed#responses_streaming-response-completed-response-output) array.
- [`response.content_part.added`](https://platform.openai.com/docs/api-reference/responses-streaming/response/content_part/added): Indicates a new content part within an output item. If `part.type === "output_text"` the part contains model response text; other types may be specified, such as `"reasoning_text"` for internal reasoning tokens. The `content_index` indicates the position of this part in the output item's [`content`](https://platform.openai.com/docs/api-reference/responses-streaming/response/completed#responses_streaming-response-completed-response-output-output_message-content) array.
- [`response.output_text.delta`](https://platform.openai.com/docs/api-reference/responses-streaming/response/output_text/delta): Contains a single token in the `delta` field. Use the `item_id`, `output_index`, and `content_index` to correlate tokens relating to a specific content part.
- [`response.content_part.done`](https://platform.openai.com/docs/api-reference/responses-streaming/response/content_part/done): Signals completion of a content part. Contains the complete `part` object with full text, along with `item_id`, `output_index`, and `content_index`.
- [`response.output_item.done`](https://platform.openai.com/docs/api-reference/responses-streaming/response/output_item/done): Signals completion of an output item. Contains the complete `item` object and `output_index`.
- [`response.completed`](https://platform.openai.com/docs/api-reference/responses-streaming/response/completed): Signals the end of the response. Contains the complete `response` object.

The following example shows the event sequence received when streaming a response:

```json
// 1. Response starts
{"type":"response.created","response":{"id":"resp_abc123","status":"in_progress"}}

// 2. First output item (reasoning) is added
{"type":"response.output_item.added","output_index":0,"item":{"id":"rs_456","type":"reasoning"}}
{"type":"response.output_item.done","output_index":0,"item":{"id":"rs_456","type":"reasoning"}}

// 3. Second output item (message) is added
{"type":"response.output_item.added","output_index":1,"item":{"id":"msg_789","type":"message"}}
{"type":"response.content_part.added","item_id":"msg_789","output_index":1,"content_index":0}

// 4. Text tokens stream in as delta events
{"type":"response.output_text.delta","item_id":"msg_789","output_index":1,"content_index":0,"delta":"Why"}
{"type":"response.output_text.delta","item_id":"msg_789","output_index":1,"content_index":0,"delta":" don"}
{"type":"response.output_text.delta","item_id":"msg_789","output_index":1,"content_index":0,"delta":"'t"}
{"type":"response.output_text.delta","item_id":"msg_789","output_index":1,"content_index":0,"delta":" scientists"}
{"type":"response.output_text.delta","item_id":"msg_789","output_index":1,"content_index":0,"delta":" trust"}
{"type":"response.output_text.delta","item_id":"msg_789","output_index":1,"content_index":0,"delta":" atoms"}
{"type":"response.output_text.delta","item_id":"msg_789","output_index":1,"content_index":0,"delta":"?"}
{"type":"response.output_text.delta","item_id":"msg_789","output_index":1,"content_index":0,"delta":" Because"}
{"type":"response.output_text.delta","item_id":"msg_789","output_index":1,"content_index":0,"delta":" they"}
{"type":"response.output_text.delta","item_id":"msg_789","output_index":1,"content_index":0,"delta":" make"}
{"type":"response.output_text.delta","item_id":"msg_789","output_index":1,"content_index":0,"delta":" up"}
{"type":"response.output_text.delta","item_id":"msg_789","output_index":1,"content_index":0,"delta":" everything"}
{"type":"response.output_text.delta","item_id":"msg_789","output_index":1,"content_index":0,"delta":"."}

// 5. Content part and output item complete
{"type":"response.content_part.done","item_id":"msg_789","output_index":1,"content_index":0,"part":{"type":"output_text","text":"Why don't scientists trust atoms? Because they make up everything."}}
{"type":"response.output_item.done","output_index":1,"item":{"id":"msg_789","type":"message","status":"completed","content":[{"type":"output_text","text":"Why don't scientists trust atoms? Because they make up everything."}]}}

// 6. Response completes
{"type":"response.completed","response":{"id":"resp_abc123","status":"completed","output":[{"id":"rs_456","type":"reasoning"},{"id":"msg_789","type":"message","status":"completed","content":[{"type":"output_text","text":"Why don't scientists trust atoms? Because they make up everything."}]}]}}
```

## Step 3: Publish streaming tokens to Ably

Publish OpenAI streaming events to Ably using message appends to reliably and scalably distribute them to subscribers.

Each AI response is stored as a single Ably message that grows as tokens are appended.

### Initialize the Ably client

Add the Ably client initialization to your `publisher.mjs` file:

```javascript
import Ably from 'ably';

// Initialize Ably Realtime client
const realtime = new Ably.Realtime({
  key: 'your-api-key',
  echoMessages: false
});

// Create a channel for publishing streamed AI responses
const channel = realtime.channels.get('ai:your-channel-name');
```

The Ably Realtime client maintains a persistent connection to the Ably service, which allows you to publish tokens at high message rates with low latency.
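The client initialization above uses a placeholder string for the Ably key. One way to keep a real key out of source files, mirroring how the OpenAI key is exported earlier in this guide, is to read it from the environment. The following is a minimal sketch under assumptions: the `ABLY_API_KEY` variable name and the `ablyOptionsFromEnv` helper are hypothetical and are not defined by the Ably SDK.

```javascript
// Hypothetical helper: builds Ably client options from environment variables.
// ABLY_API_KEY is an assumed variable name, not one defined by the Ably SDK.
function ablyOptionsFromEnv(env = process.env) {
  const key = env.ABLY_API_KEY;
  if (!key) {
    throw new Error('ABLY_API_KEY is not set');
  }
  // echoMessages: false matches the publisher configuration in this guide
  return { key, echoMessages: false };
}

// Example with an explicit environment object (placeholder value, not a real key)
const options = ablyOptionsFromEnv({ ABLY_API_KEY: 'appId.keyId:keySecret' });
console.log(Object.keys(options));
```

The returned object could then be passed to `new Ably.Realtime(...)` in place of the inline options.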
### Publish initial message and append tokens

When a new response begins, publish an initial message to create it. Ably assigns a [`serial`](https://ably.com/docs/messages.md#properties) identifier to the message. Use this `serial` to append each token to the message as it arrives from the OpenAI model.

Update your `publisher.mjs` file to publish the initial message and append tokens:

```javascript
// Track state across events
let msgSerial = null;
let messageItemId = null;

// Process each streaming event and publish to Ably
async function processEvent(event) {
  switch (event.type) {
    case 'response.created': {
      // Publish initial empty message when response starts
      const result = await channel.publish({
        name: 'response',
        data: ''
      });

      // Capture the message serial for appending tokens
      msgSerial = result.serials[0];
      break;
    }

    case 'response.output_item.added':
      // Capture message item ID when a message output item is added
      if (event.item.type === 'message') {
        messageItemId = event.item.id;
      }
      break;

    case 'response.output_text.delta':
      // Append tokens from message output items only
      if (event.item_id === messageItemId && msgSerial) {
        channel.appendMessage({
          serial: msgSerial,
          data: event.delta
        });
      }
      break;

    case 'response.completed':
      console.log('Stream completed!');
      break;
  }
}
```

This implementation:

- Publishes an initial empty message when the response begins and captures the `serial`
- Filters for `response.output_text.delta` events from `message` type output items
- Appends each token to the original message

Run the publisher to see tokens streaming to Ably:

```shell
node publisher.mjs
```

## Step 4: Subscribe to streaming tokens

Create a subscriber that receives the streaming tokens from Ably and reconstructs the response in realtime.
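The reconstruction a subscriber performs can be sketched independently of the Ably client, which makes the logic easy to test. The following is a minimal sketch, assuming messages carry the `action`, `serial`, and `data` fields used in this guide. The `applyMessage` helper is hypothetical and the sample messages are made up for illustration; real serials are assigned by Ably.

```javascript
// Hypothetical pure helper: applies one message to a Map of serial -> response text
function applyMessage(responses, message) {
  switch (message.action) {
    case 'message.create':
    case 'message.update':
      // Both actions carry the full content of the message at that point
      responses.set(message.serial, message.data);
      break;
    case 'message.append': {
      // Appends carry only the new token, so concatenate onto the stored text
      const current = responses.get(message.serial) || '';
      responses.set(message.serial, current + message.data);
      break;
    }
  }
  return responses;
}

// Made-up sample messages for a single response
const sample = [
  { action: 'message.create', serial: 'serial-1', data: '' },
  { action: 'message.append', serial: 'serial-1', data: 'Why' },
  { action: 'message.append', serial: 'serial-1', data: ' don' },
  { action: 'message.append', serial: 'serial-1', data: "'t" },
];

const responses = sample.reduce(applyMessage, new Map());
console.log(responses.get('serial-1')); // prints "Why don't"
```

Keeping the state transition in a pure function like this means the subscriber callback only needs to call it and handle display.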
Create a new file `subscriber.mjs` with the following contents:

```javascript
import Ably from 'ably';

// Initialize Ably Realtime client
const realtime = new Ably.Realtime({ key: 'your-api-key' });

// Get the same channel used by the publisher
const channel = realtime.channels.get('ai:your-channel-name');

// Track responses by message serial
const responses = new Map();

// Subscribe to receive messages
await channel.subscribe((message) => {
  switch (message.action) {
    case 'message.create':
      // New response started
      console.log('\n[Response started]', message.serial);
      responses.set(message.serial, message.data);
      break;

    case 'message.append': {
      // Append token to existing response
      const current = responses.get(message.serial) || '';
      responses.set(message.serial, current + message.data);

      // Display token as it arrives
      process.stdout.write(message.data);
      break;
    }

    case 'message.update':
      // Replace entire response content
      responses.set(message.serial, message.data);
      console.log('\n[Response updated with full content]');
      break;
  }
});

console.log('Subscriber ready, waiting for tokens...');
```

Subscribers receive different message actions depending on when they join and how they're retrieving messages:

- `message.create`: Indicates a new response has started (i.e. a new message was created). The message `data` contains the initial content (often empty or the first token). Store this as the beginning of a new response using `serial` as the identifier.
- `message.append`: Contains a single token fragment to append. The message `data` contains only the new token, not the full concatenated response. Append this token to the existing response identified by `serial`.
- `message.update`: Contains the whole response up to that point. The message `data` contains the full concatenated text so far. Replace the entire response content with this data for the message identified by `serial`. This action occurs when the channel needs to resynchronize the full message state, such as after a client [resumes](https://ably.com/docs/connect/states.md#resume) from a transient disconnection.

Run the subscriber in a separate terminal:

```shell
node subscriber.mjs
```

With the subscriber running, run the publisher in another terminal. The tokens stream in realtime as the OpenAI model generates them.

## Step 5: Stream with multiple publishers and subscribers

Ably's [channel-oriented sessions](https://ably.com/docs/ai-transport/sessions-identity.md#connection-oriented-vs-channel-oriented-sessions) enable multiple AI agents to publish responses and multiple users to receive them on a single channel simultaneously. Ably handles message delivery to all participants, eliminating the need to implement routing logic or manage state synchronization across connections.

### Broadcasting to multiple subscribers

Each subscriber receives the complete stream of tokens independently, enabling you to build collaborative experiences or multi-device applications.

Run a subscriber in multiple separate terminals:

```shell
# Terminal 1
node subscriber.mjs

# Terminal 2
node subscriber.mjs

# Terminal 3
node subscriber.mjs
```

All subscribers receive the same stream of tokens in realtime.

### Publishing concurrent responses

Multiple publishers can stream different responses concurrently on the same [channel](https://ably.com/docs/channels.md). Each response is a distinct message with its own unique `serial` identifier, so tokens from different responses are isolated to distinct messages and don't interfere with each other.

To demonstrate this, run a publisher in multiple separate terminals:

```shell
# Terminal 1
node publisher.mjs

# Terminal 2
node publisher.mjs

# Terminal 3
node publisher.mjs
```

All running subscribers receive tokens from all responses concurrently. Each subscriber correctly reconstructs each response separately using the `serial` to correlate tokens.
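The publisher in this guide keeps `msgSerial` and `messageItemId` in module-level variables, which works when each process streams one response at a time. If a single process were to stream several responses concurrently, one option is to keep that state per response. The following is a minimal sketch, assuming a channel object that exposes `publish` and `appendMessage` as used earlier in this guide. The `createResponseProcessor` factory, the `createFakeChannel` in-memory stand-in, and the event data are all hypothetical, included so the example runs without a network connection.

```javascript
// Hypothetical factory: each call returns a processEvent function with its own state
function createResponseProcessor(channel) {
  let msgSerial = null;
  let messageItemId = null;

  return async function processEvent(event) {
    switch (event.type) {
      case 'response.created': {
        const result = await channel.publish({ name: 'response', data: '' });
        msgSerial = result.serials[0];
        break;
      }
      case 'response.output_item.added':
        if (event.item.type === 'message') {
          messageItemId = event.item.id;
        }
        break;
      case 'response.output_text.delta':
        if (event.item_id === messageItemId && msgSerial) {
          // Awaited here so the demo is deterministic; the guide's publisher does not await
          await channel.appendMessage({ serial: msgSerial, data: event.delta });
        }
        break;
    }
  };
}

// In-memory stand-in for an Ably channel (not the real SDK), for illustration only
function createFakeChannel() {
  const messages = new Map();
  let counter = 0;
  return {
    messages,
    async publish({ data }) {
      const serial = `fake-serial-${++counter}`;
      messages.set(serial, data);
      return { serials: [serial] };
    },
    async appendMessage({ serial, data }) {
      messages.set(serial, messages.get(serial) + data);
    },
  };
}

// Builds a made-up event sequence for one response
function eventsFor(itemId, tokens) {
  return [
    { type: 'response.created' },
    { type: 'response.output_item.added', item: { id: itemId, type: 'message' } },
    ...tokens.map((delta) => ({ type: 'response.output_text.delta', item_id: itemId, delta })),
  ];
}

async function demo() {
  const channel = createFakeChannel();
  const first = createResponseProcessor(channel);
  const second = createResponseProcessor(channel);
  const a = eventsFor('msg_a', ['Hello', ' world']);
  const b = eventsFor('msg_b', ['Good', 'bye']);

  // Interleave events from the two responses on the same channel
  for (let i = 0; i < a.length; i++) {
    await first(a[i]);
    await second(b[i]);
  }
  return [...channel.messages.values()];
}

demo().then((texts) => console.log(texts));
```

Because each processor closes over its own serial and item ID, interleaved events from the two responses end up in separate messages.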
## Step 6: Retrieve complete responses from history

One key advantage of the message-per-response pattern is that each complete AI response is stored as a single message in channel history. This makes it efficient to retrieve conversation history without processing thousands of individual token messages.

Use Ably's [rewind](https://ably.com/docs/channels/options/rewind.md) channel option to attach to the channel at some point in the recent past and automatically receive complete responses from history. Historical messages are delivered as `message.update` events containing the complete concatenated response, which then seamlessly transition to live `message.append` events for any ongoing responses:

```javascript
// Use rewind to receive recent historical messages
const channel = realtime.channels.get('ai:your-channel-name', {
  params: { rewind: '2m' } // Retrieve messages from the last 2 minutes
});

const responses = new Map();

await channel.subscribe((message) => {
  switch (message.action) {
    case 'message.create':
      responses.set(message.serial, message.data);
      break;

    case 'message.append': {
      const current = responses.get(message.serial) || '';
      responses.set(message.serial, current + message.data);
      process.stdout.write(message.data);
      break;
    }

    case 'message.update':
      // Historical messages contain full concatenated response
      responses.set(message.serial, message.data);
      console.log('\n[Historical response]:', message.data);
      break;
  }
});
```

## Next steps

- Learn more about the [message-per-response pattern](https://ably.com/docs/ai-transport/token-streaming/message-per-response.md) used in this guide
- Learn about [client hydration strategies](https://ably.com/docs/ai-transport/token-streaming/message-per-response.md#hydration) for handling late joiners and reconnections
- Understand [sessions and identity](https://ably.com/docs/ai-transport/sessions-identity.md) in AI-enabled applications
- Explore the [message-per-token pattern](https://ably.com/docs/ai-transport/token-streaming/message-per-token.md) for explicit control over individual token messages