Getting started with OpenAI

Open in

This guide will get you started with Ably AI Transport using OpenAI's Responses API.

You'll learn how to authenticate users with verified identities, stream tokens from an agent to clients in realtime, and implement human-in-the-loop approval for tool calls. The agent uses OpenAI's GPT model with a send_email tool that requires user approval before execution.

Prerequisites

  1. Sign up for an Ably account.

  2. Create a new app, and create your first API key in the API Keys tab of the dashboard.

  3. Your API key will need the publish, subscribe, and message-update-own capabilities.

  4. Enable message appends for the channel:

    1. Go to the Settings tab of your app in the dashboard.
    2. Under Rules, click Add new rule.
    3. Enter ai as the channel namespace.
    4. Check Message annotations, updates, deletes, and appends.
    5. Click Create channel rule to save.
  5. Install any current LTS version of Node.js.

  6. Get an OpenAI API key.

Step 1: Project setup

Create a new directory for your project and initialize it:

mkdir ai-agent-demo && cd ai-agent-demo
npm init -y && npm pkg set type=module

Install the required dependencies:

npm install ably openai jsonwebtoken express
npm install -D typescript @types/node @types/express @types/jsonwebtoken

Create a TypeScript configuration file:

npx tsc --init

Create a .env file in your project root and add your API keys:

echo "ABLY_API_KEY=demokey:*****" > .env
echo "OPENAI_API_KEY=your_openai_api_key" >> .env

Step 2: Authenticate users

Users authenticate with Ably using token authentication. Your server generates signed JWTs that establish a verified identity for each user. Agents can trust this identity because only your server can issue valid tokens.

Create a file called auth-server.ts with an endpoint that generates signed JWTs:

TypeScript

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

import express from 'express';
import jwt from 'jsonwebtoken';

const app = express();

const apiKey = process.env.ABLY_API_KEY;
if (!apiKey) {
  throw new Error('ABLY_API_KEY environment variable is required');
}

const [keyName, keySecret] = apiKey.split(':');
if (!keyName || !keySecret) {
  throw new Error('ABLY_API_KEY must be in format "keyName:keySecret"');
}

app.get('/api/auth/token', (req, res) => {
  // In production, authenticate the user and get their ID from your session
  const userId = 'user-123';

  const token = jwt.sign({
    'x-ably-clientId': userId,
    'ably.channel.*': 'user'
  }, keySecret, {
    algorithm: 'HS256',
    keyid: keyName,
    expiresIn: '1h'
  });

  res.type('application/jwt').send(token);
});

app.listen(3001, () => {
  console.log('Auth server running on http://localhost:3001');
});

The JWT includes two claims:

  • x-ably-clientId: Establishes a verified identity that appears on all messages the user publishes.
  • ably.channel.*: Assigns a role that agents can use to distinguish users from other agents on the channel.

Step 3: Create the agent

The agent runs in a trusted server environment and uses API key authentication. It subscribes to a channel to receive user prompts, processes them with OpenAI's Responses API, and streams responses back using the message-per-response pattern. When the model requests a tool call, the agent pauses to request human approval before executing.

Create a file called agent.ts with the setup, tool definition, and human-in-the-loop helpers:

TypeScript

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

import * as Ably from 'ably';
import OpenAI from 'openai';

const apiKey = process.env.ABLY_API_KEY;
if (!apiKey) {
  throw new Error('ABLY_API_KEY environment variable is required');
}

const openai = new OpenAI();

const realtime = new Ably.Realtime({
  key: apiKey,
  clientId: 'ai-agent',
  echoMessages: false,
});

const channel = realtime.channels.get('ai:conversation');

// Define a tool that requires human approval
const tools: OpenAI.Responses.Tool[] = [
  {
    type: 'function',
    name: 'send_email',
    description: 'Send an email to a recipient. Always requires human approval.',
    parameters: {
      type: 'object',
      properties: {
        to: { type: 'string', description: 'Recipient email address' },
        subject: { type: 'string', description: 'Email subject line' },
        body: { type: 'string', description: 'Email body content' },
      },
      required: ['to', 'subject', 'body'],
    },
  },
];

// Track pending approval requests
const pendingApprovals = new Map<string, (decision: string) => void>();

// Listen for approval responses from users
await channel.subscribe('approval-response', (message: Ably.Message) => {
  const toolCallId = message.extras?.headers?.toolCallId;
  const resolve = pendingApprovals.get(toolCallId);
  if (resolve) {
    pendingApprovals.delete(toolCallId);
    resolve(message.data.decision);
  }
});

// Request human approval for a tool call via the channel
function requestApproval(
  toolCallId: string,
  toolName: string,
  toolInput: Record<string, unknown>,
): Promise<string> {
  return new Promise<string>((resolve) => {
    pendingApprovals.set(toolCallId, resolve);
    channel.publish({
      name: 'approval-request',
      data: { name: toolName, arguments: toolInput },
      extras: { headers: { toolCallId } },
    });
    console.log(`Awaiting approval for ${toolName} (${toolCallId})`);
  });
}

