Using Firehose with Amazon Kinesis

Firehose enables you to send messages published by the Ably platform directly to a third-party streaming or queuing service.

The Amazon Kinesis platform is ideal for rapid ingestion and aggregation of large amounts of data. This data can include application logs, social media, stock prices, and web clickstream data.

In this tutorial you will learn how to use the Firehose integration with Amazon Kinesis. You will publish messages representing random sensor data from a configurable number of simulated IoT devices to a channel and create an integration that automatically pushes these messages into an Amazon Kinesis Data Stream.

You will then write a serverless function using AWS Lambda that reads data from the stream and calculates a running average of temperature and humidity readings. It will send these averages as an Ably message to another channel so that a subscriber can display them. This process is illustrated in the following diagram:


Firehose with AWS Kinesis tutorial overview

The solution code for this project is available on GitHub.

Prerequisites

To use Firehose, you need an Ably account. You will create an app and an API key within it in Step 3.

To use Amazon Kinesis, you must:

  • Create an AWS account, or use an existing one
  • Create an Administrator IAM user
  • Obtain your access key ID and secret access key

You can achieve these tasks by performing steps 1-3 in this Amazon guide. Return here when you are done.

Although all of the AWS configuration steps required here can be performed in the AWS Console, this tutorial uses the AWS CLI. To follow along, first install the latest version of the AWS CLI. Then configure the AWS CLI with your IAM user’s security credentials.
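If you have not configured the CLI before, the aws configure command prompts you in turn for your access key ID, secret access key, default region, and output format:

aws configure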

The code for this tutorial is written in Node.js, which you can download here.

Step 1: Create your Kinesis data stream

Execute the following aws command in the terminal to create a Kinesis data stream called sensor-data-stream, with a single shard:

aws kinesis create-stream --stream-name sensor-data-stream --shard-count 1
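Stream creation is asynchronous, so the stream can take a few moments to become ACTIVE. Optionally, you can block until it is ready with the CLI's built-in waiter:

aws kinesis wait stream-exists --stream-name sensor-data-stream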

Then, run the following describe-stream command to view details of the stream and confirm that it has been created successfully:

aws kinesis describe-stream --stream-name sensor-data-stream

Make a note of the StreamARN key’s value. You will need it in later steps:

{
    "StreamDescription": {
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "340282366920938463463374607431768211455"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49623362581436102050070983950697146036412772107449008130"
                }
            }
        ],
   ==>  "StreamARN": "arn:aws:kinesis:eu-west-2:318417720898:stream/sensor-data-stream",
        "StreamName": "sensor-data-stream",
        "StreamStatus": "ACTIVE",
        "RetentionPeriodHours": 24,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": "NONE",
        "KeyId": null,
        "StreamCreationTimestamp": "2021-10-27T11:21:31+01:00"
    }
}

Step 2: Create an AWS role for Firehose

So that Ably can write messages to your Kinesis data stream, you need to grant appropriate permissions. The safest way to do this, which does not require you to provide your AWS credentials, is to create an assumable role that the Ably platform can use.

Our AWS Authentication guide explains how to do this. Follow the steps in the guide to create an AWS Kinesis policy called AblyKinesisPolicy and attach it to a role called AblyKinesisRole.
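For reference, the permissions policy only needs to allow Ably to write records to this one stream. A minimal sketch, assuming write-only access is sufficient (the guide has the authoritative version; substitute your own StreamARN from Step 1):

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord",
                "kinesis:PutRecords"
            ],
            "Resource": "arn:aws:kinesis:eu-west-2:318417720898:stream/sensor-data-stream"
        }
    ]
}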

Make a note of the AblyKinesisRole ARN. You need this to configure your Firehose integration rule in the next step.

Step 3: Create the Firehose integration rule

In this step you want to ensure that messages published by Ably to a specific channel (raw-sensor-data) are delivered to the Kinesis data stream called sensor-data-stream that you created in Step 1.

1. Log into the Ably dashboard and click the Create New App button.
2. Name your app firehose-kinesis-tutorial, select the SSL Required checkbox, then click Create app.
3. In the page that displays, make a note of your private API key: you will need this in a later step. At the bottom of the page, in the Configure your application panel, click the Change your app settings link.
4. Select the Integrations tab and click the New Integration Rule button.
5. From the list of rule types, select Firehose and, from the list of services, select Amazon Kinesis.
6. Configure your rule as follows:

  • AWS Region: Select your AWS region from the dropdown. Your region appears in the StreamARN you noted in Step 1 (eu-west-2 in the example output).
  • Stream Name: Enter the name of your Kinesis data stream (sensor-data-stream).
  • AWS Authentication Scheme: Select ARN of an assumable role and enter the AblyKinesisRole ARN from the preceding step.
  • Source: Select “Message”, so that the integration sends all messages published on the channel specified in Channel Filter to your data stream.
  • Channel Filter: Enter the regular expression ^raw-sensor-data.* to match the raw-sensor-data channel.
  • Encoding: Select JSON.
  • Enveloped: Leave checked. Enabling this option wraps incoming messages in an Ably “envelope” that contains metadata about the message and its payload.
  • Partition Key: This facility enables you to route messages to different shards: Kinesis assigns each record to a shard based on an MD5 hash of its partition key. As your stream contains only a single shard, enter 1 here.

    Configuring your rule

Test your rule before proceeding to the next step to ensure that you have configured the necessary permissions correctly. Click the Test rule button then, in the dialog that displays, click Run test now:

Testing your rule

Step 4: Create your server

In this step you will create a Node.js server to test your Firehose integration with Amazon Kinesis.

First, create a directory for your application and change into it:

mkdir firehose-kinesis-demo; cd firehose-kinesis-demo

Create two subdirectories within your application directory: public, for the static JavaScript and CSS files, and views, for the HTML:

mkdir public views

Then, execute npm init to create a Node.js project. For the entry point, enter server.js and accept the defaults for the other values.

Install the dependencies you will require for this project:

  • ably: The Ably client library.
  • express: A web application framework for Node.js.
  • dotenv: A module that makes it easy to store and read configuration settings from a .env file.
  • random-number: A helper module you will use to create random sensor data.

npm install ably express dotenv random-number

Create a .env file in your application directory to store configuration settings. Replace your_ably_api_key with your app’s API key that you generated in Step 3. You can also view your app’s API key in the Ably dashboard:

ABLY_API_KEY=your_ably_api_key
NUM_SENSORS=5
PORT=5000

NUM_SENSORS is the number of IoT devices your application will simulate, each of which will generate a random temperature and humidity reading. Set this value to 5 initially.

Create the views/index.html file. This HTML page contains a button which you will click to generate random sensor data to send to your data stream. It will also ultimately display the average values of those sensor readings:

<!DOCTYPE html>
<html>
  <head>
    <title>AWS Kinesis Firehose Demo</title>
    <link rel="stylesheet" href="style.css" />
    <script src="https://cdn.ably.com/lib/ably.min-1.js"></script>
    <script src="client.js"></script>
  </head>
  <body>
    <h1>AWS Kinesis Firehose Demo</h1>
    <div>
      <button id="generateSensorData" onclick="generateSensorData()">
        Generate Readings
      </button>
    </div>
    <div>
      <p id="count"></p>
      <p id="averages"></p>
    </div>
  </body>
</html>

Create the public/style.css stylesheet to include some basic styling for the page:

body {
  background-color: skyblue;
  padding-left: 10px;
  font-size: 14pt;
  font-family: Verdana, Geneva, Arial, Helvetica, sans-serif;
  color: black;
}

h1 {
  font: 20pt Verdana, Geneva, Arial, Helvetica, sans-serif;
  font-weight: bold;
  line-height: 20pt;
}

Create the public/client.js file. This will contain the client side code for displaying the sensor data averages. Leave it blank for now: you will write this code in a later step.

Enter the following code in server.js:

// server.js

require("dotenv").config()

const express = require("express")
const rn = require("random-number")
const Ably = require("ably")
const ably = new Ably.Rest({ key: process.env.ABLY_API_KEY })

const app = express()

// make all the files in 'public' available
app.use(express.static("public"))

// load the home page, index.html
app.get("/", (request, response) => {
  response.sendFile(__dirname + "/views/index.html")
})

// listen for requests
const listener = app.listen(process.env.PORT, () => {
  console.log("Your app is listening on port " + process.env.PORT)
})

This code imports all the required dependencies, instantiates the Ably REST API client and serves the index.html web page and supporting files. Test it by running node server.js and visiting http://localhost:5000 in your browser. If everything is working correctly, you should see a web page with a button inviting you to generate sensor readings. The button doesn’t do anything yet: you will write the code for this in the next step.

Step 5: Generate random sensor data

In this step, you will use the Node.js random-number module to generate random sensor readings containing temperature and humidity values, and then publish them to the raw-sensor-data channel. Because you created a Firehose integration rule on that channel, the messages you publish will be sent to your data stream.

First, modify your server.js file to include the /generate route, just below your home (/) route:

app.get("/generate", (request, response) => {
  for (let i = 0; i < process.env.NUM_SENSORS; i++) {
    let data = {
      temperature: rn({ min: -10, max: 50, integer: false }), // floating-point reading
      humidity: rn({ min: 10, max: 99, integer: true }), // whole-number reading
    }

    console.log(data)

    channel.publish("reading", JSON.stringify(data))
  }
  response.sendStatus(200)
})

Then, add a channel variable just after you have instantiated the Ably REST API client, and use it to retrieve the raw-sensor-data channel:

...
const Ably = require("ably")
const ably = new Ably.Rest({ key: process.env.ABLY_API_KEY })

const channel = ably.channels.get("raw-sensor-data")

Finally, create the button click event handler in public/client.js that will make a request to the /generate route when you click the Generate Readings button in the index.html page:

function generateSensorData() {
  fetch("/generate")
}

You are now ready to test your integration rule.

Step 6: Test your data stream

Execute node server.js and visit http://localhost:5000 in your browser.

Click the Generate Readings button. Your server will generate a number of random readings equal to the NUM_SENSORS setting in .env and output them to the console:

{ temperature: 30.918390112267275, humidity: 85 }
{ temperature: -8.111323330144845, humidity: 30 }
{ temperature: 36.882337318219136, humidity: 71 }
{ temperature: 19.391215206587383, humidity: 43 }
{ temperature: -8.3867766311829, humidity: 57 }

It will also publish these readings to the raw-sensor-data channel. And, because you have set up a Firehose integration rule on this channel, they should also be streamed automatically to the sensor-data-stream in Amazon Kinesis.

Perform the following steps using the AWS CLI to verify this:

1. Obtain the data stream’s shard ID. Because your stream has only one shard, all of your data will appear in that shard:

aws kinesis list-shards --stream-name sensor-data-stream

Look for the ShardId in the response:

{
    "Shards": [
        {
      ==>   "ShardId": "shardId-000000000000",
            "HashKeyRange": {
                "StartingHashKey": "0",
                "EndingHashKey": "340282366920938463463374607431768211455"
            },
            "SequenceNumberRange": {
                "StartingSequenceNumber": "49623362581436102050070983950697146036412772107449008130"
            }
        }
    ]
}

2. Before you can get data from the stream you need to obtain the shard iterator for the shard you are interested in. Execute the following command, ensuring that the --shard-id argument contains the ShardId shown in the preceding step:

aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name sensor-data-stream

Copy the value of the ShardIterator key to your clipboard:

{
    "ShardIterator": "AAAAAAAAAAFn/WyQfznjPLcwN5bgn3DdXAU0OOC8U6r8rMp2X5fXMfd5MnZ..."
}

3. Retrieve a list of records from the stream, using the aws kinesis get-records command, together with your shard iterator:

aws kinesis get-records --shard-iterator AAAAAAAAAAGCBEzYODmrGVb+1aoY0y...

This returns a JSON array of records, each of which corresponds to an item in your data stream:

{
    "Records": [
        {
            "SequenceNumber": "49623362581436102050070983990727097775492785932058230786",
            "ApproximateArrivalTimestamp": "2021-10-27T13:01:32.944000+01:00",
            "Data": "eyJzb3VyY2UiOiJjaGFubmVsLm1lc3NhZ2UiLCJhcHBJZCI6IjdKZTNqZyIsImNoY...",
            "PartitionKey": "1"
        },
        {
            "SequenceNumber": "49623362581436102050070983990728306701313083632831692802",
            "ApproximateArrivalTimestamp": "2021-10-27T15:47:13.243000+01:00",
            "Data": "eyJzb3VyY2UiOiJjaGFubmVsLm1lc3NhZ2UiLCJhcHBJZCI6IjdKZTNqZyIsImNoY...",
            "PartitionKey": "1"
        },
        {
            "SequenceNumber": "49623362581436102050070983990729515627132698262006398978",
            "ApproximateArrivalTimestamp": "2021-10-27T15:47:13.245000+01:00",
            "Data": "eyJzb3VyY2UiOiJjaGFubmVsLm1lc3NhZ2UiLCJhcHBJZCI6IjdKZTNqZyIsImNoY...",
            "PartitionKey": "1"
        },
        ...
    ],
    "NextShardIterator": "AAAAAAAAAAE3EUa9QKkl6xSWCF6J0Cq1sJpbi0vbwATbApY45R14v...",
    "MillisBehindLatest": 0
}

Note that there are six records in the stream: one sent by Ably when you tested your integration rule in Step 3, and one for each of the five simulated IoT devices (NUM_SENSORS in your .env file). However, the record data is base64-encoded and is therefore unreadable.

4. Choose one record other than the earliest (which is the Ably test record) and execute the following command, replacing <data> with the actual record data, to display its contents:

echo '<data>' | base64 --decode | python -mjson.tool

Note: If you don’t have Python installed, omit the | python -mjson.tool part, which pretty-prints the JSON data.
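Alternatively, since you already have Node.js installed, you can decode a record with a one-liner (again replacing <data> with the actual record data; the output is unformatted JSON):

node -e "console.log(Buffer.from('<data>', 'base64').toString())"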

You should see that this record contains some random temperature and humidity data. If so, then your integration rule is working correctly!

{
    "source": "channel.message",
    "appId": "7Je3jg",
    "channel": "raw-sensor-data",
    "site": "eu-west-1-A",
    "ruleId": "y6IA0A",
    "messages": [
        {
            "id": "9raLUBZaYUjg:0",
            "timestamp": 1635346032785,
      ==>   "data": "{\"temperature\":-8.3867766311829,\"humidity\":57}",
            "name": "reading"
        }
    ]
}

Next, you will create an AWS Lambda function to process your stream data.

Step 7: Process your stream data

Now that you can send messages to your stream, you want to be able to do something with them. In this step you will create a “serverless” function using AWS Lambda to calculate running averages of the stream sensor data.

First, you need to create an execution role for your lambda function that grants it permission to read items from your data stream and to log results in CloudWatch. (This tutorial doesn’t use CloudWatch directly, but it can be useful for debugging your lambda code.) To create the execution role, perform the following steps in the console, or use the CLI sketch that follows the list:

1. Open the roles page in the IAM console.
2. Choose Create role.
3. Create a role with the following properties:
– Trusted entity: AWS Lambda
– Permissions: AWSLambdaKinesisExecutionRole
– Role name: lambda-kinesis-role
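If you prefer to stay in the terminal, you can create the equivalent role with the AWS CLI. A sketch that mirrors the console steps above (the create-role output includes the Role ARN you need below):

aws iam create-role --role-name lambda-kinesis-role \
--assume-role-policy-document '{"Version": "2012-10-17", "Statement": [{"Effect": "Allow", "Principal": {"Service": "lambda.amazonaws.com"}, "Action": "sts:AssumeRole"}]}'

aws iam attach-role-policy --role-name lambda-kinesis-role \
--policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole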

Copy the Role ARN to the clipboard: you will need it to deploy the lambda function you are about to create.

Create a new directory in your project called lambda and a file within it called index.js.

Within index.js, include the following code:

const https = require("https")

const ablyApiKey = process.env.ABLY_API_KEY

let temperatures = []
let humidities = []

exports.handler = async (event, context) => {
  event.Records.forEach((record) => {
    // Kinesis data is base64 encoded, so decode it here
    const payload = Buffer.from(record.kinesis.data, "base64").toString("ascii")
    // Because the integration rule is enveloped, each record is an Ably
    // envelope whose `messages` array holds the published readings
    const envelope = JSON.parse(payload)
    for (const message of envelope.messages || []) {
      const data = JSON.parse(message.data)
      temperatures.push(data.temperature)
      humidities.push(data.humidity)
    }
  })

  const averages = {
    avg_temperature: temperatures.reduce(add, 0) / temperatures.length,
    avg_humidity: humidities.reduce(add, 0) / humidities.length,
    num_readings: temperatures.length,
  }

  await postMessage("processed-sensor-data", JSON.stringify(averages))
  console.log(averages)
}

async function postMessage(channel, message) {
  return new Promise((resolve, reject) => {
    const data = JSON.stringify({
      name: "averages",
      data: message,
    })

    const options = {
      host: "rest.ably.io",
      port: 443,
      path: `/channels/${channel}/messages`,
      method: "POST",
      headers: {
        Authorization: `Basic ${Buffer.from(ablyApiKey).toString("base64")}`,
        "Content-Type": "application/json",
        "Content-Length": Buffer.byteLength(data),
      },
    }

    const req = https.request(options)
    req.on("error", reject) // surface connection errors rather than hanging
    req.write(data)
    req.end(null, null, () => {
      // Request has been sent
      resolve(req)
    })
  })
}

function add(accumulator, a) {
  return accumulator + a
}

This code reads records from the data stream, unwraps each Ably envelope, calculates a running average of the readings, and then publishes that information as an Ably message to the processed-sensor-data channel.
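Before deploying, you can sanity-check the handler locally by faking the event shape that Lambda passes in. The following sketch (a hypothetical test-local.js, not part of the deployment package) builds two enveloped records; with a placeholder API key the publish to Ably will be rejected, but the console still logs the calculated averages:

// test-local.js: fake two enveloped Kinesis records and invoke the handler
process.env.ABLY_API_KEY = "your_ably_api_key" // placeholder; use a real key to publish

const { handler } = require("./index")

// Build a record the way the integration rule does: an Ably envelope,
// JSON-encoded and then base64-encoded into the kinesis.data field
function fakeRecord(temperature, humidity) {
  const envelope = {
    messages: [{ name: "reading", data: JSON.stringify({ temperature, humidity }) }],
  }
  return { kinesis: { data: Buffer.from(JSON.stringify(envelope)).toString("base64") } }
}

handler({ Records: [fakeRecord(21.5, 40), fakeRecord(19.1, 55)] })
// Expected log: { avg_temperature: 20.3, avg_humidity: 47.5, num_readings: 2 }

Run it with node test-local.js from the lambda directory.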

Publish the function by creating a deployment package for it using the AWS CLI. Execute the following code, replacing the --role argument value with the Role ARN for the lambda-kinesis-role you created earlier in this step:

cd lambda
zip function.zip index.js
aws lambda create-function --function-name ProcessKinesisRecords \
--zip-file fileb://function.zip --handler index.handler --runtime nodejs18.x \
--role arn:aws:iam::123456789012:role/lambda-kinesis-role

The code you wrote above references your Ably API key through the ABLY_API_KEY environment variable, so you need to set that variable on the deployed function. Execute the following AWS CLI command, replacing <your api key> with your Ably API key:

aws lambda update-function-configuration --function-name ProcessKinesisRecords --environment Variables={ABLY_API_KEY=<your api key>}

Finally, you need to tell your lambda when to execute. In this case, you want it to run when items appear in your data stream, so you need to configure an event source mapping between your lambda and your stream. Run the following command, replacing the --event-source-arn argument with the ARN of the Kinesis data stream that you created in Step 1.

aws lambda create-event-source-mapping --function-name ProcessKinesisRecords --event-source-arn arn:aws:kinesis:<region>:<id>:stream/sensor-data-stream --batch-size 5 --starting-position LATEST
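You can confirm that the mapping was created and check its state (it should move from Creating to Enabled after a few seconds) with:

aws lambda list-event-source-mappings --function-name ProcessKinesisRecords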

With your lambda function ready to calculate the average temperature and humidity readings from your sensors and publish those results to an Ably channel, you now need to subscribe to that channel so that you can display them.

Step 8: Display the sensor data averages

In this step you will subscribe to the processed-sensor-data channel and display the messages that your lambda publishes to that channel.

First, you need to authenticate the client with Ably. You will use token authentication to achieve this.

In server.js, add a new /auth route (just below the home route /) that requests a token from Ably and returns the token details in the response:

app.get("/auth", (request, response) => {
  ably.auth.requestToken((err, tokenDetails) => {
    if(err) {
      console.error(err)
      response.sendStatus(500)
      return
    }
    response.json(tokenDetails)
  })
})
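The token issued above inherits all the capabilities of your API key. Optionally, you can scope it down so that browser clients can only subscribe to the processed data channel. A sketch using Ably token params (the capability value is a JSON string):

app.get("/auth", (request, response) => {
  const tokenParams = {
    capability: JSON.stringify({ "processed-sensor-data": ["subscribe"] }),
  }
  ably.auth.requestToken(tokenParams, (err, tokenDetails) => {
    if (err) {
      console.error(err)
      response.sendStatus(500)
      return
    }
    response.json(tokenDetails)
  })
})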

Then, in client.js, add the following code under the generateSensorData() function. This code instantiates the Ably client and subscribes to the processed-sensor-data channel:

function generateSensorData() {...}

const ably = new Ably.Realtime({ authUrl: "/auth" })
const channel = ably.channels.get("processed-sensor-data")
channel.subscribe((msg) => {
  console.log(msg.data)
  const data = JSON.parse(msg.data)
  document.getElementById(
    "count"
  ).innerHTML = `<strong>Number of readings: </strong>${data.num_readings}`
  document.getElementById(
    "averages"
  ).innerHTML = `<strong>Average temperature (°C):</strong> ${data.avg_temperature.toFixed(
    2
  )}<br/><strong>Average humidity (%):</strong> ${data.avg_humidity.toFixed(
    1
  )}<br/>`
})

Step 9: Try it out

You are now ready to test this!

1. Run your server by executing node server.js in the root of your project directory.
2. Visit http://localhost:5000 in your browser.
3. Click the Generate Readings button.
4. If everything is working correctly, the web page will display your sensor data averages.
5. Continue to click the Generate Readings button and watch the data accumulate.


Displaying the sensor data averages
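If the count and averages never appear, the lambda's CloudWatch logs are the quickest place to look. With AWS CLI v2 you can tail them directly (the log group follows the standard /aws/lambda/<function-name> naming):

aws logs tail /aws/lambda/ProcessKinesisRecords --follow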

Next steps

1. View the working code for this project on GitHub.
2. Learn about the various integrations that Ably supports.
3. Find out more about channels and how to publish or subscribe to messages.
4. Try out other tutorials on working with Ably integrations. Visit our tutorials page and select the Integrations tab.
5. Gain a good technical overview of how the Ably realtime platform works.
6. Get in touch if you need help.