Using Firehose with Amazon Kinesis
Firehose enables you to send messages published by the Ably platform directly to a third-party streaming or queuing service.
The Amazon Kinesis platform is ideal for rapid ingestion and aggregation of large amounts of data. This data can include application logs, social media, stock prices, and web clickstream data.
In this tutorial you will learn how to use the Firehose integration with Amazon Kinesis. You will publish messages representing random sensor data from a configurable number of simulated IoT devices to a channel and create an integration that automatically pushes these messages into an Amazon Kinesis Data Stream.
You will then write a serverless function using AWS Lambda that reads data from the stream and calculates a running average of temperature and humidity readings. It will send these averages as an Ably message to another channel so that a subscriber can display them. This process is illustrated in the following diagram:
The solution code for this project is available on GitHub.
Prerequisites
To use Firehose, you need:
- An Ably account
To use Amazon Kinesis, you must:
- Create an AWS account, or use an existing one
- Create an Administrator IAM user
- Obtain your access key ID and secret access key
You can achieve these tasks by performing steps 1-3 in this Amazon guide. Return here when you are done.
Although all of the AWS configuration steps required here can be performed in the AWS Console, this tutorial uses the AWS CLI. To follow along, first install the latest version of the AWS CLI. Then configure the AWS CLI with your IAM user’s security credentials.
The code for this tutorial is written in Node.js, which you can download here.
Step 1: Create your Kinesis data stream
Execute the following `aws` command in the terminal to create a Kinesis data stream called `sensor-data-stream`, with a single shard:

```bash
aws kinesis create-stream --stream-name sensor-data-stream --shard-count 1
```
Then, run the following `describe-stream` command to view details of the stream and confirm that it has been created successfully:

```bash
aws kinesis describe-stream --stream-name sensor-data-stream
```

Make a note of the `StreamARN` key's value. You will need it in later steps:
```json
{
  "StreamDescription": {
    "Shards": [
      {
        "ShardId": "shardId-000000000000",
        "HashKeyRange": {
          "StartingHashKey": "0",
          "EndingHashKey": "340282366920938463463374607431768211455"
        },
        "SequenceNumberRange": {
          "StartingSequenceNumber": "49623362581436102050070983950697146036412772107449008130"
        }
      }
    ],
    "StreamARN": "arn:aws:kinesis:eu-west-2:318417720898:stream/sensor-data-stream",
    "StreamName": "sensor-data-stream",
    "StreamStatus": "ACTIVE",
    "RetentionPeriodHours": 24,
    "EnhancedMonitoring": [
      {
        "ShardLevelMetrics": []
      }
    ],
    "EncryptionType": "NONE",
    "KeyId": null,
    "StreamCreationTimestamp": "2021-10-27T11:21:31+01:00"
  }
}
```
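If you prefer not to scan the JSON by eye, you can let the CLI extract the ARN directly. This is just a convenience: `--query` is the AWS CLI's standard JMESPath filtering option.

```bash
# Print only the stream ARN, using the AWS CLI's built-in JMESPath filtering
aws kinesis describe-stream --stream-name sensor-data-stream \
  --query 'StreamDescription.StreamARN' --output text
```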
Step 2: Create an AWS role for Firehose
So that Ably can write messages to your Kinesis data stream, you need to grant appropriate permissions. The safest way to do this, which does not require you to provide your AWS credentials, is to create an assumable role that the Ably platform can use.
Our AWS Authentication guide explains how to do this. Follow the steps in the guide to create an AWS Kinesis policy called `AblyKinesisPolicy` and attach it to a role called `AblyKinesisRole`.

Make a note of the `AblyKinesisRole` ARN. You need this to configure your Firehose integration rule in the next step.
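For orientation, the permissions policy needs to allow Ably to put records onto your stream. The exact statements to use are given in the AWS Authentication guide; the following is only a minimal sketch, with the region and account ID as placeholders:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["kinesis:PutRecord", "kinesis:PutRecords"],
      "Resource": "arn:aws:kinesis:<region>:<account-id>:stream/sensor-data-stream"
    }
  ]
}
```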
Step 3: Create the Firehose integration rule
In this step you want to ensure that messages published by Ably to a specific channel (`raw-sensor-data`) are delivered to the Kinesis data stream called `sensor-data-stream` that you created in Step 1.
1. Log into the Ably dashboard and click the Create New App button.
2. Name your app `firehose-kinesis-tutorial`, select the SSL Required checkbox, then click Create app.
3. In the page that displays, make a note of your private API key: you will need this in a later step. At the bottom of the page, in the Configure your application panel, click the Change your app settings link.
4. Select the Integrations tab and click the New Integration Rule button.
5. From the list of rule types, select Firehose and, from the list of services, select Amazon Kinesis.
6. Configure your rule as follows:

- AWS Region: Select your AWS region from the dropdown. This region appears in the `StreamARN` for your data stream, which you made a note of in Step 1.
- Stream Name: Enter the name of your Kinesis data stream (`sensor-data-stream`).
- AWS Authentication Scheme: Select ARN of an assumable role and enter the `AblyKinesisRole` ARN from the preceding step.
- Source: Select "Message", so that the integration sends all messages published on the channel specified in Channel Filter to your data stream.
- Channel Filter: Enter the regular expression `^raw-sensor-data.*` to match the `raw-sensor-data` channel.
- Encoding: Select `JSON`.
- Enveloped: Leave checked. Enabling this option wraps incoming messages in an Ably "envelope" that contains metadata about the message and its payload.
- Partition Key: This facility enables you to stream messages to different shards, based on a routing key. As your stream contains only a single shard, enter `1` here.
Test your rule before proceeding to the next step, to ensure that you have configured the necessary permissions correctly. Click the Test rule button and then, in the dialog that displays, click Run test now.
Step 4: Create your server
In this step you will create a Node.js server to test your Firehose integration with Amazon Kinesis.
First, create a directory for your application and change into it:

```bash
mkdir firehose-kinesis-demo; cd firehose-kinesis-demo
```

Create two subdirectories within your application directory, to store the static HTML, JavaScript, and CSS files:

```bash
mkdir public views
```

Then, execute `npm init` to create a Node.js project. For `entry point`, enter `server.js` and accept the defaults for the other values.
Install the dependencies you will require for this project:

- `ably`: The Ably client library.
- `express`: A web application framework for Node.js.
- `dotenv`: A module that makes it easy to store and read configuration settings from a `.env` file.
- `random-number`: A helper module you will use to create random sensor data.

```bash
npm install ably express dotenv random-number
```
Create a `.env` file in your application directory to store configuration settings. Replace `your_ably_api_key` with your app's API key that you generated in Step 3. You can also view your app's API key in the Ably dashboard:

```
ABLY_API_KEY=your_ably_api_key
NUM_SENSORS=5
PORT=5000
```
`NUM_SENSORS` is the number of IoT devices your application will simulate, each of which will generate a random temperature and humidity reading. Set this value to `5` initially.
Create the `views/index.html` file. This HTML page contains a button which you will click to generate random sensor data to send to your data stream. It will also ultimately display the average values of those sensor readings:
```html
<html>
  <head>
    <title>AWS Kinesis Firehose Demo</title>
    <link rel="stylesheet" href="style.css" />
    <script src="https://cdn.ably.com/lib/ably.min-1.js"></script>
    <script src="client.js"></script>
  </head>
  <body>
    <h1>AWS Kinesis Firehose Demo</h1>
    <div>
      <button id="generateSensorData" onclick="generateSensorData()">
        Generate Readings
      </button>
    </div>
    <div>
      <p id="count"></p>
      <p id="averages"></p>
    </div>
  </body>
</html>
```
Create the `public/style.css` stylesheet to include some basic styling for the page:

```css
body {
  background-color: skyblue;
  padding-left: 10px;
  font-size: 14pt;
  font-family: Verdana, Geneva, Arial, Helvetica, sans-serif;
  color: black;
}

h1 {
  font: 20pt Verdana, Geneva, Arial, Helvetica, sans-serif;
  font-weight: bold;
  line-height: 20pt;
}
```
Create the `public/client.js` file. This will contain the client-side code for displaying the sensor data averages. Leave it blank for now: you will write this code in a later step.
Enter the following code in `server.js`:

```javascript
// server.js
require("dotenv").config()

const express = require("express")
const rn = require("random-number")
const Ably = require("ably")

const ably = new Ably.Rest({ key: process.env.ABLY_API_KEY })
const app = express()

// Make all the files in 'public' available
app.use(express.static("public"))

// Load the home page, index.html
app.get("/", (request, response) => {
  response.sendFile(__dirname + "/views/index.html")
})

// Listen for requests
const listener = app.listen(process.env.PORT, () => {
  console.log("Your app is listening on port " + process.env.PORT)
})
```
This code imports all the required dependencies, instantiates the Ably REST client and serves the `index.html` web page and supporting files. Test it by running `node server.js` and visiting `http://localhost:5000` in your browser. If everything is working correctly, you should see a web page with a button inviting you to generate sensor readings. The button doesn't do anything yet: you will write the code for this in the next step.
Step 5: Generate random sensor data
In this step, you will use the Node.js `random-number` module to generate random sensor readings that contain temperature and humidity values and then publish them to the `raw-sensor-data` channel. Because you created a Firehose integration rule on that channel, the readings you publish will be sent to your data stream.

First, modify your `server.js` file to include the `/generate` route, just below your home (`/`) route:
```javascript
app.get("/generate", (request, response) => {
  // Generate one random reading per simulated sensor
  for (let i = 0; i < process.env.NUM_SENSORS; i++) {
    let data = {
      temperature: rn({ min: -10, max: 50, integer: false }), // °C, fractional
      humidity: rn({ min: 10, max: 99, integer: true }), // %, whole number
    }
    console.log(data)
    channel.publish("reading", JSON.stringify(data))
  }
  response.sendStatus(200)
})
```
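The `publish()` call above is fire-and-forget. If you want publish failures surfaced while developing, the Ably REST client also accepts an optional callback; a minimal sketch of that variant of the line in the loop:

```javascript
// Optional: pass a callback to log publish errors during testing
channel.publish("reading", JSON.stringify(data), (err) => {
  if (err) console.error("Unable to publish reading:", err)
})
```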
Then, add a `channel` variable just after you have instantiated the Ably REST client, and use it to retrieve the `raw-sensor-data` channel:

```javascript
const Ably = require("ably")
const ably = new Ably.Rest({ key: process.env.ABLY_API_KEY })
const channel = ably.channels.get("raw-sensor-data")
```
Finally, create the button click event handler in `public/client.js` that will make a request to the `/generate` route when you click the Generate Readings button in the `index.html` page:

```javascript
function generateSensorData() {
  fetch("/generate")
}
```
You are now ready to test your integration rule.
Step 6: Test your data stream
Execute `node server.js` and visit `http://localhost:5000` in your browser.

Click the Generate Readings button. Your server will generate a number of random readings equal to the `NUM_SENSORS` setting in `.env` and output them to the console:
```
{ temperature: 30.918390112267275, humidity: 85 }
{ temperature: -8.111323330144845, humidity: 30 }
{ temperature: 36.882337318219136, humidity: 71 }
{ temperature: 19.391215206587383, humidity: 43 }
{ temperature: -8.3867766311829, humidity: 57 }
```
It will also publish these readings to the `raw-sensor-data` channel. And, because you have set up a Firehose integration rule on this channel, they should also be streamed automatically to the `sensor-data-stream` in Amazon Kinesis.
Perform the following steps using the AWS CLI to verify this:
1. Obtain the data stream's shard ID. Because your stream has only one shard, all of your stream data will appear in that shard:

```bash
aws kinesis list-shards --stream-name sensor-data-stream
```

Look for the `ShardId` in the response:
```json
{
  "Shards": [
    {
      "ShardId": "shardId-000000000000",
      "HashKeyRange": {
        "StartingHashKey": "0",
        "EndingHashKey": "340282366920938463463374607431768211455"
      },
      "SequenceNumberRange": {
        "StartingSequenceNumber": "49623362581436102050070983950697146036412772107449008130"
      }
    }
  ]
}
```
2. Before you can get data from the stream, you need to obtain the shard iterator for the shard you are interested in. Execute the following command, ensuring that the `--shard-id` argument contains the `ShardId` shown in the preceding step:

```bash
aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name sensor-data-stream
```
Copy the value of the `ShardIterator` key to your clipboard:

```json
{
  "ShardIterator": "AAAAAAAAAAFn/WyQfznjPLcwN5bgn3DdXAU0OOC8U6r8rMp2X5fXMfd5MnZ..."
}
```
3. Retrieve a list of records from the stream, using the `aws kinesis get-records` command, together with your shard iterator:

```bash
aws kinesis get-records --shard-iterator AAAAAAAAAAGCBEzYODmrGVb+1aoY0y...
```
This returns a JSON array of records, each of which corresponds to an item in your data stream:
```json
{
  "Records": [
    {
      "SequenceNumber": "49623362581436102050070983990727097775492785932058230786",
      "ApproximateArrivalTimestamp": "2021-10-27T13:01:32.944000+01:00",
      "Data": "eyJzb3VyY2UiOiJjaGFubmVsLm1lc3NhZ2UiLCJhcHBJZCI6IjdKZTNqZyIsImNoY...",
      "PartitionKey": "1"
    },
    {
      "SequenceNumber": "49623362581436102050070983990728306701313083632831692802",
      "ApproximateArrivalTimestamp": "2021-10-27T15:47:13.243000+01:00",
      "Data": "eyJzb3VyY2UiOiJjaGFubmVsLm1lc3NhZ2UiLCJhcHBJZCI6IjdKZTNqZyIsImNoY...",
      "PartitionKey": "1"
    },
    {
      "SequenceNumber": "49623362581436102050070983990729515627132698262006398978",
      "ApproximateArrivalTimestamp": "2021-10-27T15:47:13.245000+01:00",
      "Data": "eyJzb3VyY2UiOiJjaGFubmVsLm1lc3NhZ2UiLCJhcHBJZCI6IjdKZTNqZyIsImNoY...",
      "PartitionKey": "1"
    }
  ],
  "NextShardIterator": "AAAAAAAAAAE3EUa9QKkl6xSWCF6J0Cq1sJpbi0vbwATbApY45R14v...",
  "MillisBehindLatest": 0
}
```
Note that there are six records in the stream: one sent by Ably when you tested your integration rule in Step 3, and one for each of the five simulated IoT devices (`NUM_SENSORS` in your `.env` file). However, the record data is Base64-encoded and is therefore unreadable as-is.
4. Choose one record other than the earliest (which is the Ably test record) and execute the following command, replacing `<data>` with the actual record data, to display its contents:

```bash
echo '<data>' | base64 --decode | python -mjson.tool
```
Note: If you don't have Python installed, just omit the `| python -mjson.tool` part of the command, which "pretty prints" the JSON data.
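Since Node.js is already installed for this tutorial, you could equally pretty-print with it instead of Python; a small sketch:

```bash
# Read the decoded JSON from stdin and re-serialize it with indentation
echo '<data>' | base64 --decode | node -e 'console.log(JSON.stringify(JSON.parse(require("fs").readFileSync(0, "utf8")), null, 2))'
```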
You should see that this record contains some random temperature and humidity data. If so, your integration rule is working correctly!

```json
{
  "source": "channel.message",
  "appId": "7Je3jg",
  "channel": "raw-sensor-data",
  "site": "eu-west-1-A",
  "ruleId": "y6IA0A",
  "messages": [
    {
      "id": "9raLUBZaYUjg:0",
      "timestamp": 1635346032785,
      "data": "{\"temperature\":-8.3867766311829,\"humidity\":57}",
      "name": "reading"
    }
  ]
}
```
Next, you will create an AWS Lambda function to process your stream data.
Step 7: Process your stream data
Now that you can send messages to your stream, you want to be able to do something with them. In this step you will create a serverless function using AWS Lambda to calculate running averages of the sensor data in the stream.

First, you need to create an execution role for your Lambda function that grants it the permissions it needs to read items from your data stream and log the results in CloudWatch. (This tutorial doesn't use CloudWatch directly, but it can be useful for debugging your Lambda code.) To create this execution role, perform the following steps:
1. Open the roles page in the IAM console.
2. Choose Create role.
3. Create a role with the following properties:
   - Trusted entity: AWS Lambda
   - Permissions: AWSLambdaKinesisExecutionRole
   - Role name: `lambda-kinesis-role`

Copy the Role ARN to the clipboard: you will need it to deploy the Lambda function you are about to create.
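If you would rather stay in the AWS CLI, the console steps above correspond roughly to the following commands (the `trust-policy.json` file name is illustrative):

```bash
# Trust policy that lets the Lambda service assume the role
cat > trust-policy.json <<'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "Service": "lambda.amazonaws.com" },
      "Action": "sts:AssumeRole"
    }
  ]
}
EOF

aws iam create-role --role-name lambda-kinesis-role \
  --assume-role-policy-document file://trust-policy.json

# Attach the AWS-managed policy that allows reading from Kinesis and writing logs
aws iam attach-role-policy --role-name lambda-kinesis-role \
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
```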
Create a new directory in your project called `lambda` and a file within it called `index.js`.

Within `index.js`, include the following code:
```javascript
const https = require("https")

const ablyApiKey = process.env.ABLY_API_KEY

let temperatures = []
let humidities = []

exports.handler = async (event, context) => {
  event.Records.forEach((record) => {
    // Kinesis data is base64 encoded, so decode it here
    const payload = Buffer.from(record.kinesis.data, "base64").toString("ascii")
    // Because the integration rule is enveloped, each record is an Ably
    // envelope whose messages carry the sensor reading as a JSON string
    const envelope = JSON.parse(payload)
    envelope.messages.forEach((msg) => {
      const reading = JSON.parse(msg.data)
      temperatures.push(reading.temperature)
      humidities.push(reading.humidity)
    })
  })

  const averages = {
    avg_temperature: temperatures.reduce(add, 0) / temperatures.length,
    avg_humidity: humidities.reduce(add, 0) / humidities.length,
    num_readings: temperatures.length,
  }

  await postMessage("processed-sensor-data", JSON.stringify(averages))
  console.log(averages)
}

async function postMessage(channel, message) {
  return new Promise((resolve, reject) => {
    const data = JSON.stringify({
      name: "averages",
      data: message,
    })

    const options = {
      host: "rest.ably.io",
      port: 443,
      path: `/channels/${channel}/messages`,
      method: "POST",
      headers: {
        Authorization: `Basic ${Buffer.from(ablyApiKey).toString("base64")}`,
        "Content-Type": "application/json",
        "Content-Length": Buffer.byteLength(data),
      },
    }

    let req = https.request(options)
    req.write(data)
    req.end(null, null, () => {
      // The request has been sent
      resolve(req)
    })
  })
}

function add(accumulator, a) {
  return accumulator + a
}
```
This code reads items from the data stream, calculates a running average, and then publishes that information as an Ably message to the `processed-sensor-data` channel. Note that `temperatures` and `humidities` are module-level variables, so the running average accumulates only for as long as AWS reuses the same Lambda execution environment; a cold start resets it.
Publish the function by creating a deployment package for it using the AWS CLI. Execute the following commands, replacing the `--role` argument value with the Role ARN for the `lambda-kinesis-role` you created earlier in this step (AWS retires older runtimes over time, so if `nodejs12.x` is no longer accepted, specify a currently supported Node.js runtime):

```bash
cd lambda
zip function.zip index.js
aws lambda create-function --function-name ProcessKinesisRecords \
  --zip-file fileb://function.zip --handler index.handler --runtime nodejs12.x \
  --role arn:aws:iam::123456789012:role/lambda-kinesis-role
```
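If you change `index.js` later, you don't need to recreate the function; you can redeploy the package in place:

```bash
# Rebuild the package and upload the new code to the existing function
zip function.zip index.js
aws lambda update-function-code --function-name ProcessKinesisRecords \
  --zip-file fileb://function.zip
```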
In the code you wrote above, you reference your Ably API key as an environment variable, `ABLY_API_KEY`. For this to work, you need to set that environment variable on the function. Execute the following AWS CLI command, replacing `<your api key>` with your Ably API key (the quotes prevent your shell from interpreting the braces):

```bash
aws lambda update-function-configuration --function-name ProcessKinesisRecords --environment "Variables={ABLY_API_KEY=<your api key>}"
```
Finally, you need to tell your Lambda function when to execute. In this case, you want it to run when items appear in your data stream, so you need to configure an event source mapping between your function and your stream. Run the following command, replacing the argument to `--event-source-arn` with the ARN of the Kinesis data stream that you created in Step 1:

```bash
aws lambda create-event-source-mapping --function-name ProcessKinesisRecords --event-source-arn arn:aws:kinesis:<region>:<id>:stream/sensor-data-stream --batch-size 5 --starting-position LATEST
```
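To confirm that the mapping was created and is enabled, you can list the mappings for the function:

```bash
aws lambda list-event-source-mappings --function-name ProcessKinesisRecords
```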
With your Lambda function ready to calculate the average temperature and humidity readings from your sensors and publish those results to an Ably channel, you now need to subscribe to that channel so that you can display them.
Step 8: Display the sensor data averages
In this step you will subscribe to the `processed-sensor-data` channel and display the messages that your Lambda function publishes to that channel.
First, you need to authenticate the client with Ably. You will use token authentication to achieve this.
In `server.js`, add a new `/auth` route (just below the home route, `/`) that requests a token from Ably and returns the token details in the response:
```javascript
app.get("/auth", (request, response) => {
  ably.auth.requestToken((err, tokenDetails) => {
    if (err) {
      console.error(err)
      response.sendStatus(500)
      return
    }
    response.json(tokenDetails)
  })
})
```
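As written, the token inherits the capabilities of your API key. If you want the browser client restricted to only what it needs, `requestToken()` also accepts token params; a sketch, where the capability shown is illustrative rather than required by the tutorial:

```javascript
// Issue a token that can only subscribe to the processed-sensor-data channel
ably.auth.requestToken({
  capability: JSON.stringify({ "processed-sensor-data": ["subscribe"] })
}, (err, tokenDetails) => {
  if (err) {
    console.error(err)
    return
  }
  // Return tokenDetails to the client as before
})
```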
Then, in `client.js`, add the following code under the `generateSensorData()` function. This code instantiates the Ably client and subscribes to the `processed-sensor-data` channel:
```javascript
function generateSensorData() {
  fetch("/generate")
}

const ably = new Ably.Realtime({ authUrl: "/auth" })
const channel = ably.channels.get("processed-sensor-data")

channel.subscribe((msg) => {
  console.log(msg.data)
  const data = JSON.parse(msg.data)
  document.getElementById(
    "count"
  ).innerHTML = `<strong>Number of readings: </strong>${data.num_readings}`
  document.getElementById(
    "averages"
  ).innerHTML = `<strong>Average temperature (°C):</strong> ${data.avg_temperature.toFixed(
    2
  )}<br/><strong>Average humidity (%):</strong> ${data.avg_humidity.toFixed(
    1
  )}<br/>`
})
```
Step 9: Try it out
You are now ready to test this!
1. Run your server by executing `node server.js` in the root of your project directory.
2. Visit `http://localhost:5000` in your browser.
3. Click the Generate Readings button.
4. If everything is working correctly, the web page will display your sensor data averages.
5. Continue to click the Generate Readings button and watch the data accumulate.
Next steps
1. View the working code for this project on GitHub.
2. Learn about the various integrations that Ably supports.
3. Find out more about channels and how to publish or subscribe to messages.
4. Try out other tutorials on working with Ably integrations. Visit our tutorials page and select the Integrations tab.
5. Gain a good technical overview of how the Ably realtime platform works.
6. Get in touch if you need help.