// Execute a tool after approval
function executeTool(name: string, input: Record<string, unknown>) {
  if (name === 'send_email') {
    console.log(`Sending email to ${input.to}: ${input.subject}`);
    return { success: true, message: `Email sent to ${input.to}` };
  }
  return { error: `Unknown tool: ${name}` };
}

The agent publishes approval-request messages to the channel when a tool call is detected, then waits for a matching approval-response correlated by toolCallId. The executeTool function simulates the email action. In production, replace this with actual email delivery logic.

Add the streaming function to agent.ts. This streams OpenAI response tokens to Ably using channel.appendMessage(), while tracking any tool call the model requests:

TypeScript

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

// Stream OpenAI response tokens to Ably, returning tool call info if any
async function streamToAbly(
  input: OpenAI.Responses.ResponseInput,
  serial: string,
) {
  const stream = await openai.responses.create({
    model: 'gpt-4o',
    input,
    tools,
    stream: true,
  });

  let messageItemId: string | null = null;
  let functionCallItem: { id: string; callId: string; name: string } | null = null;
  let functionArgs = '';
  let hasToolCall = false;

  for await (const event of stream) {
    switch (event.type) {
      case 'response.output_item.added':
        if (event.item.type === 'message') {
          messageItemId = event.item.id;
        } else if (event.item.type === 'function_call') {
          functionCallItem = {
            id: event.item.id,
            callId: event.item.call_id,
            name: event.item.name,
          };
          functionArgs = '';
          hasToolCall = true;
        }
        break;

      case 'response.output_text.delta':
        if (event.item_id === messageItemId) {
          channel.appendMessage({ serial, data: event.delta });
        }
        break;

      case 'response.function_call_arguments.delta':
        functionArgs += event.delta;
        break;

      case 'response.completed':
        break;
    }
  }

  return {
    hasToolCall,
    functionCallItem,
    functionArgs,
  };
}

The function filters for response.output_text.delta events and appends each token to the Ably message. It also tracks function_call output items and accumulates their JSON arguments. The hasToolCall flag indicates whether the model wants to call a tool.

Add the prompt handler to the end of agent.ts. This ties everything together, streaming the initial response and handling tool calls with HITL approval:

TypeScript

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

// Handle incoming user prompts
await channel.subscribe('user-input', async (message: Ably.Message) => {
  const { promptId, text } = message.data as { promptId: string; text: string };
  const userId = message.clientId;
  const role = message.extras?.userClaim;

  console.log(`Received prompt from ${userId} (role: ${role}): ${text}`);

  if (role !== 'user') {
    console.log('Ignoring message from non-user');
    return;
  }

  // Create the initial Ably message for streaming
  const response = await channel.publish({
    name: 'agent-response',
    data: '',
    extras: { headers: { promptId } },
  });

  const serial = response.serials[0];
  if (!serial) {
    console.error('No serial returned from publish');
    return;
  }

  // Stream the response from OpenAI
  const input: OpenAI.Responses.ResponseInput = [
    { role: 'user', content: text },
  ];

  const { hasToolCall, functionCallItem, functionArgs } = await streamToAbly(input, serial);

  // Handle tool call with human-in-the-loop approval
  if (hasToolCall && functionCallItem) {
    const parsedArgs = JSON.parse(functionArgs);

    const decision = await requestApproval(
      functionCallItem.callId,
      functionCallItem.name,
      parsedArgs,
    );

    let toolResult: Record<string, unknown>;
    if (decision === 'approved') {
      toolResult = executeTool(functionCallItem.name, parsedArgs);
    } else {
      toolResult = { error: 'The user rejected this action' };
    }

    // Continue the conversation with the tool result
    const followUpInput: OpenAI.Responses.ResponseInput = [
      { role: 'user', content: text },
      {
        type: 'function_call',
        id: functionCallItem.id,
        call_id: functionCallItem.callId,
        name: functionCallItem.name,
        arguments: functionArgs,
      },
      {
        type: 'function_call_output',
        call_id: functionCallItem.callId,
        output: JSON.stringify(toolResult),
      },
    ];

    // Stream the follow-up response, appending to the same message
    channel.appendMessage({ serial, data: '\n\n' });
    await streamToAbly(followUpInput, serial);
  }

  // Signal completion
  await channel.publish({
    name: 'agent-response-complete',
    extras: { headers: { promptId } },
  });

  console.log(`Completed response for prompt ${promptId}`);
});

console.log('Agent is listening for prompts...');

The prompt handler:

  1. Verifies the sender has the user role.
  2. Creates an initial Ably message and captures its serial for appending.
  3. Streams the OpenAI response, appending text tokens in realtime.
  4. If the model requests a tool call, publishes an approval-request and waits for the user's decision.
  5. After approval, executes the tool and streams a follow-up response appended to the same message.

