# Completion and cancellation

AI responses streamed using the [message-per-response](https://ably.com/docs/ai-transport/token-streaming/message-per-response.md) or [message-per-token](https://ably.com/docs/ai-transport/token-streaming/message-per-token.md) pattern do not require explicit completion signals to function. Subscribers receive tokens as they arrive and can render them progressively. However, some applications benefit from explicitly signalling when a response is complete, or allowing users to cancel an in-progress response.

## Benefits of completion and cancellation signals

Explicit completion and cancellation signals are useful when your application needs to:

- Detect whether a response is still in progress after reconnection, so clients can distinguish between a completed response and one that is still streaming
- Finalize UI state when a response ends, such as removing typing indicators or enabling input controls
- Allow users to abort a response mid-stream, stopping generation and saving compute resources
- Coordinate multiple content parts within a single response, where downstream logic depends on knowing when each part is finished

## Signal completion

Use [operation metadata](https://ably.com/docs/messages/updates-deletes.md#append-operation-metadata) to signal that a content part or response is complete. Operation metadata is a set of key-value pairs carried on each append or update operation. Subscribers can inspect this metadata to determine the current phase of a message.

### Content-part completion

When streaming content using the [message-per-response](https://ably.com/docs/ai-transport/token-streaming/message-per-response.md) pattern, signal that a content part is complete by appending an empty string with a metadata marker. The empty append does not change the message's data, but the metadata signals to subscribers that no more content follows for this message.
This keeps the entire content lifecycle (create, stream, done) within a single Ably message:

#### Javascript

```
const channel = realtime.channels.get('ai:your-channel-name');

// Publish initial message
const { serials: [msgSerial] } = await channel.publish({ name: 'response', data: '' });

// Stream tokens
for await (const event of stream) {
  if (event.type === 'token') {
    channel.appendMessage({ serial: msgSerial, data: event.text }, { metadata: { phase: 'streaming' } });
  }
}

// Signal content-part completion with an empty append
channel.appendMessage({ serial: msgSerial, data: '' }, { metadata: { phase: 'done' } });
```

#### Python

```
channel = realtime.channels.get('ai:your-channel-name')

# Publish initial message
message = Message(name='response', data='')
result = await channel.publish(message)
msg_serial = result.serials[0]

# Stream tokens
async for event in stream:
    if event['type'] == 'token':
        channel.append_message(
            serial=msg_serial,
            data=event['text'],
            metadata={'phase': 'streaming'}
        )

# Signal content-part completion with an empty append
channel.append_message(
    serial=msg_serial,
    data='',
    metadata={'phase': 'done'}
)
```

#### Java

```
Channel channel = realtime.channels.get("ai:your-channel-name");

// Publish initial message
CompletableFuture<PublishResult> publishFuture = new CompletableFuture<>();
channel.publish("response", "", new Callback<PublishResult>() {
    @Override
    public void onSuccess(PublishResult result) {
        publishFuture.complete(result);
    }

    @Override
    public void onError(ErrorInfo reason) {
        publishFuture.completeExceptionally(AblyException.fromErrorInfo(reason));
    }
});
String msgSerial = publishFuture.get().serials[0];

// Stream tokens
for (Event event : stream) {
    if (event.getType().equals("token")) {
        MessageMetadata metadata = new MessageMetadata();
        metadata.put("phase", "streaming");
        channel.appendMessage(msgSerial, event.getText(), metadata);
    }
}

// Signal content-part completion with an empty append
MessageMetadata metadata = new MessageMetadata();
metadata.put("phase", "done");
channel.appendMessage(msgSerial, "", metadata);
```

### Response-level completion

A single AI response may span multiple content parts, each represented as a separate Ably message with its own stream of appends. To signal that the entire response is complete, publish a discrete message after all content parts are finished. Subscribers can use this as a cue to finalize the response in the UI.

#### Javascript

```
// After all content parts are done, signal response-level completion
await channel.publish({
  name: 'response-end',
  data: '',
  extras: {
    headers: {
      responseId: 'resp_abc123'
    }
  }
});
```

#### Python

```
# After all content parts are done, signal response-level completion
await channel.publish(Message(
    name='response-end',
    data='',
    extras={
        'headers': {
            'responseId': 'resp_abc123'
        }
    }
))
```

#### Java

```
// After all content parts are done, signal response-level completion
JsonObject extras = new JsonObject();
JsonObject headers = new JsonObject();
headers.addProperty("responseId", "resp_abc123");
extras.add("headers", headers);
channel.publish(new Message("response-end", "", new MessageExtras(extras)));
```

### Detect completion from history

When [hydrating client state](https://ably.com/docs/ai-transport/token-streaming/message-per-response.md#hydration) from history, inspect `version.metadata` on each message to determine whether a content part was fully completed or is still in progress. If the most recent operation's metadata carries your completion marker, the content part is done. If it carries a streaming marker or no marker, the stream may still be active.

#### Javascript

```
const channel = realtime.channels.get('ai:your-channel-name');

await channel.subscribe((message) => {
  // ...handle message actions as normal...
});

let page = await channel.history({ untilAttach: true });
while (page) {
  for (const message of page.items) {
    const phase = message.version?.metadata?.phase;
    if (phase === 'done') {
      // Content part is complete, render as final
    } else {
      // Content part may still be streaming, listen for live appends
    }
  }
  page = page.hasNext() ? await page.next() : null;
}
```

#### Python

```
channel = realtime.channels.get('ai:your-channel-name')

await channel.subscribe(on_message)

page = await channel.history(until_attach=True)
while page:
    for message in page.items:
        # Treat a missing version or missing metadata as "no marker"
        metadata = getattr(message.version, 'metadata', None) or {}
        phase = metadata.get('phase')
        if phase == 'done':
            # Content part is complete, render as final
            pass
        else:
            # Content part may still be streaming, listen for live appends
            pass
    page = await page.next() if page.has_next() else None
```

#### Java

```
Channel channel = realtime.channels.get("ai:your-channel-name");

channel.subscribe(message -> {
    /* handle message actions as normal */
});

PaginatedResult<Message> page = channel.history(new Param("untilAttach", "true"));
while (page != null) {
    for (Message message : page.items()) {
        String phase = message.version != null && message.version.metadata != null
            ? message.version.metadata.get("phase")
            : null;
        if ("done".equals(phase)) {
            // Content part is complete, render as final
        } else {
            // Content part may still be streaming, listen for live appends
        }
    }
    page = page.hasNext() ? page.next() : null;
}
```

## Cancel a response

Cancellation allows users to stop an in-progress response. The subscriber publishes a cancellation message on the channel, and the publisher stops generating and flushes any pending operations.

### How it works

1. The subscriber publishes a cancellation message on the channel with a response ID identifying the response to cancel.
2. The publisher receives the cancellation message, stops generating, and flushes any pending append operations.
3. The publisher optionally publishes a confirmation message to signal clean shutdown to other subscribers.

### Publish a cancellation request

The subscriber sends a cancellation message with a `responseId` in the message [extras](https://ably.com/docs/messages.md#properties) to identify which response to cancel:

#### Javascript

```
const channel = realtime.channels.get('ai:your-channel-name');

// Send cancellation request for a specific response
await channel.publish({
  name: 'cancel',
  data: '',
  extras: {
    headers: {
      responseId: 'resp_abc123'
    }
  }
});
```

#### Python

```
channel = realtime.channels.get('ai:your-channel-name')

# Send cancellation request for a specific response
await channel.publish(Message(
    name='cancel',
    data='',
    extras={
        'headers': {
            'responseId': 'resp_abc123'
        }
    }
))
```

#### Java

```
Channel channel = realtime.channels.get("ai:your-channel-name");

// Send cancellation request for a specific response
JsonObject extras = new JsonObject();
JsonObject headers = new JsonObject();
headers.addProperty("responseId", "resp_abc123");
extras.add("headers", headers);
channel.publish(new Message("cancel", "", new MessageExtras(extras)));
```

### Handle cancellation

The publisher subscribes to cancellation messages and stops generation when one arrives.
After stopping, flush any pending append operations before optionally publishing a confirmation message:

#### Javascript

```
const channel = realtime.channels.get('ai:your-channel-name');

// Track pending appends for flushing.
// Push the promise returned by each appendMessage() call into this array.
const pendingAppends = [];

// Listen for cancellation requests
await channel.subscribe('cancel', async (message) => {
  const responseId = message.extras?.headers?.responseId;

  // Stop generation for the matching response
  stopGeneration(responseId);

  // Flush any pending appends before confirming
  await Promise.all(pendingAppends);

  // Optionally confirm cancellation to all subscribers
  await channel.publish({
    name: 'cancelled',
    data: '',
    extras: {
      headers: {
        responseId
      }
    }
  });
});
```

#### Python

```
channel = realtime.channels.get('ai:your-channel-name')

# Track pending appends for flushing.
# Add the awaitable returned by each append_message() call to this list.
pending_appends = []

# Listen for cancellation requests
async def on_cancel(message):
    # Guard against messages published without extras
    response_id = (message.extras or {}).get('headers', {}).get('responseId')

    # Stop generation for the matching response
    stop_generation(response_id)

    # Flush any pending appends before confirming
    await asyncio.gather(*pending_appends)

    # Optionally confirm cancellation to all subscribers
    await channel.publish(Message(
        name='cancelled',
        data='',
        extras={
            'headers': {
                'responseId': response_id
            }
        }
    ))

await channel.subscribe('cancel', on_cancel)
```

#### Java

```
Channel channel = realtime.channels.get("ai:your-channel-name");

// Listen for cancellation requests
channel.subscribe("cancel", message -> {
    // getAsJsonObject returns null when the "headers" member is absent
    JsonObject headers = message.extras.asJsonObject().getAsJsonObject("headers");
    String responseId = headers != null && headers.has("responseId")
        ? headers.get("responseId").getAsString()
        : null;

    // Stop generation for the matching response
    stopGeneration(responseId);

    // Flush any pending appends before confirming
    flushPendingAppends();

    // Optionally confirm cancellation to all subscribers
    JsonObject confirmExtras = new JsonObject();
    JsonObject confirmHeaders = new JsonObject();
    confirmHeaders.addProperty("responseId", responseId);
    confirmExtras.add("headers", confirmHeaders);
    channel.publish(new Message("cancelled", "", new MessageExtras(confirmExtras)));
});
```
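
The handlers above call `stopGeneration` / `stop_generation` and flush pending appends without showing how those pieces could be wired together. The following is a minimal sketch of one possible approach, not part of the Ably SDK: `ResponseTracker`, `trackers`, `stream_response` and the `append` callable are hypothetical names introduced for illustration, and `append` stands in for whatever coroutine performs the real append (for example a wrapper around `channel.append_message`). It assumes a single asyncio event loop on the publisher.

```python
import asyncio


class ResponseTracker:
    """Hypothetical per-response state: a cancel flag plus in-flight appends."""

    def __init__(self):
        self.cancelled = asyncio.Event()
        self.pending = set()

    def track(self, awaitable):
        # Schedule the append without awaiting it, but remember it for flushing.
        task = asyncio.ensure_future(awaitable)
        self.pending.add(task)
        task.add_done_callback(self.pending.discard)
        return task

    async def flush(self):
        # Wait for every in-flight append; failures are collected, not raised.
        if self.pending:
            await asyncio.gather(*list(self.pending), return_exceptions=True)


# Hypothetical registry of in-progress responses, keyed by response ID.
trackers = {}


def stop_generation(response_id):
    """Flag the matching response as cancelled and return its tracker, if any."""
    tracker = trackers.get(response_id)
    if tracker is not None:
        tracker.cancelled.set()
    return tracker


async def stream_response(response_id, tokens, append):
    """Append tokens until the stream ends or the response is cancelled.

    `append` is an assumed coroutine function taking one token.
    Returns True if the response was cancelled, False if it ran to completion.
    """
    tracker = trackers.setdefault(response_id, ResponseTracker())
    try:
        async for token in tokens:
            if tracker.cancelled.is_set():
                break  # stop generating; no further appends are issued
            tracker.track(append(token))
        await tracker.flush()
        return tracker.cancelled.is_set()
    finally:
        trackers.pop(response_id, None)
```

With this shape, a cancellation handler would call `tracker = stop_generation(response_id)` and then `await tracker.flush()` (when a tracker is returned) before publishing the optional confirmation. Checking the flag before each append means no new appends are issued once cancellation has been requested, so the flush covers everything still in flight.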