Getting started with LangGraph

This guide will get you started with Ably AI Transport using LangGraph.

You'll learn how to authenticate users with verified identities, stream tokens from an agent to clients in realtime, and implement human-in-the-loop approval for tool calls. The agent uses LangGraph with a send_email tool that requires user approval before execution.

Prerequisites

  1. Sign up for an Ably account.

  2. Create a new app, and create your first API key in the API Keys tab of the dashboard.

  3. Your API key will need the publish, subscribe, and message-update-own capabilities.

  4. Enable message appends for the channel:

    1. Go to the Settings tab of your app in the dashboard.
    2. Under Rules, click Add new rule.
    3. Enter ai as the channel namespace.
    4. Check Message annotations, updates, deletes, and appends.
    5. Click Create channel rule to save.

  5. Install any current LTS version of Node.js.

  6. Get an Anthropic API key.

Step 1: Project setup

Create a new directory for your project and initialize it:

mkdir ai-agent-demo && cd ai-agent-demo
npm init -y && npm pkg set type=module

Install the required dependencies:

npm install ably @langchain/langgraph@^0.2 @langchain/anthropic@^0.3 @langchain/core@^0.3 zod jsonwebtoken express
npm install -D typescript @types/node @types/express @types/jsonwebtoken

Create a TypeScript configuration file:

npx tsc --init

Create a .env file in your project root and add your API keys:

echo "ABLY_API_KEY=demokey:*****" > .env
echo "ANTHROPIC_API_KEY=your_anthropic_api_key" >> .env

Step 2: Authenticate users

Users authenticate with Ably using token authentication. Your server generates signed JWTs that establish a verified identity for each user. Agents can trust this identity because only your server can issue valid tokens.

Create a file called auth-server.ts with an endpoint that generates signed JWTs:

TypeScript

import express from 'express';
import jwt from 'jsonwebtoken';

const app = express();

const apiKey = process.env.ABLY_API_KEY;
if (!apiKey) {
  throw new Error('ABLY_API_KEY environment variable is required');
}

const [keyName, keySecret] = apiKey.split(':');
if (!keyName || !keySecret) {
  throw new Error('ABLY_API_KEY must be in format "keyName:keySecret"');
}

app.get('/api/auth/token', (req, res) => {
  // In production, authenticate the user and get their ID from your session
  const userId = 'user-123';

  const token = jwt.sign({
    'x-ably-clientId': userId,
    'ably.channel.*': 'user'
  }, keySecret, {
    algorithm: 'HS256',
    keyid: keyName,
    expiresIn: '1h'
  });

  res.type('application/jwt').send(token);
});

app.listen(3001, () => {
  console.log('Auth server running on http://localhost:3001');
});

The JWT includes two claims:

  • x-ably-clientId: Establishes a verified identity that appears on all messages the user publishes.
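  • ably.channel.*: Sets a user claim that applies to every channel (the * wildcard). The agent reads it from message.extras.userClaim and treats it as a role, which lets it distinguish users from other agents on the channel.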
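
To make the token structure concrete, the following sketch builds an equivalent HS256 JWT by hand with Node's built-in crypto module and then decodes it. It is illustrative only: the key name and secret are made-up placeholders, signJwt and decodePart are hypothetical helpers, and in the guide itself the jsonwebtoken library does this work.

```typescript
import { createHmac } from 'node:crypto';

// Placeholder key parts for illustration; a real key comes from ABLY_API_KEY
const keyName = 'appId.keyId';
const keySecret = 'not-a-real-secret';

const b64url = (text: string): string => Buffer.from(text, 'utf8').toString('base64url');

// Hypothetical helper: sign a payload with HMAC-SHA256, as jwt.sign does for HS256
function signJwt(payload: Record<string, unknown>): string {
  const header = { alg: 'HS256', typ: 'JWT', kid: keyName };
  const signingInput = `${b64url(JSON.stringify(header))}.${b64url(JSON.stringify(payload))}`;
  const signature = createHmac('sha256', keySecret).update(signingInput).digest('base64url');
  return `${signingInput}.${signature}`;
}

// Hypothetical helper: decode the header (index 0) or payload (index 1) without verifying
function decodePart(token: string, index: 0 | 1): Record<string, unknown> {
  const part = token.split('.')[index] ?? '';
  return JSON.parse(Buffer.from(part, 'base64url').toString('utf8'));
}

const issuedAt = Math.floor(Date.now() / 1000);
const token = signJwt({
  'x-ably-clientId': 'user-123',
  'ably.channel.*': 'user',
  iat: issuedAt,
  exp: issuedAt + 3600, // one hour, matching expiresIn: '1h'
});

const header = decodePart(token, 0);
const claims = decodePart(token, 1);
console.log(header.kid);                // the key name, identifying which key signed the token
console.log(claims['x-ably-clientId']); // the verified client identity
console.log(claims['ably.channel.*']);  // the role claim for all channels
```

Because the signature depends on keySecret, a client that does not hold the secret cannot alter either claim without invalidating the token.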

Step 3: Create the agent

The agent runs in a trusted server environment and uses API key authentication. It subscribes to a channel to receive user prompts, processes them with a LangGraph graph that uses Claude, and streams responses back using the message-per-response pattern. When the model requests a tool call, the agent pauses to request human approval before executing.

Create a file called agent.ts with the setup, tool definition, and human-in-the-loop helpers:

TypeScript

import * as Ably from 'ably';
import { ChatAnthropic } from '@langchain/anthropic';
import { tool } from '@langchain/core/tools';
import { StateGraph, Annotation, START, END } from '@langchain/langgraph';
import { AIMessage } from '@langchain/core/messages';
import { z } from 'zod';

const apiKey = process.env.ABLY_API_KEY;
if (!apiKey) {
  throw new Error('ABLY_API_KEY environment variable is required');
}

const realtime = new Ably.Realtime({
  key: apiKey,
  clientId: 'ai-agent',
  echoMessages: false,
});

const channel = realtime.channels.get('ai:conversation');

// Define a tool that requires human approval
const sendEmail = tool(
  async ({ to, subject, body }) => {
    console.log(`Sending email to ${to}: ${subject}`);
    return JSON.stringify({ success: true, message: `Email sent to ${to}` });
  },
  {
    name: 'send_email',
    description: 'Send an email to a recipient. Always requires human approval.',
    schema: z.object({
      to: z.string().describe('Recipient email address'),
      subject: z.string().describe('Email subject line'),
      body: z.string().describe('Email body content'),
    }),
  },
);

// Initialize the model with tools
const model = new ChatAnthropic({ model: 'claude-sonnet-4-5' });
const modelWithTools = model.bindTools([sendEmail]);

// Track pending approval requests
const pendingApprovals = new Map<string, (decision: string) => void>();

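// Listen for approval responses from users
await channel.subscribe('approval-response', (message: Ably.Message) => {
  // Only users may approve or reject; ignore responses from any other role
  if (message.extras?.userClaim !== 'user') return;

  const toolCallId: string | undefined = message.extras?.headers?.toolCallId;
  if (!toolCallId) return;

  const resolve = pendingApprovals.get(toolCallId);
  if (resolve) {
    pendingApprovals.delete(toolCallId);
    resolve(message.data.decision);
  }
});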

// Request human approval for a tool call via the channel
function requestApproval(
  toolCallId: string,
  toolName: string,
  toolInput: Record<string, unknown>,
): Promise<string> {
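  return new Promise<string>((resolve) => {
    pendingApprovals.set(toolCallId, resolve);
    channel.publish({
      name: 'approval-request',
      data: { name: toolName, arguments: toolInput },
      extras: { headers: { toolCallId } },
    }).catch((error) => {
      // If the request never reaches the channel, stop waiting and fail closed
      console.error('Failed to publish approval request:', error);
      pendingApprovals.delete(toolCallId);
      resolve('rejected');
    });
    console.log(`Awaiting approval for ${toolName} (${toolCallId})`);
  });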
}

The agent publishes approval-request messages to the channel when a tool call is detected, then waits for a matching approval-response correlated by toolCallId. The sendEmail tool simulates the email action. In production, replace this with actual email delivery logic.
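
The correlation logic can be examined in isolation from Ably. The sketch below keeps the same Map of toolCallId to resolver and adds one thing the guide's code does not have: a timeout that treats an unanswered request as rejected, so a prompt cannot wait forever. The names waitForDecision and settleDecision, and the 30-second timeout, are hypothetical choices for illustration.

```typescript
type Decision = 'approved' | 'rejected';

// Resolvers for approval requests that have not been answered yet
const waiters = new Map<string, (decision: Decision) => void>();

// Register a waiter for a tool call; resolves to 'rejected' if nobody answers in time
function waitForDecision(toolCallId: string, timeoutMs: number): Promise<Decision> {
  return new Promise<Decision>((resolve) => {
    const timer = setTimeout(() => {
      waiters.delete(toolCallId);
      resolve('rejected');
    }, timeoutMs);

    waiters.set(toolCallId, (decision) => {
      clearTimeout(timer);
      resolve(decision);
    });
  });
}

// Deliver a decision to the matching waiter; returns false if no waiter matches
function settleDecision(toolCallId: string | undefined, decision: Decision): boolean {
  if (!toolCallId) return false;
  const resolve = waiters.get(toolCallId);
  if (!resolve) return false;
  waiters.delete(toolCallId);
  resolve(decision);
  return true;
}

const demo = waitForDecision('call-1', 30_000);
console.log(settleDecision('call-2', 'approved')); // false: no waiter with that id
console.log(settleDecision('call-1', 'approved')); // true: the waiter is resolved
demo.then((decision) => console.log(decision));    // approved
```

In the agent, the approval-response subscription would pass message.extras?.headers?.toolCallId and message.data.decision to settleDecision, and requestApproval would return the promise from waitForDecision after publishing.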

Add the streaming function to agent.ts. This streams LangGraph response tokens to Ably using channel.appendMessage(), while tracking any tool call the model requests:

TypeScript

// Stream LangGraph response tokens to Ably
async function streamToAbly(
  messages: any[],
  serial: string,
  useTools: boolean,
) {
  const StateAnnotation = Annotation.Root({
    messages: Annotation<any[]>({
      reducer: (x, y) => x.concat(y),
      default: () => [],
    }),
  });

  const graph = new StateGraph(StateAnnotation)
    .addNode('agent', async (state) => {
      const response = await (useTools ? modelWithTools : model).invoke(state.messages);
      return { messages: [response] };
    })
    .addEdge(START, 'agent')
    .addEdge('agent', END);

  const app = graph.compile();

  const stream = await app.stream(
    { messages },
    { streamMode: 'messages' },
  );

  let toolCallId: string | undefined;
  let toolCallName: string | undefined;
  let toolCallArgsStr = '';

  for await (const [messageChunk, metadata] of stream) {
    const content = messageChunk?.content;

    // Stream text tokens to Ably
    // Content may be a string or an array of content blocks (Anthropic format)
    if (typeof content === 'string' && content) {
      channel.appendMessage({ serial, data: content });
    } else if (Array.isArray(content)) {
      for (const block of content) {
        if (block.type === 'text' && block.text) {
          channel.appendMessage({ serial, data: block.text });
        }
      }
    }

    // Accumulate tool call info from streaming chunks
    for (const chunk of messageChunk?.tool_call_chunks ?? []) {
      if (chunk.id) toolCallId = chunk.id;
      if (chunk.name) toolCallName = chunk.name;
      if (chunk.args) toolCallArgsStr += chunk.args;
    }
  }

  const toolCallDetected = toolCallId && toolCallName
    ? { id: toolCallId, name: toolCallName, args: toolCallArgsStr ? JSON.parse(toolCallArgsStr) : {} }
    : null;

  return { hasToolCall: !!toolCallDetected, toolCallInfo: toolCallDetected };
}

The function creates a LangGraph state graph and streams message chunks. It appends text content to the Ably message, handling both string and array content blocks (Anthropic streams content as arrays of content blocks). Tool call arguments arrive incrementally via tool_call_chunks and are accumulated as a string, then parsed as JSON after the stream completes.
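
The two pieces of per-chunk logic described above can be sketched as standalone functions. The ToolCallChunk and ContentBlock shapes here are simplified stand-ins for the real @langchain/core types, the helper names are hypothetical, and the sample chunks are made up to show arguments being split at arbitrary points.

```typescript
// Simplified shapes for illustration; the real chunk types come from @langchain/core
interface ToolCallChunk { id?: string; name?: string; args?: string }
type ContentBlock = { type: string; text?: string };

// Collect the text from either a plain string or an array of content blocks
function extractText(content: string | ContentBlock[]): string {
  if (typeof content === 'string') return content;
  return content
    .filter((block) => block.type === 'text' && block.text)
    .map((block) => block.text)
    .join('');
}

// Merge streamed chunks into one tool call, parsing the arguments only at the end
function assembleToolCall(chunks: ToolCallChunk[]) {
  let id: string | undefined;
  let toolName: string | undefined;
  let argsJson = '';

  for (const chunk of chunks) {
    if (chunk.id) id = chunk.id;
    if (chunk.name) toolName = chunk.name;
    if (chunk.args) argsJson += chunk.args;
  }

  if (!id || !toolName) return null;
  return { id, name: toolName, args: argsJson ? JSON.parse(argsJson) : {} };
}

// Made-up chunks: no single fragment is valid JSON until they are joined
const sampleChunks: ToolCallChunk[] = [
  { id: 'call-1', name: 'send_email', args: '' },
  { args: '{"to": "alice@exam' },
  { args: 'ple.com", "subject": "Hello"}' },
];

const toolCall = assembleToolCall(sampleChunks);
console.log(toolCall?.args.to); // alice@example.com
console.log(extractText([
  { type: 'text', text: 'Hel' },
  { type: 'tool_use' },
  { type: 'text', text: 'lo' },
])); // Hello
```

Parsing after the stream completes matters because each fragment on its own is incomplete JSON, so calling JSON.parse on it would throw.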

Add the prompt handler to the end of agent.ts. This ties everything together, streaming the initial response and handling tool calls with HITL approval:

TypeScript

// Handle incoming user prompts
await channel.subscribe('user-input', async (message: Ably.Message) => {
  const { promptId, text } = message.data as { promptId: string; text: string };
  const userId = message.clientId;
  const role = message.extras?.userClaim;

  console.log(`Received prompt from ${userId} (role: ${role}): ${text}`);

  if (role !== 'user') {
    console.log('Ignoring message from non-user');
    return;
  }

  // Create the initial Ably message for streaming
  const response = await channel.publish({
    name: 'agent-response',
    data: '',
    extras: { headers: { promptId } },
  });

  const serial = response.serials[0];
  if (!serial) {
    console.error('No serial returned from publish');
    return;
  }

  // Stream the response from LangGraph
  const messages = [{ role: 'user', content: text }];
  const { hasToolCall, toolCallInfo } = await streamToAbly(messages, serial, true);

  // Handle tool call with human-in-the-loop approval
  if (hasToolCall && toolCallInfo) {
    const decision = await requestApproval(
      toolCallInfo.id,
      toolCallInfo.name,
      toolCallInfo.args,
    );

    let toolResult: string;
    if (decision === 'approved') {
      const result = await sendEmail.invoke(toolCallInfo.args);
      toolResult = result;
    } else {
      toolResult = JSON.stringify({ error: 'The user rejected this action' });
    }

    // Stream follow-up response with the tool result
    channel.appendMessage({ serial, data: '\n\n' });

    const followUpMessages = [
      { role: 'user', content: text },
      new AIMessage({
        content: '',
        tool_calls: [{
          id: toolCallInfo.id,
          name: toolCallInfo.name,
          args: toolCallInfo.args,
        }],
      }),
      { role: 'tool', content: toolResult, tool_call_id: toolCallInfo.id },
    ];

    await streamToAbly(followUpMessages, serial, false);
  }

  // Signal completion
  await channel.publish({
    name: 'agent-response-complete',
    extras: { headers: { promptId } },
  });

  console.log(`Completed response for prompt ${promptId}`);
});

console.log('Agent is listening for prompts...');

The prompt handler:

  1. Verifies the sender has the user role.
  2. Creates an initial Ably message and captures its serial for appending.
  3. Streams the LangGraph response, appending text tokens in realtime.
  4. If the model requests a tool call, publishes an approval-request and waits for the user's decision.
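  5. Executes the tool if the user approved, or records a rejection as the tool result otherwise, then streams a follow-up response appended to the same message.
  6. Publishes an agent-response-complete message so the client knows the response has finished.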
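
The shape of the follow-up conversation in the tool-call steps above can be sketched with plain objects. This is a simplification under stated assumptions: ChatMessage stands in for the LangChain message classes, buildFollowUp is a hypothetical helper, and runTool is synchronous here whereas the guide's sendEmail.invoke is asynchronous.

```typescript
interface ToolCallInfo { id: string; name: string; args: Record<string, unknown> }

// Plain-object stand-ins for the messages the agent passes to LangGraph
type ChatMessage =
  | { role: 'user'; content: string }
  | { role: 'assistant'; content: string; tool_calls: ToolCallInfo[] }
  | { role: 'tool'; content: string; tool_call_id: string };

function buildFollowUp(
  prompt: string,
  toolCall: ToolCallInfo,
  decision: string,
  runTool: () => string,
): ChatMessage[] {
  // Run the tool only on an explicit approval; anything else counts as a rejection
  const toolResult = decision === 'approved'
    ? runTool()
    : JSON.stringify({ error: 'The user rejected this action' });

  return [
    { role: 'user', content: prompt },
    { role: 'assistant', content: '', tool_calls: [toolCall] },
    { role: 'tool', content: toolResult, tool_call_id: toolCall.id },
  ];
}

let toolRuns = 0;
const emailCall: ToolCallInfo = { id: 'call-1', name: 'send_email', args: { to: 'alice@example.com' } };
const fakeSend = () => {
  toolRuns += 1;
  return JSON.stringify({ success: true });
};

const rejectedTurn = buildFollowUp('Send an email', emailCall, 'rejected', fakeSend);
const approvedTurn = buildFollowUp('Send an email', emailCall, 'approved', fakeSend);
console.log(toolRuns);                // 1: the tool ran only for the approved decision
console.log(rejectedTurn[2].content); // {"error":"The user rejected this action"}
console.log(approvedTurn[2].content); // {"success":true}
```

Either way the model receives a tool result tied to the original tool call id, which is why the agent can stream a sensible follow-up after a rejection as well as after an approval.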

Step 4: Create the client

The client uses an authCallback to obtain a signed JWT from your auth server. The clientId from the token is automatically attached to all messages the client publishes.

Create a file called client.ts with the connection setup and token streaming subscription:

TypeScript

import * as Ably from 'ably';
import crypto from 'crypto';
import * as readline from 'readline';

const realtime = new Ably.Realtime({
  authCallback: async (
    _tokenParams: Ably.TokenParams,
    callback: (error: Ably.ErrorInfo | string | null, token: Ably.TokenDetails | Ably.TokenRequest | string | null) => void
  ) => {
    try {
      const response = await fetch('http://localhost:3001/api/auth/token');
      const token = await response.text();
      callback(null, token);
    } catch (error) {
      callback(error instanceof Error ? error.message : String(error), null);
    }
  }
});

realtime.connection.on('connected', () => {
  console.log('Connected to Ably as', realtime.auth.clientId);
});

const channel = realtime.channels.get('ai:conversation');
const pendingPrompts = new Map<string, () => void>();

const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout,
});

// Subscribe to streamed agent responses
await channel.subscribe('agent-response', (message: Ably.Message) => {
  switch (message.action) {
    case 'message.create':
      break;
    case 'message.append':
      // Write each new token as it arrives
      process.stdout.write(message.data || '');
      break;
    case 'message.update':
      // Full response after reconnection
      console.log(message.data || '');
      break;
  }
});

The client subscribes to agent-response messages and handles different message actions:

  • message.create: A new response has started.
  • message.append: A token has been appended. Each token is written directly to the terminal as it arrives.
  • message.update: The full response content, received after reconnection.
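
A client that needs the accumulated text, rather than writing tokens straight to a terminal, could apply the three actions as a small reducer. The sketch below is a minimal version under stated assumptions: StreamedMessage is a simplified stand-in for Ably.Message, applyMessage is a hypothetical helper, and it assumes appends and updates carry the same serial as the message they modify.

```typescript
// Simplified message shape for illustration; real messages are Ably.Message objects
interface StreamedMessage {
  serial: string;
  action: 'message.create' | 'message.append' | 'message.update';
  data?: string;
}

// Accumulated response text, keyed by the serial of the original message
const responses = new Map<string, string>();

function applyMessage(message: StreamedMessage): string {
  const current = responses.get(message.serial) ?? '';
  let next = current;

  switch (message.action) {
    case 'message.create':
      // A new response starts, usually with empty data
      next = message.data ?? '';
      break;
    case 'message.append':
      // A token arrives and is added to what is already held
      next = current + (message.data ?? '');
      break;
    case 'message.update':
      // The full content replaces anything accumulated so far
      next = message.data ?? '';
      break;
  }

  responses.set(message.serial, next);
  return next;
}

applyMessage({ serial: 's1', action: 'message.create', data: '' });
applyMessage({ serial: 's1', action: 'message.append', data: 'The capital' });
console.log(applyMessage({ serial: 's1', action: 'message.append', data: ' of France' }));
// The capital of France
console.log(applyMessage({ serial: 's1', action: 'message.update', data: 'The capital of France is Paris.' }));
// The capital of France is Paris.
```

Treating message.update as a replacement rather than an addition is what keeps the text from being duplicated when the full response arrives after a reconnection.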

Add the human-in-the-loop approval handler to client.ts. When the agent requests approval for a tool call, the client displays the details and prompts the user:

TypeScript

// Subscribe to approval requests for human-in-the-loop
await channel.subscribe('approval-request', async (message: Ably.Message) => {
  const { name, arguments: args } = message.data;
  const toolCallId = message.extras?.headers?.toolCallId;

  console.log(`\n\nAgent wants to execute: ${name}`);
  console.log(`Arguments: ${JSON.stringify(args, null, 2)}`);

  const answer = await new Promise<string>((resolve) => {
    rl.question('Approve? (yes/no): ', resolve);
  });

  // Trim stray whitespace so an answer such as "yes " is still an approval
  const decision = answer.trim().toLowerCase() === 'yes' ? 'approved' : 'rejected';

  await channel.publish({
    name: 'approval-response',
    data: { decision },
    extras: { headers: { toolCallId } },
  });

  console.log(`Decision sent: ${decision}\n`);
});

Step 5: Send user prompts

Each prompt includes a unique promptId to correlate responses. The user's clientId is automatically attached to the message by Ably.

Add the following to the end of client.ts:

TypeScript

// Subscribe to completion signals
await channel.subscribe('agent-response-complete', (message: Ably.Message) => {
  const promptId = message.extras?.headers?.promptId;
  if (!promptId) return;

  console.log('\n');
  const resolve = pendingPrompts.get(promptId);
  if (resolve) {
    pendingPrompts.delete(promptId);
    resolve();
  }
});

async function sendPrompt(text: string): Promise<void> {
  const promptId = crypto.randomUUID();

  const completionPromise = new Promise<void>((resolve) => {
    pendingPrompts.set(promptId, resolve);
  });

  await channel.publish('user-input', {
    promptId,
    text,
  });

  await completionPromise;
}

function askQuestion() {
  rl.question('Enter a prompt (or "quit" to exit): ', async (text) => {
    if (text.toLowerCase() === 'quit') {
      rl.close();
      realtime.close();
      return;
    }

    await sendPrompt(text);
    askQuestion();
  });
}

askQuestion();

Step 6: Run the example

Open three terminal windows to run the auth server, agent, and client.

Terminal 1: Start the auth server

npx tsx --env-file=.env auth-server.ts

You should see:

Auth server running on http://localhost:3001

Terminal 2: Start the agent

npx tsx --env-file=.env agent.ts

You should see:

Agent is listening for prompts...

Terminal 3: Run the client

npx tsx --env-file=.env client.ts

Try entering different prompts. For a regular response without tool calls:

Enter a prompt (or "quit" to exit): What is the capital of France?

The capital of France is Paris.

Enter a prompt (or "quit" to exit):

For a response that triggers a tool call with human-in-the-loop approval:

Enter a prompt (or "quit" to exit): Send an email to [email protected] saying hello

Agent wants to execute: send_email
Arguments: {
  "to": "[email protected]",
  "subject": "Hello",
  "body": "Hello Alice!"
}
Approve? (yes/no): yes
Decision sent: approved

I've sent the email to [email protected] with the subject "Hello".

Enter a prompt (or "quit" to exit):

Next steps

Continue exploring AI Transport features: