Using Reactor Firehose with Amazon SQS

Reactor Firehose enables Enterprise customers to send messages published by the Ably platform directly to a third party streaming or queuing service so that they can be processed asynchronously. For example, you might want to persist messages to a database or use them to trigger some sort of workflow. But you want to achieve this without adding extra overhead to your servers or delaying the delivery of messages to subscribing clients.

In this tutorial you will learn how to use the Reactor Firehose integration with Amazon SQS. You will publish messages representing orders to a channel and create an integration that adds these messages to an SQS queue.

You will then write a serverless function using AWS Lambda that reads each order from the queue and simulates an attempt to authorize the credit card details provided. It then sends the results as an Ably message to another channel so that a subscriber can display them. This process is illustrated in the following diagram:


Reactor Firehose with SQS tutorial overview

The solution code for this project is available on Github.

Prerequisites

To use Reactor Firehose, you need:

To use Amazon SQS, you must:

  • Create an AWS account, or use an existing one
  • Create an Administrator IAM user
  • Get your access key ID and secret access key

You can achieve these tasks by performing steps 1-3 in the Setting up Amazon SQS guide.

This tutorial uses Node.js, which you can download here.

Step 1: Create your queue

  1. Visit the Amazon SQS console.
  2. Click the Create queue button. The create queue page displays.
  3. In the Details section, select the Standard queue type and name your queue OrdersQueue. Amazon SQS also offers a FIFO (first in, first out) queue type that guarantees message ordering and enables de-duplication of messages. A FIFO queue is more expensive than the Basic queue type and requires extra configuration, so you will use the Standard queue type in this tutorial.
  4. In the Configuration section, set the Receive message wait time to 20 seconds. This enables “long polling” to reduce the number of empty responses when clients poll the queue and therefore also reduces the financial cost of the queue.

    Creating the SQS FIFO queue
  5. Leave all the other queue settings at their default values and click Create queue at the bottom of the page. Because Amazon SQS is a distributed system, you might experience a slight delay before the queue appears in the Queues page.
  6. When the queue is listed on the Queues page, select it to display the queue details. Make a note of the queue URL and Subscription region (for example, us-east-2). You will need these to configure the Reactor Firehose integration rule in a later step.
  7. When your queue is available, test that it is working correctly by performing steps 2 and 3 of the Getting Started with Amazon SQS guide.
  8. Leave the Amazon SQS console open in a browser tab so that you can refer to it in later steps.

Step 2: Create a Reactor Firehose integration rule

In this step you want to ensure that messages published by Ably to a specific channel (orders) are delivered to the SQS OrdersQueue that you created in the previous step.

  1. Log into the Ably dashboard and click the Create New App button.
  2. Name your app firehose-sqs-tutorial, select the SSL Required checkbox, then click Create app.
  3. In the page that displays, make a note of your private API key: you will need this in a later step. At the bottom of the page, in the Configure your application panel, click the Change your app settings link.
  4. Select the Integrations tab and click the New Reactor Rule button.
  5. From the list of rule types, select Reactor Firehose and, from the list of services, select Amazon SQS.
  6. Configure your reactor rule as follows, referring to the queue information in the Amazon SQS console:
    • URL: Enter your Amazon SQS queue URL from the SQS console.
    • AWS Region: Select your AWS region from the dropdown (this is also represented in the queue URL)
    • AWS Credentials: Enter your AWS IAM user credentials, in the form: Access key ID:Secret access key
    • Source: Select “Message”, so that the integration sends all messages published in the channel specified in Channel Filter, to your SQS queue.
    • Channel Filter: Enter the regular expression ^orders.* to match the orders channel
    • Enveloped: Leave unchecked. Enabling this option wraps incoming messages in an Ably “envelope” that contains metadata about the message and its payload.

      Configuring your Reactor rule


      Configuring your Reactor rule

Test your rule before proceeding to the next step. Click the Test rule button then, in the dialog that displays, click Run test now:

Testing your Reactor rule

Step 3: Create your server

In this step you will create a Node.js server to test your Reactor Firehose integration with SQS.

First, create a directory for your application and change into it:

mkdir firehose-sqs-demo; cd firehose-sqs-demo

Create two subdirectories within your application directory, to store the static HTML, JavaScript, and CSS files:

mkdir public views

Then, execute npm init to create a Node.js project. For entry point enter server.js and accept the defaults for the other values.

Install the dependencies you will require for this project:

  • ably: The Ably client library.
  • express: A web application framework for Node.js.
  • dotenv: A module that makes it easy to store and read configuration settings from a .env file.
  • faker: A module that generates random data, which you will use to create dummy orders with fake credit card details.
npm install ably express dotenv faker

Create a .env file in your application directory to store configuration settings. Replace your_ably_api_key with your app’s API key that you generated in Step 2. This information is available in the Ably dashboard:

ABLY_API_KEY=your_ably_api_key
NUM_TXNS=5 
PORT=5000  

NUM_TXNS is the number of random transactions your application will generate. Set this value to five initially.