Step 4: Create the client

The client uses an authCallback to obtain a signed JWT from your auth server. The clientId from the token is automatically attached to all messages the client publishes.

Create a file called client.ts with the connection setup and token streaming subscription:

TypeScript

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

import * as Ably from 'ably';
import crypto from 'crypto';
import * as readline from 'readline';

const realtime = new Ably.Realtime({
  authCallback: async (
    _tokenParams: Ably.TokenParams,
    callback: (error: Ably.ErrorInfo | string | null, token: Ably.TokenDetails | Ably.TokenRequest | string | null) => void
  ) => {
    try {
      const response = await fetch('http://localhost:3001/api/auth/token');
      const token = await response.text();
      callback(null, token);
    } catch (error) {
      callback(error instanceof Error ? error.message : String(error), null);
    }
  }
});

realtime.connection.on('connected', () => {
  console.log('Connected to Ably as', realtime.auth.clientId);
});

const channel = realtime.channels.get('ai:conversation');
const pendingPrompts = new Map<string, () => void>();

const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout,
});

// Subscribe to streamed agent responses
await channel.subscribe('agent-response', (message: Ably.Message) => {
  const promptId = message.extras?.headers?.promptId;
  if (!promptId) return;

  switch (message.action) {
    case 'message.create':
      break;
    case 'message.append':
      // Write each new token as it arrives
      process.stdout.write(message.data || '');
      break;
    case 'message.update':
      // Full response after reconnection
      console.log(message.data || '');
      break;
  }
});

The client subscribes to agent-response messages and handles different message actions:

  • message.create: A new response has started.
  • message.append: A token has been appended. Each token is written directly to the terminal as it arrives.
  • message.update: The full response content, received after reconnection.

Add the human-in-the-loop approval handler to client.ts. When the agent requests approval for a tool call, the client displays the details and prompts the user:

TypeScript

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

// Subscribe to approval requests for human-in-the-loop
await channel.subscribe('approval-request', async (message: Ably.Message) => {
  const { name, arguments: args } = message.data;
  const toolCallId = message.extras?.headers?.toolCallId;

  console.log(`\n\nAgent wants to execute: ${name}`);
  console.log(`Arguments: ${JSON.stringify(args, null, 2)}`);

  const answer = await new Promise<string>((resolve) => {
    rl.question('Approve? (yes/no): ', resolve);
  });

  const decision = answer.toLowerCase() === 'yes' ? 'approved' : 'rejected';

  await channel.publish({
    name: 'approval-response',
    data: { decision },
    extras: { headers: { toolCallId } },
  });

  console.log(`Decision sent: ${decision}\n`);
});

Step 5: Send user prompts

Each prompt includes a unique promptId to correlate responses. The user's clientId is automatically attached to the message by Ably.

Add the following to the end of client.ts:

TypeScript

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

// Subscribe to completion signals
await channel.subscribe('agent-response-complete', (message: Ably.Message) => {
  const promptId = message.extras?.headers?.promptId;
  if (!promptId) return;

  console.log('\n');
  const resolve = pendingPrompts.get(promptId);
  if (resolve) {
    pendingPrompts.delete(promptId);
    resolve();
  }
});

async function sendPrompt(text: string): Promise<void> {
  const promptId = crypto.randomUUID();

  const completionPromise = new Promise<void>((resolve) => {
    pendingPrompts.set(promptId, resolve);
  });

  await channel.publish('user-input', {
    promptId,
    text,
  });

  await completionPromise;
}

function askQuestion() {
  rl.question('Enter a prompt (or "quit" to exit): ', async (text) => {
    if (text.toLowerCase() === 'quit') {
      rl.close();
      realtime.close();
      return;
    }

    await sendPrompt(text);
    askQuestion();
  });
}

askQuestion();

Step 6: Run the example

Open three terminal windows to run the auth server, agent, and client.

Terminal 1: Start the auth server

npx tsx --env-file=.env auth-server.ts

You should see:

Auth server running on http://localhost:3001

Terminal 2: Start the agent

npx tsx --env-file=.env agent.ts

You should see:

Agent is listening for prompts...

Terminal 3: Run the client

npx tsx --env-file=.env client.ts

Try entering different prompts. For a regular response without tool calls:

Enter a prompt (or "quit" to exit): What is the capital of France?

The capital of France is Paris.

Enter a prompt (or "quit" to exit):

For a response that triggers a tool call with human-in-the-loop approval:

Enter a prompt (or "quit" to exit): Send an email to [email protected] saying hello

Agent wants to execute: send_email
Arguments: {
  "to": "[email protected]",
  "subject": "Hello",
  "body": "Hello Alice!"
}
Approve? (yes/no): yes
Decision sent: approved

I've sent the email to [email protected] with the subject "Hello".

Enter a prompt (or "quit" to exit):

Next steps

Continue exploring AI Transport features: