# User input User input enables users to send prompts and requests to AI agents over Ably channels. The agent subscribes to a channel to receive user messages, processes them, and sends responses back. This pattern uses [Ably Pub/Sub](https://ably.com/docs/basics.md) for realtime, bi-directional communication between users and agents. User input works alongside [token streaming](https://ably.com/docs/ai-transport/token-streaming.md) patterns to create complete conversational AI experiences. While token streaming handles agent-to-user output, user input handles user-to-agent prompts. ## How it works User input follows a channel-based pattern where both users and agents connect to a shared channel: 1. The agent subscribes to the channel to listen for user messages. 2. The user publishes a message containing their prompt. 3. The agent receives the message, processes it, and generates a response. 4. The agent publishes the response back to the channel, correlating it to the original input. This decoupled approach means agents don't need to manage persistent connections to individual users. Instead, they subscribe to channels and respond to messages as they arrive. ## Identify the user Agents need to verify that incoming messages are from legitimate users. Use [identified clients](https://ably.com/docs/ai-transport/sessions-identity/identifying-users-and-agents.md#user-identity) or [user claims](https://ably.com/docs/ai-transport/sessions-identity/identifying-users-and-agents.md#user-claims) to establish a verified identity or role for the user. ### Verify by user identity Use the `clientId` to identify the user who sent a message. This enables personalized responses, per-user rate limiting, or looking up user-specific preferences from your database. When a user [authenticates with Ably](https://ably.com/docs/ai-transport/sessions-identity/identifying-users-and-agents.md#authenticating), embed their identity in the JWT: ```javascript const claims = { 'x-ably-clientId': 'user-123' }; ``` ```python claims = { 'x-ably-clientId': 'user-123' } ``` ```java Map claims = new HashMap<>(); claims.put("x-ably-clientId", "user-123"); ``` The `clientId` is automatically attached to every message the user publishes, so agents can trust this identity. ```javascript await channel.subscribe('user-input', (message) => { const userId = message.clientId; // promptId is a user-generated UUID for correlating responses const { promptId, text } = message.data; console.log(`Received prompt from user ${userId}`); processAndRespond(channel, text, promptId, userId); }); ``` ```python def on_user_input(message): user_id = message.client_id # promptId is a user-generated UUID for correlating responses prompt_id = message.data['promptId'] text = message.data['text'] print(f'Received prompt from user {user_id}') process_and_respond(channel, text, prompt_id, user_id) await channel.subscribe('user-input', on_user_input) ``` ```java channel.subscribe("user-input", message -> { String userId = message.clientId; // promptId is a user-generated UUID for correlating responses JsonObject data = (JsonObject) message.data; String promptId = data.get("promptId").getAsString(); String text = data.get("text").getAsString(); System.out.println("Received prompt from user " + userId); processAndRespond(channel, text, promptId, userId); }); ``` ### Verify by role Use [user claims](https://ably.com/docs/ai-transport/sessions-identity/identifying-users-and-agents.md#user-claims) to verify that a message comes from a user rather than another agent sharing the channel. This is useful when the agent needs to distinguish message sources without needing the specific user identity. When a user [authenticates with Ably](https://ably.com/docs/ai-transport/sessions-identity/identifying-users-and-agents.md#authenticating), embed their role in the JWT: ```javascript const claims = { 'ably.channel.*': 'user' }; ``` ```python claims = { 'ably.channel.*': 'user' } ``` ```java Map claims = new HashMap<>(); claims.put("ably.channel.*", "user"); ``` The user claim is automatically attached to every message the user publishes, so agents can trust this role information. ```javascript await channel.subscribe('user-input', (message) => { const role = message.extras?.userClaim; // promptId is a user-generated UUID for correlating responses const { promptId, text } = message.data; if (role !== 'user') { console.log('Ignoring message from non-user'); return; } processAndRespond(channel, text, promptId); }); ``` ```python def on_user_input(message): role = message.extras.get('userClaim') # promptId is a user-generated UUID for correlating responses prompt_id = message.data['promptId'] text = message.data['text'] if role != 'user': print('Ignoring message from non-user') return process_and_respond(channel, text, prompt_id) await channel.subscribe('user-input', on_user_input) ``` ```java channel.subscribe("user-input", message -> { String role = message.extras.get("userClaim").getAsString(); // promptId is a user-generated UUID for correlating responses JsonObject data = (JsonObject) message.data; String promptId = data.get("promptId").getAsString(); String text = data.get("text").getAsString(); if (!role.equals("user")) { System.out.println("Ignoring message from non-user"); return; } processAndRespond(channel, text, promptId); }); ``` ## Publish user input Users publish messages to the channel to send prompts to the agent. Generate a unique `promptId` for each message to correlate agent responses back to the original prompt. ```javascript const channel = ably.channels.get('your-channel-name'); const promptId = crypto.randomUUID(); await channel.publish('user-input', { promptId: promptId, text: 'What is the weather like today?' }); ``` ```python import uuid channel = ably.channels.get('your-channel-name') prompt_id = str(uuid.uuid4()) message = Message( name='user-input', data={ 'promptId': prompt_id, 'text': 'What is the weather like today?' } ) await channel.publish(message) ``` ```java Channel channel = ably.channels.get("your-channel-name"); String promptId = UUID.randomUUID().toString(); JsonObject data = new JsonObject(); data.addProperty("promptId", promptId); data.addProperty("text", "What is the weather like today?"); channel.publish("user-input", data); ``` ## Subscribe to user input The agent subscribes to a channel to receive messages from users. When a user publishes a message to the channel, the agent receives it through the subscription callback. The following example demonstrates an agent subscribing to receive user input: ```javascript const Ably = require('ably'); const ably = new Ably.Realtime({ key: 'your-api-key' }); const channel = ably.channels.get('your-channel-name'); await channel.subscribe('user-input', (message) => { const { promptId, text } = message.data; const userId = message.clientId; console.log(`Received prompt from ${userId}: ${text}`); // Process the prompt and generate a response processAndRespond(channel, text, promptId); }); ``` ```python from ably import AblyRealtime ably = AblyRealtime(key='your-api-key') channel = ably.channels.get('your-channel-name') def on_user_input(message): prompt_id = message.data['promptId'] text = message.data['text'] user_id = message.client_id print(f'Received prompt from {user_id}: {text}') # Process the prompt and generate a response process_and_respond(channel, text, prompt_id) await channel.subscribe('user-input', on_user_input) ``` ```java import io.ably.lib.realtime.AblyRealtime; import io.ably.lib.realtime.Channel; AblyRealtime ably = new AblyRealtime("your-api-key"); Channel channel = ably.channels.get("your-channel-name"); channel.subscribe("user-input", message -> { JsonObject data = (JsonObject) message.data; String promptId = data.get("promptId").getAsString(); String text = data.get("text").getAsString(); String userId = message.clientId; System.out.println("Received prompt from " + userId + ": " + text); // Process the prompt and generate a response processAndRespond(channel, text, promptId); }); ``` ## Publish agent responses When the agent sends a response, it includes the `promptId` from the original input so users know which prompt the response relates to. This is especially important when users send multiple prompts in quick succession or when responses are streamed. Use the `extras.headers` field to include the `promptId` in agent responses: ```javascript async function processAndRespond(channel, prompt, promptId) { // Generate the response (e.g., call your AI model) const response = await generateAIResponse(prompt); // Publish the response with the promptId for correlation await channel.publish({ name: 'agent-response', data: response, extras: { headers: { promptId: promptId } } }); } ``` ```python async def process_and_respond(channel, prompt, prompt_id): # Generate the response (e.g., call your AI model) response = await generate_ai_response(prompt) message = Message( name='agent-response', data=response, extras={ 'headers': { 'promptId': prompt_id } } ) # Publish the response with the promptId for correlation await channel.publish(message) ``` ```java void processAndRespond(Channel channel, String prompt, String promptId) { // Generate the response (e.g., call your AI model) String response = generateAIResponse(prompt); // Publish the response with the promptId for correlation JsonObject extras = new JsonObject(); JsonObject headers = new JsonObject(); headers.addProperty("promptId", promptId); extras.add("headers", headers); Message message = new Message("agent-response", response, new MessageExtras(extras)); channel.publish(message); } ``` The user's client can then match responses to their original prompts: ```javascript const pendingPrompts = new Map(); // Send a prompt and track it async function sendPrompt(text) { const promptId = crypto.randomUUID(); pendingPrompts.set(promptId, { text }); await channel.publish('user-input', { promptId, text }); return promptId; } // Handle responses await channel.subscribe('agent-response', (message) => { const promptId = message.extras?.headers?.promptId; if (promptId && pendingPrompts.has(promptId)) { const originalPrompt = pendingPrompts.get(promptId); console.log(`Response for "${originalPrompt.text}": ${message.data}`); pendingPrompts.delete(promptId); } }); ``` ```python import uuid pending_prompts = {} # Send a prompt and track it async def send_prompt(text): prompt_id = str(uuid.uuid4()) pending_prompts[prompt_id] = {'text': text} message = Message(name='user-input', data={'promptId': prompt_id, 'text': text}) await channel.publish(message) return prompt_id # Handle responses def on_agent_response(message): prompt_id = message.extras.get('headers', {}).get('promptId') if prompt_id and prompt_id in pending_prompts: original_prompt = pending_prompts[prompt_id] print(f'Response for "{original_prompt["text"]}": {message.data}') del pending_prompts[prompt_id] await channel.subscribe('agent-response', on_agent_response) ``` ```java Map> pendingPrompts = new ConcurrentHashMap<>(); // Send a prompt and track it String sendPrompt(String text) { String promptId = UUID.randomUUID().toString(); Map promptData = new HashMap<>(); promptData.put("text", text); pendingPrompts.put(promptId, promptData); JsonObject data = new JsonObject(); data.addProperty("promptId", promptId); data.addProperty("text", text); channel.publish("user-input", data); return promptId; } // Handle responses channel.subscribe("agent-response", message -> { JsonObject headers = message.extras.asJsonObject().get("headers"); String promptId = headers != null ? headers.get("promptId").getAsString() : null; if (promptId != null && pendingPrompts.containsKey(promptId)) { Map originalPrompt = pendingPrompts.get(promptId); System.out.println("Response for \"" + originalPrompt.get("text") + "\": " + message.data); pendingPrompts.remove(promptId); } }); ``` ## Stream responses For longer AI responses, you'll typically want to stream tokens back to the user rather than waiting for the complete response. The `promptId` correlation allows users to associate streamed tokens with their original prompt. When streaming tokens using [message-per-response](https://ably.com/docs/ai-transport/token-streaming/message-per-response.md) or [message-per-token](https://ably.com/docs/ai-transport/token-streaming/message-per-token.md) patterns, include the `promptId` in the message extras: ```javascript async function streamResponse(channel, prompt, promptId) { // Create initial message for message-per-response pattern const message = await channel.publish({ name: 'agent-response', data: '', extras: { headers: { promptId: promptId } } }); // Stream tokens by appending to the message for await (const token of generateTokens(prompt)) { await channel.appendMessage({ serial: message.serial, data: token, extras: { headers: { promptId: promptId } } }); } } ``` ```python async def stream_response(channel, prompt, prompt_id): # Create initial message for message-per-response pattern message = Message( name='agent-response', data='', extras={ 'headers': { 'promptId': prompt_id } } ) result = await channel.publish(message) message_serial = result.serials[0] # Stream tokens by appending to the message async for token in generate_tokens(prompt): await channel.append_message(Message( serial=message_serial, data=token, extras={ 'headers': { 'promptId': prompt_id } } )) ``` ```java void streamResponse(Channel channel, String prompt, String promptId) throws Exception { // Create initial message for message-per-response pattern JsonObject extras = new JsonObject(); JsonObject headers = new JsonObject(); headers.addProperty("promptId", promptId); extras.add("headers", headers); CompletableFuture publishFuture = new CompletableFuture<>(); channel.publish("agent-response", "", extras, new Callback() { @Override public void onSuccess(PublishResult result) { publishFuture.complete(result); } @Override public void onError(ErrorInfo reason) { publishFuture.completeExceptionally(AblyException.fromErrorInfo(reason)); } }); String messageSerial = publishFuture.get().serials[0]; // Stream tokens by appending to the message for (String token : generateTokens(prompt)) { JsonObject appendExtras = new JsonObject(); JsonObject appendHeaders = new JsonObject(); appendHeaders.addProperty("promptId", promptId); appendExtras.add("headers", appendHeaders); channel.appendMessage(messageSerial, token, appendExtras); } } ``` ## Handle multiple concurrent prompts Users may send multiple prompts before receiving responses, especially during long-running AI operations. The correlation pattern ensures responses are matched to the correct prompts: ```javascript // Agent handling multiple concurrent prompts const activeRequests = new Map(); await channel.subscribe('user-input', async (message) => { const { promptId, text } = message.data; const userId = message.clientId; // Track active request activeRequests.set(promptId, { userId, text, }); try { await streamResponse(channel, text, promptId); } finally { activeRequests.delete(promptId); } }); ``` ```python # Agent handling multiple concurrent prompts active_requests = {} async def on_user_input(message): prompt_id = message.data['promptId'] text = message.data['text'] user_id = message.client_id # Track active request active_requests[prompt_id] = { 'userId': user_id, 'text': text, } try: await stream_response(channel, text, prompt_id) finally: del active_requests[prompt_id] await channel.subscribe('user-input', on_user_input) ``` ```java // Agent handling multiple concurrent prompts Map> activeRequests = new ConcurrentHashMap<>(); channel.subscribe("user-input", message -> { JsonObject data = (JsonObject) message.data; String promptId = data.get("promptId").getAsString(); String text = data.get("text").getAsString(); String userId = message.clientId; // Track active request Map requestData = new HashMap<>(); requestData.put("userId", userId); requestData.put("text", text); activeRequests.put(promptId, requestData); try { streamResponse(channel, text, promptId); } finally { activeRequests.remove(promptId); } }); ```