Create the views/index.html file. This HTML page contains a button which you will click to generate some random orders to send to your queue and ultimately show the results of processing those transactions:

<!DOCTYPE html>
<html>
  <head>
    <title>SQS Firehose Demo</title>
    <link rel="stylesheet" href="style.css" />
    <script src="https://cdn.ably.io/lib/ably.min-1.js"></script>
    <script src="subscribe.js"></script>
  </head>
  <body>
    <h1>SQS Demo</h1>
    <div>
      <button id="generateTxns" onclick="generateTxns()">
        Generate Transactions
      </button>
    </div>
    <div>
      <ol id="txns"></ol>
    </div>
  </body>
</html>

Create the public/style.css stylesheet to include some basic styling for the page:

body {
  background-color: wheat;
  padding-left: 10px;
  font-size: 14pt;
  font-family: Verdana, Geneva, Arial, Helvetica, sans-serif;
  color: black;
}

h1 {
  font: 20pt Verdana, Geneva, Arial, Helvetica, sans-serif;
  font-weight: bold;
  line-height: 20pt;
}

Create the public/subscribe.js file. This will contain the client side code for generating the orders and displaying the results of processing those orders. Leave it blank for now: you will write this code in a later step.

Enter the following code in server.js:

// server.js

require("dotenv").config()

const express = require("express")
const app = express()
const faker = require("faker")
const Ably = require("ably")
const ably = new Ably.Rest({ key: process.env.ABLY_API_KEY })

// make all the files in 'public' available
app.use(express.static("public"))

// load the home page, index.html
app.get("/", (request, response) => {
  response.sendFile(__dirname + "/views/index.html")
})

// listen for requests :)
const listener = app.listen(process.env.PORT, () => {
  console.log("Your app is listening on port " + process.env.PORT)
})

This code imports all the required dependencies, instantiates the Ably REST API client and serves the index.html web page and supporting files. Test it by running node server.js and visiting http://localhost:5000 in your browser. If everything is working correctly, you should see a web page with a button inviting you to generate transactions. The button doesn’t do anything yet: you will write the code for this in the next step.

Step 4: Generate random transactions

In this step, you will use the Node.js faker module to generate random transactions that contain realistic credit card numbers and publish them to the orders channel. Because you created a Reactor Firehose integration rule on that channel, the transactions you created will appear in your SQS queue.

First, modify your server.js file to include the /generate route, just below your home (/) route:

app.get("/generate", (request, response) => {
  for (i = 0; i < process.env.NUM_TXNS; i++) {
    let randtxn = {
      amount: faker.finance.amount(),
      timestamp: faker.date.recent(),
      cardnumber: faker.finance.creditCardNumber(),
      expiry: `${faker.date.future().getUTCMonth() + 1}/${
        faker.date.future().getUTCFullYear() + 1
      }`,
      cvv: faker.finance.creditCardCVV(),
      order_id: faker.finance.account(),
    }
    console.log(JSON.stringify(randtxn))
    channel.publish("created", JSON.stringify(randtxn))
  }
  response.sendStatus(200)
})

Then, add a channel variable just after you have instantiated the Ably REST API client, and set it to refer to the orders channel:

...
const Ably = require("ably")
const ably = new Ably.Rest({ key: process.env.ABLY_API_KEY })

const channel = ably.channels.get("orders")

Finally, create the button click event handler in public/subscribe.js that will make a request to this route when you click the Generate Transactions button in the index.html page:

function generateTxns() {
  fetch("/generate")
}

You are now ready to test your queue.

Step 5: Test your queue

Execute node server.js and visit http://localhost:5000.

Click the Generate Transactions button. Your server will generate a number of random transactions equal to the NUM_TXNS setting in .env and output these to the console.

It will also publish these transactions to the orders queue. And, because you have set up a Reactor Firehose integration, they should also be submitted automatically to the OrdersQueue in Amazon SQS.

Check this by visiting your queue details page in the Amazon SQS console and clicking the Poll for messages button:


Checking the queue for new messages

Your messages will appear in the queue. Click a message ID to view the contents of the message:


View the message contents

Step 6: Process your queue

Now that you can send messages to your queue, you need to process those messages. In this tutorial you will create a “serverless” function using AWS Lamdba to do this.

Visit the AWS Lambda console by clicking the Services dropdown in the top-left corner of your Amazon SQS console. Search for “Lambda” and open it in a new tab so that you have both the SQS and Lambda consoles open in your browser.


Opening the AWS Lambda console

Click the Create function button and select the Use a blueprint option. Search for “sqs” in the Blueprints search dialog and select the sqs-poller blueprint. Then, press Configure.


Selecting the Lambda sqs-poller blueprint

In the Basic Info panel, call your new function TransactionProcessor. Ensure that the Create a new role from AWS policy templates option is selected and call the new role TransactionProcessorRole:


Basic configuration of your Lambda function

In the SQS trigger panel, you must provide the name of the queue that will trigger your Lambda when messages are received. Click inside the SQS queue checkbox and AWS should locate your OrdersQueue and its associated ARN (Amazon Resource Name) code.

