This guide will get you started with Ably AI Transport using OpenAI's Responses API.
You'll learn how to authenticate users with verified identities, stream tokens from an agent to clients in realtime, and implement human-in-the-loop approval for tool calls. The agent uses OpenAI's GPT model with a send_email tool that requires user approval before execution.
Prerequisites
-
Sign up for an Ably account.
-
Create a new app, and create your first API key in the API Keys tab of the dashboard.
-
Your API key will need the
publish,subscribe, andmessage-update-owncapabilities. -
Enable message appends for the channel:
- Go to the Settings tab of your app in the dashboard.
- Under Rules, click Add new rule.
- Enter
aias the channel namespace. - Check Message annotations, updates, deletes, and appends.
- Click Create channel rule to save.
-
Install any current LTS version of Node.js.
-
Get an OpenAI API key.
Step 1: Project setup
Create a new directory for your project and initialize it:
mkdir ai-agent-demo && cd ai-agent-demo
npm init -y && npm pkg set type=moduleInstall the required dependencies:
npm install ably openai jsonwebtoken express
npm install -D typescript @types/node @types/express @types/jsonwebtokenCreate a TypeScript configuration file:
npx tsc --initCreate a .env file in your project root and add your API keys:
echo "ABLY_API_KEY=demokey:*****" > .env
echo "OPENAI_API_KEY=your_openai_api_key" >> .envStep 2: Authenticate users
Users authenticate with Ably using token authentication. Your server generates signed JWTs that establish a verified identity for each user. Agents can trust this identity because only your server can issue valid tokens.
Create a file called auth-server.ts with an endpoint that generates signed JWTs:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import express from 'express';
import jwt from 'jsonwebtoken';
const app = express();
const apiKey = process.env.ABLY_API_KEY;
if (!apiKey) {
throw new Error('ABLY_API_KEY environment variable is required');
}
const [keyName, keySecret] = apiKey.split(':');
if (!keyName || !keySecret) {
throw new Error('ABLY_API_KEY must be in format "keyName:keySecret"');
}
app.get('/api/auth/token', (req, res) => {
// In production, authenticate the user and get their ID from your session
const userId = 'user-123';
const token = jwt.sign({
'x-ably-clientId': userId,
'ably.channel.*': 'user'
}, keySecret, {
algorithm: 'HS256',
keyid: keyName,
expiresIn: '1h'
});
res.type('application/jwt').send(token);
});
app.listen(3001, () => {
console.log('Auth server running on http://localhost:3001');
});The JWT includes two claims:
x-ably-clientId: Establishes a verified identity that appears on all messages the user publishes.ably.channel.*: Assigns a role that agents can use to distinguish users from other agents on the channel.
Step 3: Create the agent
The agent runs in a trusted server environment and uses API key authentication. It subscribes to a channel to receive user prompts, processes them with OpenAI's Responses API, and streams responses back using the message-per-response pattern. When the model requests a tool call, the agent pauses to request human approval before executing.
Create a file called agent.ts with the setup, tool definition, and human-in-the-loop helpers:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import * as Ably from 'ably';
import OpenAI from 'openai';
const apiKey = process.env.ABLY_API_KEY;
if (!apiKey) {
throw new Error('ABLY_API_KEY environment variable is required');
}
const openai = new OpenAI();
const realtime = new Ably.Realtime({
key: apiKey,
clientId: 'ai-agent',
echoMessages: false,
});
const channel = realtime.channels.get('ai:conversation');
// Define a tool that requires human approval
const tools: OpenAI.Responses.Tool[] = [
{
type: 'function',
name: 'send_email',
description: 'Send an email to a recipient. Always requires human approval.',
parameters: {
type: 'object',
properties: {
to: { type: 'string', description: 'Recipient email address' },
subject: { type: 'string', description: 'Email subject line' },
body: { type: 'string', description: 'Email body content' },
},
required: ['to', 'subject', 'body'],
},
},
];
// Track pending approval requests
const pendingApprovals = new Map<string, (decision: string) => void>();
// Listen for approval responses from users
await channel.subscribe('approval-response', (message: Ably.Message) => {
const toolCallId = message.extras?.headers?.toolCallId;
const resolve = pendingApprovals.get(toolCallId);
if (resolve) {
pendingApprovals.delete(toolCallId);
resolve(message.data.decision);
}
});
// Request human approval for a tool call via the channel
function requestApproval(
toolCallId: string,
toolName: string,
toolInput: Record<string, unknown>,
): Promise<string> {
return new Promise<string>((resolve) => {
pendingApprovals.set(toolCallId, resolve);
channel.publish({
name: 'approval-request',
data: { name: toolName, arguments: toolInput },
extras: { headers: { toolCallId } },
});
console.log(`Awaiting approval for ${toolName} (${toolCallId})`);
});
}
// Execute a tool after approval
function executeTool(name: string, input: Record<string, unknown>) {
if (name === 'send_email') {
console.log(`Sending email to ${input.to}: ${input.subject}`);
return { success: true, message: `Email sent to ${input.to}` };
}
return { error: `Unknown tool: ${name}` };
}The agent publishes approval-request messages to the channel when a tool call is detected, then waits for a matching approval-response correlated by toolCallId. The executeTool function simulates the email action. In production, replace this with actual email delivery logic.
Add the streaming function to agent.ts. This streams OpenAI response tokens to Ably using channel.appendMessage(), while tracking any tool call the model requests:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// Stream OpenAI response tokens to Ably, returning tool call info if any
async function streamToAbly(
input: OpenAI.Responses.ResponseInput,
serial: string,
) {
const stream = await openai.responses.create({
model: 'gpt-4o',
input,
tools,
stream: true,
});
let messageItemId: string | null = null;
let functionCallItem: { id: string; callId: string; name: string } | null = null;
let functionArgs = '';
let hasToolCall = false;
for await (const event of stream) {
switch (event.type) {
case 'response.output_item.added':
if (event.item.type === 'message') {
messageItemId = event.item.id;
} else if (event.item.type === 'function_call') {
functionCallItem = {
id: event.item.id,
callId: event.item.call_id,
name: event.item.name,
};
functionArgs = '';
hasToolCall = true;
}
break;
case 'response.output_text.delta':
if (event.item_id === messageItemId) {
channel.appendMessage({ serial, data: event.delta });
}
break;
case 'response.function_call_arguments.delta':
functionArgs += event.delta;
break;
case 'response.completed':
break;
}
}
return {
hasToolCall,
functionCallItem,
functionArgs,
};
}The function filters for response.output_text.delta events and appends each token to the Ably message. It also tracks function_call output items and accumulates their JSON arguments. The hasToolCall flag indicates whether the model wants to call a tool.
Add the prompt handler to the end of agent.ts. This ties everything together, streaming the initial response and handling tool calls with HITL approval:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// Handle incoming user prompts
await channel.subscribe('user-input', async (message: Ably.Message) => {
const { promptId, text } = message.data as { promptId: string; text: string };
const userId = message.clientId;
const role = message.extras?.userClaim;
console.log(`Received prompt from ${userId} (role: ${role}): ${text}`);
if (role !== 'user') {
console.log('Ignoring message from non-user');
return;
}
// Create the initial Ably message for streaming
const response = await channel.publish({
name: 'agent-response',
data: '',
extras: { headers: { promptId } },
});
const serial = response.serials[0];
if (!serial) {
console.error('No serial returned from publish');
return;
}
// Stream the response from OpenAI
const input: OpenAI.Responses.ResponseInput = [
{ role: 'user', content: text },
];
const { hasToolCall, functionCallItem, functionArgs } = await streamToAbly(input, serial);
// Handle tool call with human-in-the-loop approval
if (hasToolCall && functionCallItem) {
const parsedArgs = JSON.parse(functionArgs);
const decision = await requestApproval(
functionCallItem.callId,
functionCallItem.name,
parsedArgs,
);
let toolResult: Record<string, unknown>;
if (decision === 'approved') {
toolResult = executeTool(functionCallItem.name, parsedArgs);
} else {
toolResult = { error: 'The user rejected this action' };
}
// Continue the conversation with the tool result
const followUpInput: OpenAI.Responses.ResponseInput = [
{ role: 'user', content: text },
{
type: 'function_call',
id: functionCallItem.id,
call_id: functionCallItem.callId,
name: functionCallItem.name,
arguments: functionArgs,
},
{
type: 'function_call_output',
call_id: functionCallItem.callId,
output: JSON.stringify(toolResult),
},
];
// Stream the follow-up response, appending to the same message
channel.appendMessage({ serial, data: '\n\n' });
await streamToAbly(followUpInput, serial);
}
// Signal completion
await channel.publish({
name: 'agent-response-complete',
extras: { headers: { promptId } },
});
console.log(`Completed response for prompt ${promptId}`);
});
console.log('Agent is listening for prompts...');The prompt handler:
- Verifies the sender has the
userrole. - Creates an initial Ably message and captures its
serialfor appending. - Streams the OpenAI response, appending text tokens in realtime.
- If the model requests a tool call, publishes an
approval-requestand waits for the user's decision. - After approval, executes the tool and streams a follow-up response appended to the same message.
Step 4: Create the client
The client uses an authCallback to obtain a signed JWT from your auth server. The clientId from the token is automatically attached to all messages the client publishes.
Create a file called client.ts with the connection setup and token streaming subscription:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import * as Ably from 'ably';
import crypto from 'crypto';
import * as readline from 'readline';
const realtime = new Ably.Realtime({
authCallback: async (
_tokenParams: Ably.TokenParams,
callback: (error: Ably.ErrorInfo | string | null, token: Ably.TokenDetails | Ably.TokenRequest | string | null) => void
) => {
try {
const response = await fetch('http://localhost:3001/api/auth/token');
const token = await response.text();
callback(null, token);
} catch (error) {
callback(error instanceof Error ? error.message : String(error), null);
}
}
});
realtime.connection.on('connected', () => {
console.log('Connected to Ably as', realtime.auth.clientId);
});
const channel = realtime.channels.get('ai:conversation');
const pendingPrompts = new Map<string, () => void>();
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
});
// Subscribe to streamed agent responses
await channel.subscribe('agent-response', (message: Ably.Message) => {
const promptId = message.extras?.headers?.promptId;
if (!promptId) return;
switch (message.action) {
case 'message.create':
break;
case 'message.append':
// Write each new token as it arrives
process.stdout.write(message.data || '');
break;
case 'message.update':
// Full response after reconnection
console.log(message.data || '');
break;
}
});The client subscribes to agent-response messages and handles different message actions:
message.create: A new response has started.message.append: A token has been appended. Each token is written directly to the terminal as it arrives.message.update: The full response content, received after reconnection.
Add the human-in-the-loop approval handler to client.ts. When the agent requests approval for a tool call, the client displays the details and prompts the user:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Subscribe to approval requests for human-in-the-loop
await channel.subscribe('approval-request', async (message: Ably.Message) => {
const { name, arguments: args } = message.data;
const toolCallId = message.extras?.headers?.toolCallId;
console.log(`\n\nAgent wants to execute: ${name}`);
console.log(`Arguments: ${JSON.stringify(args, null, 2)}`);
const answer = await new Promise<string>((resolve) => {
rl.question('Approve? (yes/no): ', resolve);
});
const decision = answer.toLowerCase() === 'yes' ? 'approved' : 'rejected';
await channel.publish({
name: 'approval-response',
data: { decision },
extras: { headers: { toolCallId } },
});
console.log(`Decision sent: ${decision}\n`);
});Step 5: Send user prompts
Each prompt includes a unique promptId to correlate responses. The user's clientId is automatically attached to the message by Ably.
Add the following to the end of client.ts:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// Subscribe to completion signals
await channel.subscribe('agent-response-complete', (message: Ably.Message) => {
const promptId = message.extras?.headers?.promptId;
if (!promptId) return;
console.log('\n');
const resolve = pendingPrompts.get(promptId);
if (resolve) {
pendingPrompts.delete(promptId);
resolve();
}
});
async function sendPrompt(text: string): Promise<void> {
const promptId = crypto.randomUUID();
const completionPromise = new Promise<void>((resolve) => {
pendingPrompts.set(promptId, resolve);
});
await channel.publish('user-input', {
promptId,
text,
});
await completionPromise;
}
function askQuestion() {
rl.question('Enter a prompt (or "quit" to exit): ', async (text) => {
if (text.toLowerCase() === 'quit') {
rl.close();
realtime.close();
return;
}
await sendPrompt(text);
askQuestion();
});
}
askQuestion();Step 6: Run the example
Open three terminal windows to run the auth server, agent, and client.
Terminal 1: Start the auth server
npx tsx --env-file=.env auth-server.tsYou should see:
Auth server running on http://localhost:3001Terminal 2: Start the agent
npx tsx --env-file=.env agent.tsYou should see:
Agent is listening for prompts...Terminal 3: Run the client
npx tsx --env-file=.env client.tsTry entering different prompts. For a regular response without tool calls:
Enter a prompt (or "quit" to exit): What is the capital of France?
The capital of France is Paris.
Enter a prompt (or "quit" to exit):For a response that triggers a tool call with human-in-the-loop approval:
Enter a prompt (or "quit" to exit): Send an email to [email protected] saying hello
Agent wants to execute: send_email
Arguments: {
"to": "[email protected]",
"subject": "Hello",
"body": "Hello Alice!"
}
Approve? (yes/no): yes
Decision sent: approved
I've sent the email to [email protected] with the subject "Hello".
Enter a prompt (or "quit" to exit):Next steps
Continue exploring AI Transport features:
- Learn about token streaming patterns including message-per-response and message-per-token.
- Understand user input patterns for handling prompts and correlating responses.
- Explore identifying users and agents for more advanced authentication scenarios.
- Implement more advanced human-in-the-loop workflows with role-based authorization.
- Stream tool call information to build generative UI experiences.