Note: The ARN code for your queue is also available in your Amazon SQS console

Select your queue and ensure that the Enable trigger checkbox at the bottom of the panel is checked.

At the bottom of the page, review the code that this blueprint provides. This simply handles any incoming SQS messages and logs their contents:

console.log('Loading function');

exports.handler = async (event) => {
    //console.log('Received event:', JSON.stringify(event, null, 2));
    for (const { messageId, body } of event.Records) {
        console.log('SQS message %s: %j', messageId, body);
    }
    return `Successfully processed ${event.Records.length} messages.`;
};

Finally, click the Create function button at the bottom of the page.

Test your Lambda by running node server.js, visiting http://localhost:5000 and clicking the Generate transactions button.

Because this Lambda function is running in the AWS cloud, its console.log() statements are logged in the cloud too, and are visible in AWS CloudWatch.

Visit the CloudWatch console and from the left-hand navigation menu select Logs > Log groups and then the aws/lambda/TransactionProcessor log group. Select one of the log streams and view its contents. If everything is working correctly, you should see the invidual queue entries being processed. Once a message has been processed, it is removed from the queue.


Viewing log entries in AWS Cloudwatch

Once a message has been processed, it is removed from the queue. You can verify this by visiting your queue details page in the Amazon SQS console and clicking the Poll for messages button.

Step 7: Refine your queue processing logic

Now that you have the ability to process transaction items in the queue, you want to randomly authorize or decline the transactions. To achieve this, replace the code in your TransactionProcessor Lambda function with the following code, substituting YOUR_API_KEY with your Ably API key. Then, click the Deploy button.

const https = require("https")

const ablyApiKey = "YOUR_API_KEY"

exports.handler = async (event, context) => {
  for (const { messageId, body } of event.Records) {
    const data = JSON.parse(body)
    const output = {
      amount: data.amount,
      order_id: data.order_id,
      txn_id: messageId,
      status: isAuthorized() ? "authorized" : "declined",
    }
    await postMessage("txn_results", JSON.stringify(output))
    console.log(output)
  }
  console.log(`Successfully processed ${event.Records.length} messages.`)
}

function isAuthorized() {
  let rand = Math.floor(Math.random() * 5 + 1)
  return rand < 4
}

async function postMessage(channel, message) {
  return new Promise((resolve, reject) => {
    const data = JSON.stringify({
      name: "txn",
      data: message,
    })

    const options = {
      host: "rest.ably.io",
      port: 443,
      path: `/channels/${channel}/messages`,
      method: "POST",
      headers: {
        Authorization: `Basic ${Buffer.from(ablyApiKey).toString("base64")}`,
        "Content-Type": "application/json",
        "Content-Length": Buffer.byteLength(data),
      },
    }

    let req = https.request(options)
    req.write(data)
    req.end(null, null, () => {
      /* Request has been fully sent */
      resolve(req)
    })
  })
}

This code reads transactions from your SQS queue and randomly authorizes or declines them. It then publishes the results to an Ably channel called txn_results.

Note that, instead of using the Ably client SDK to publish the message, it makes a direct HTTP request to the /channels/${channel}/messages endpoint. While it is possible to add external dependencies to AWS Lambda functions, it is beyond the scope of this tutorial.

Step 8: Display the processing results

The last step is to make the status of the transactions visible by subscribing to txn_results and displaying the messages that channel receives.

First, you need to authenticate the client with Ably. In a production environment you should use token authentication to achieve this. For this sample application, you will just receive the Ably API key from the server.

In server.js, add a new /auth route (just below the home route /) that returns the Ably API key in the response:

app.get("/auth", (request, response) => {
  response.json({ apiKey: process.env.ABLY_API_KEY })
})

Then, in subscribe.js, add the following code to request the API key, instantiate the client, and subscribe to messages in the txn_results queue:

fetch("/auth")
  .then((response) => response.json())
  .then((data) => {
    const ably = new Ably.Realtime({ key: data.apiKey })
    const channel = ably.channels.get("txn_results")
    channel.subscribe((msg) => {
      const txn = JSON.parse(msg.data)
      document.getElementById(
        "txns"
      ).innerHTML += `<li><strong>Order:</strong> ${txn.order_id} <strong>$ Amount:</strong> ${txn.amount} <strong>Status:</strong> ${txn.status}</li>`
    })
  })
  .catch(function (error) {
    console.log("Error: " + error)
  })

Step 9: Try it out

You are now ready to test this!

  1. Run your server by executing node server.js.
  2. Visit https://localhost:5000 in your browser.
  3. Click the Generate transactions button.
  4. If everything is working correctly, you should see the results of processing your random transactions:


Displaying the transaction processing results

Next steps

1. View the working code for this project on Github.
2. Learn about the various integrations that Ably supports.
3. Find out more about channels and how to publish or subscribe to messages.
4. Try out other tutorials about working with Ably integrations. Visit our tutorials page and select the Reactor Integrations tab.
5. Gain a good technical overview of how the Ably realtime platform works.
6. Get in touch if you need help.