Build a safe text generating app using AMQP message queues and the Neutrino Profanity Filter API

Ably Queues are traditional message queues that provide a reliable and straightforward mechanism for your servers to consume, process, store, augment or reroute data from our realtime platform efficiently.

In this tutorial we will show you how to:

  • Provision an Ably Queue and set up a queue rule to republish messages from a channel into the queue
  • Create a simple Express.js web server to serve up the web app and issue authentication tokens for the Realtime browser client
  • Publish unsafe text from a Realtime browser client to a channel, which is automatically republished by Ably to the provisioned message queue
  • Set up a worker server to consume the messages (unsafe text) from the message queue, communicate with the Neutrino Bad Word Filter API to strip out profanities, and publish the safe text as a message back to the client using the REST API
  • Subscribe to safe text published from the workers in the web app using a realtime pub/sub channel

The following diagram depicts the architecture for this tutorial:


Ably Queues and Neutrino diagram

The diagram may look complicated, but fortunately getting the app up and running with Ably is not! Let’s get started.

P.S. If you want to skip ahead and try out the completed tutorial now, you can install it in one click on Heroku for free (see “Try this tutorial in one click on Heroku” near the end of this tutorial).

Step 1 – Create your Ably app and API key

To follow this tutorial, you will need an Ably account. Sign up for a free account if you don’t already have one.

Access to the Ably global messaging platform requires an API key for authentication. API keys exist within the context of an Ably application and each application can have multiple API keys so that you can assign different capabilities and manage access to channels and queues.

You can either create a new application for this tutorial, or use an existing one.

To create a new application and generate an API key:

  1. Log in to your Ably account dashboard
  2. Click the “Create New App” button
  3. Give it a name and click “Create app”
  4. Copy your private API key and store it somewhere. You will need it for this tutorial.

To use an existing application and API key:

  1. Select an application from “Your apps” in the dashboard
  2. In the API keys tab, choose an API key to use for this tutorial. The default “Root” API key has full access to capabilities and channels.
  3. Copy the Root API key and store it somewhere. You will need it for this tutorial.

    Copy API Key screenshot

Step 2 – Install Ably on your server

To start using Ably on your Node.js server, you first need to install the NPM module. The NPM module can be installed as follows:

npm install ably

The server client library must be instantiated with the private API key you copied in Step 1. This will ensure that the basic authentication scheme is used, which is almost always the right choice for your own trusted servers where risk of compromise of your API key credentials is low.

Add the following to a file named server.js to instantiate the Ably library inside your Node.js server:

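/* Replace the placeholder below with the private API key you copied in Step 1 */
const ApiKey = 'INSERT-YOUR-API-KEY-HERE';
var Ably = require('ably');
var rest = new Ably.Rest({ key: ApiKey });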
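
Hard-coding a private key in server.js is fine for a quick local run, but you may prefer to keep it out of source control. The following is a minimal sketch, not part of the original tutorial. It assumes a hypothetical ABLY_API_KEY environment variable and the appId.keyId:keySecret key layout, and loadApiKey is an illustrative helper name:

```javascript
/* Sketch only: ABLY_API_KEY is an assumed variable name, not something this tutorial defines */
function loadApiKey(env) {
  const key = env.ABLY_API_KEY;
  /* Ably API keys look like "appId.keyId:keySecret"; reject anything else early */
  if (typeof key !== 'string' || !/^[^.:]+\.[^.:]+:.+$/.test(key)) {
    throw new Error('Set ABLY_API_KEY to your private Ably API key');
  }
  return key;
}

/* Usage in server.js: const ApiKey = loadApiKey(process.env); */
```

Failing fast at startup makes a missing or truncated key obvious before any client tries to authenticate.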

See this step in GitHub

Step 3 – Set up your web server & base app HTML

Before a client connects to Ably, it will check if it has suitable credentials to authenticate with Ably. As we’ll be using the recommended token authentication scheme in the client for this demo, when the client starts up and attempts to connect to Ably, it will immediately request a token so that it can then authenticate with Ably.

As tokens are typically issued from your own web servers, we’ll set up a simple web server now for this tutorial and add the base app static HTML and assets.

Express.js is a very popular and simple web server framework for Node.js. Let’s get this set up:

First we’ll create a package.json that lists the ably and express NPM modules as dependencies:

{
  "name": "ably-tutorials",
  "repository": "https://github.com/ably/tutorials",
  "license": "Apache-2.0",
  "engines": {
    "node": ">=0.10"
  },
  "scripts": {
    "start": "node ./server.js"
  },
  "dependencies": {
    "ably": ">=1.2",
    "express": "~4.0"
  }
}

Run npm install to install the Node package dependencies.

Then we’ll set up a vanilla HTTP Express.js server in server.js:

const Express = require('express');
const ServerPort = 3000;

/* Start a web server */
const app = Express();

/* Serve static HTML files from the /public folder */
app.use(Express.static('public'));
app.listen(ServerPort);

console.log('Web server listening on port', ServerPort);

At this point, you should also create a public folder for your HTML, CSS and JS files, and then add app.js, index.html and style.css to it.

If you would like to try running the server now, you can do so with node server.js. Once running, open your browser to http://localhost:3000/ and you should see the base HTML app.

See this step in GitHub

Step 4 – Plug in Ably client-side, add auth endpoint and update UI

First, let’s make sure the web server is capable of issuing token requests to clients that need to authenticate with Ably. If you are unfamiliar with client and server token-based authentication with Ably, please see the client server token auth tutorial.

Once you add the following route, your Express.js server is ready to service clients wanting to authenticate with Ably:

/* Issue token requests to browser clients sending a request to the /auth endpoint */
app.get('/auth', function (req, res) {
  rest.auth.createTokenRequest(function(err, tokenRequest) {
    if (err) {
      res.status(500).send('Error requesting token: ' + JSON.stringify(err));
    } else {
      res.setHeader('Content-Type', 'application/json');
      res.send(JSON.stringify(tokenRequest));
    }
  });
});

Try running your server again with node server.js and visit http://localhost:3000/auth. You should see a JSON token request in the form:

{
  "keyName": "I2ACJQ.Lv8AUQ",
  "ttl": 3600000,
  "timestamp": 1473894038255,
  "capability": "{\"*\":[\"*\"]}",
  "nonce": "f6799a8e7fa6f77b8e1dac55314789b1",
  "mac": "35nifY9SRZ8KRDfKOPIS1qYWGP16r2lD59zJo9TH8pA="
}
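
The fields in that response can be inspected with a few lines of plain JavaScript. The sketch below is illustrative only: describeTokenRequest is a hypothetical helper, and it assumes that ttl and timestamp are in milliseconds and that capability is a JSON-encoded string, as the example above suggests.

```javascript
/* Hypothetical helper: summarises a token request like the one shown above */
function describeTokenRequest(tokenRequest) {
  return {
    /* keyName is "appId.keyId", so the app ID is the part before the dot */
    appId: tokenRequest.keyName.split('.')[0],
    /* capability arrives as a JSON string nested inside the JSON response */
    capability: JSON.parse(tokenRequest.capability),
    /* Rough upper bound on the requested lifetime, assuming millisecond units */
    requestedUntil: tokenRequest.timestamp + tokenRequest.ttl
  };
}

const summary = describeTokenRequest({
  keyName: 'I2ACJQ.Lv8AUQ',
  ttl: 3600000,
  timestamp: 1473894038255,
  capability: '{"*":["*"]}'
});
console.log(summary.appId, summary.capability, new Date(summary.requestedUntil).toISOString());
```

Note that the token request never contains the key secret, only the key name and a MAC derived from the secret, which is why it is safe to hand to a browser.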

Now that we have a web server running and an authentication endpoint, we are ready to set up the Ably Realtime client that uses token auth.

First add the Ably client library loader from our CDN to the HTML file public/index.html:

<head>
  <script src="//cdn.ably.com/lib/ably.min-1.js" crossorigin="anonymous"></script>
</head>

Then add the following to your public/app.js JavaScript file:

var ably = new Ably.Realtime({ authUrl: '/auth' });
ably.connection.on('connecting', function() { showStatus('Connecting to Ably...'); });
ably.connection.on('connected', function() { clearStatus(); });
ably.connection.on('disconnected', function() { showStatus('Disconnected from Ably...'); });
ably.connection.on('suspended', function() { showStatus('Disconnected from Ably for a while...'); });

var $status = $('#status');
function showStatus(text) {
  $status.text(text).show();
}
function clearStatus() {
  $status.hide();
}

The Ably Realtime client will now automatically obtain a token request from the provided authUrl and then use it to authenticate and connect to Ably. At this point, run your server again with node server.js and visit http://localhost:3000/. You should see the status element change to “Connecting to Ably…” whilst a connection is being established, and then become empty once the connection is established.

Now let’s hook up the publishing of unsafe text to an Ably channel:

var publishChannel = ably.channels.get('neutrino:raw');
var $actionButton = $('#action-btn');
var $text = $('#text');

$actionButton.on('click', () => {
  /* Publish unsafe text to the Ably channel so that the queue worker receives it via queues */
  publishChannel.publish('text', $text.val(), function(err) {
    if (err) {
      return showStatus('Failed to publish text!');
    }
  });
  $text.val(''); /* clear the text field for the next entry */
});

And add a subscriber that listens for new safe filtered text:

var filteredChannel = ably.channels.get('neutrino:filtered');
var $output = $('#output');

filteredChannel.subscribe(function(message) {
  /* The worker publishes its result in the filteredText property of the message data */
  var $filtered = $('<div class="filtered">').text(message.data.filteredText);
  $output.prepend($('<div>').append($filtered));
});

At this point the web app instantiates the Ably Realtime library and connects to Ably, publishes messages to the neutrino:raw channel when requested, and automatically subscribes to filtered text from the neutrino:filtered channel. However, the queues and workers are not in place yet, so publishing text won’t do anything for now.
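
Because the subscriber renders whatever arrives on neutrino:filtered, a small guard can keep the UI from showing an empty or undefined entry if a payload has an unexpected shape. This is a sketch under the assumption that workers publish either a plain string or an object with a filteredText property; extractFilteredText is a hypothetical helper name, not part of the Ably library:

```javascript
/* Hypothetical helper: pulls displayable text out of a received message */
function extractFilteredText(message) {
  const data = message && message.data;
  if (typeof data === 'string') {
    return data; /* plain string payload */
  }
  if (data && typeof data.filteredText === 'string') {
    return data.filteredText; /* object payload with a filteredText property */
  }
  return ''; /* anything else is treated as nothing to display */
}

/* Usage inside the subscribe callback:
   $('<div class="filtered">').text(extractFilteredText(message)); */
```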

See this complete step in GitHub

Step 5 – Provision Ably Queues and queue rules

Unlike pub/sub channels, which are created on demand, Ably Queues must be provisioned in advance via the Ably app dashboard. Follow these steps to provision a new queue:

  1. Log into your app dashboard
  2. Under “Your apps”, click on “Manage app” for any app you wish to use for this tutorial, or create a new one with the “Create New App” button
  3. Click on the “Queues” tab
  4. Click the “Provision a new Queue” button. When prompted, enter the name neutrino for your queue and click “Create” to create your queue. See screenshot example below.


Provision a Neutrino queue

Once your queue is provisioned it is immediately ready to start receiving messages. However, without a queue rule, no messages will be routed to the queue. Let’s set up a rule that ensures that text published by the web app Realtime client is republished to this new queue, and in turn the workers we will set up next can start consuming these messages. Follow these steps to set up a new queue rule:

  1. Ensure you are still logged into your app dashboard and navigate back to the app dashboard you used to provision your queue.
  2. Click on the “Integrations” tab
  3. Click the “New Integration Rule” button, then choose “Ably Queues”. When prompted, choose the neutrino queue you just provisioned, and enter ^neutrino:raw in the channel filter field to ensure that only unfiltered text published by clients is republished to the queue. The unfiltered text will then sit in the queue waiting for one or more workers (consumers) to process the text. See screenshot example below to set up a queue rule:


Create a queue rule
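
To see which channels the ^neutrino:raw filter would select, it can help to try the pattern locally. The sketch below assumes the channel filter behaves like an ordinary regular expression tested against the channel name. The channel names other than neutrino:raw and neutrino:filtered are made up for illustration:

```javascript
/* Assumption: the dashboard's channel filter is applied like a standard regular expression */
const channelFilter = /^neutrino:raw/;

const candidates = [
  'neutrino:raw',       /* published to by the web app */
  'neutrino:filtered',  /* published to by the worker */
  'neutrino:raw:eu',    /* invented name: shares the prefix */
  'other:neutrino:raw'  /* invented name: contains but does not start with the prefix */
];

/* Keep only the channel names the pattern matches */
const matched = candidates.filter((name) => channelFilter.test(name));
console.log(matched);
```

Under that assumption the pattern acts as a prefix match, which is what keeps messages on neutrino:filtered out of the queue. If you ever needed an exact match, anchoring the end as well (^neutrino:raw$) would be the usual regular expression approach.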

Before we move on, let’s confirm that this rule is set up correctly. Try running your server again with node server.js and visit http://localhost:3000/. Now type some text into the text field and click the “Filter my bad language” button. The text should be published by the Realtime library to the channel neutrino:raw, and the queue rule should in turn ensure that the message is republished to the neutrino queue you have just provisioned. Visit the “Queues” tab of your app dashboard again and check that the “Message ready” count has increased. See screenshot below:


Check that queue has updated

Step 6 – Add queue worker

We’re now ready to start consuming messages from the queue. There are plenty of AMQP-compatible libraries available; for this tutorial we will use the popular amqplib library. First let’s add the amqplib library to the package.json file:

"dependencies": {
  "ably": ">=1.2",
  "amqplib": "^0.4",
  "express": ">=4.14.0"
}

Run npm install to install the Node package dependencies.

Now create a worker.js file that will consume from the AMQP queue, process messages in real time and post a placeholder response back to the web client using REST:

'use strict';

const amqp = require('amqplib/callback_api');
const Ably = require('ably');

/* Start the worker that consumes from the AMQP QUEUE */
exports.start = function(apiKey, filteredChannelName, queueName, queueEndpoint) {
  const appId = apiKey.split('.')[0];
  const queue = appId + ":" + queueName;
  const endpoint = queueEndpoint;
  const url = 'amqps://' + apiKey + '@' + endpoint;
  const rest = new Ably.Rest({ key: apiKey });
  const filteredChannel = rest.channels.get(filteredChannelName);

  /* Connect to Ably queue */
  amqp.connect(url, (err, conn) => {
    if (err) {
      return console.error('worker:', 'Queue error!', err);
    }

    /* Create a communication channel */
    conn.createChannel((err, ch) => {
      if (err) {
        return console.error('worker:', 'Queue error!', err);
      }

      /* Wait for messages published to the Ably Queue */
      ch.consume(queue, (item) => {
        const decodedEnvelope = JSON.parse(item.content);

        /* Use the Ably library to decode the message.
           The envelope messages attribute will only contain one message. However, in future versions,
           we may allow optional bundling of messages into a single queue message and as such this
           attribute is an Array to support that in future */
        const messages = Ably.Realtime.Message.fromEncodedArray(decodedEnvelope.messages);
        messages.forEach(function(message) {
          filteredChannel.publish('text', { filteredText: 'Placeholder until Neutrino connected' }, (err) => {
            if (err) {
              console.error('worker:', 'Failed to publish text', message.data, ' - err:', JSON.stringify(err));
            }
          })
        });

        /* Remove message from queue */
        ch.ack(item);
      });
    });

    conn.on('error', function(err) { console.error('worker:', 'Connection error!', err); });
  });
};
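
To make the envelope handling above more concrete, here is a sketch of what a queue item body might look like and how its messages array could be read by hand. The sample envelope is invented for illustration and its exact field set is an assumption. textsFromEnvelope is a hypothetical helper that only handles plain, unencoded string payloads, so the Ably library decoder used above remains the right tool for real messages:

```javascript
/* Illustrative envelope only: values are invented and the field set is an assumption */
const sampleEnvelope = {
  source: 'channel.message',
  channel: 'neutrino:raw',
  messages: [{ name: 'text', data: 'some unsafe text', timestamp: 1473894038255 }]
};

/* Hypothetical helper: returns the plain string payloads from a raw queue item body */
function textsFromEnvelope(content) {
  const envelope = JSON.parse(content.toString('utf8'));
  return (envelope.messages || [])
    .filter((m) => typeof m.data === 'string' && !m.encoding) /* skip encoded or non-string data */
    .map((m) => m.data);
}

/* amqplib delivers item.content as a Buffer, so simulate that here */
const texts = textsFromEnvelope(Buffer.from(JSON.stringify(sampleEnvelope)));
console.log(texts);
```

A helper like this can be handy in unit tests for the worker, since it needs neither a queue connection nor an Ably client.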

Finally, we need to connect the worker to the web server we have created. In a production environment, your worker servers would almost certainly run on different instances from your web servers. This decoupling is advisable because it lets you scale your workers independently of your web servers to meet demand. However, to keep this demo simple, a worker is started alongside the web server whenever the web server starts. Add the following to your server.js:

const worker = require('./worker');
worker.start(ApiKey, 'neutrino:filtered', 'neutrino', 'us-east-1-a-queue.ably.io:5671/shared');

Note that:

  • The queue endpoint is constructed from the endpoint and vhost visible for each queue in the queue tab of your dashboard (see above)
  • The queue name argument neutrino passed to worker.start is in fact appended to the appId in the start function. All fully qualified queue names follow the format appId:queueName as you will see in your queue tab
  • Whilst the code above can handle multiple messages per envelope, we currently only support one message per envelope. The envelope messages attribute is an Array so that in future we could optionally support message bundling
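
The naming rules in the notes above can be checked in isolation. The sketch below mirrors the string handling in the start function of worker.js; buildQueueConnection is a hypothetical helper name and the API key is a made-up placeholder:

```javascript
/* Hypothetical helper mirroring how worker.js derives the queue name and AMQP URL */
function buildQueueConnection(apiKey, queueName, queueEndpoint) {
  /* The app ID is the part of the API key before the first dot */
  const appId = apiKey.split('.')[0];
  return {
    /* Fully qualified queue names follow the format appId:queueName */
    queue: appId + ':' + queueName,
    /* The API key supplies the AMQP credentials; the endpoint includes host, port and vhost */
    url: 'amqps://' + apiKey + '@' + queueEndpoint
  };
}

/* 'appid.keyid:secret' is a placeholder, not a real key */
const connection = buildQueueConnection('appid.keyid:secret', 'neutrino', 'us-east-1-a-queue.ably.io:5671/shared');
console.log(connection.queue);
console.log(connection.url);
```

Since the URL embeds your private API key, avoid logging it outside of local experiments like this one.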

See this complete step in GitHub

Step 7 – Connect the worker to Neutrino

Everything is now in place to consume raw unfiltered text using a worker and publish the filtered text back to the client. However, we still need to plug in the Neutrino Bad Word Filter API.

First, you need to obtain a Neutrino API key. Visit the Neutrino API signup page, enter your details and click the “Create Account” button:


Neutrino sign up

Once you have created an account, visit your dashboard and click on “API Keys”:


Neutrino dashboard

Finally, copy one of the API keys to your clipboard:


Neutrino API keys

Now that you have an API key you can use to communicate with the Neutrino API, you’ll need an HTTP library for the worker. The request library provides a straightforward API for HTTP requests, so add it to the package.json file:

"dependencies": {
  "ably": ">=1.2",
  "amqplib": "^0.4",
  "express": ">=4.14.0",
  "request": ">=2.79.0"
}

Run npm install to install the Node package dependencies.

Now add the following to worker.js so that consumed text is sent to the Neutrino Bad Word Filter API, and filtered text from the API is published back to the web client through an Ably pub/sub channel:

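const NeutrinoUserId = 'INSERT-YOUR-NEUTRINO-USER-ID-HERE';
const NeutrinoApiKey = 'INSERT-YOUR-NEUTRINO-API-KEY-HERE';
/* Bad Word Filter endpoint; check the Neutrino API documentation for the current URL */
const NeutrinoEndpoint = 'https://neutrinoapi.com/bad-word-filter';
const request = require('request');
const querystring = require('querystring');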

/* Send text over HTTP to Neutrino to filter the profanities */
function filterTextAndPublish(ablyChannel, text) {
  var url = NeutrinoEndpoint + '?' + querystring.stringify({
    "user-id": NeutrinoUserId,
    "api-key": NeutrinoApiKey,
    "content": text,
    "censor-character": "*"
  });
  request(url, function (error, response, body) {
    if (!error && response.statusCode == 200) {
      publishFilteredText(ablyChannel, text, JSON.parse(body)["censored-content"]);
    } else {
      if (body) {
        publishFilteredText(ablyChannel, text, "Neutrino couldn't compute: " + body);
      } else {
        publishFilteredText(ablyChannel, text, "Neutrino error: " + JSON.stringify(error));
      }
    }
  });
}

function publishFilteredText(ablyChannel, rawText, filteredText) {
  ablyChannel.publish('text', { filteredText: filteredText }, function(err) {
    if (err) {
      console.error('worker:', 'Failed to publish text', JSON.stringify(err));
    }
  });
}

And replace the existing messages.forEach block in the start function with the following:

messages.forEach(function(message) {
  /* filterTextAndPublish takes the channel and the raw text; credentials come from the constants above */
  filterTextAndPublish(filteredChannel, message.data);
});

That’s it, you are now issuing HTTP requests to Neutrino whenever a message is consumed from the message queue, and publishing the responses back to the web client using the REST library.
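
One thing to be aware of in the worker code above is that JSON.parse throws if Neutrino ever returns a body that is not valid JSON, which would crash the worker. The sketch below shows one way to build the request URL and read the response defensively. buildFilterUrl and parseCensoredContent are hypothetical helper names, the example.invalid endpoint is a placeholder rather than the real Neutrino URL, and the censored-content field name is taken from the code above:

```javascript
const querystring = require('querystring');

/* Hypothetical helper: builds the filter request URL with properly escaped parameters */
function buildFilterUrl(endpoint, userId, apiKey, text) {
  return endpoint + '?' + querystring.stringify({
    'user-id': userId,
    'api-key': apiKey,
    'content': text,
    'censor-character': '*'
  });
}

/* Hypothetical helper: returns the censored text, or null if the body is not usable */
function parseCensoredContent(body) {
  try {
    const parsed = JSON.parse(body);
    return typeof parsed['censored-content'] === 'string' ? parsed['censored-content'] : null;
  } catch (e) {
    return null; /* malformed JSON or an unexpected shape */
  }
}

/* Placeholder endpoint and credentials, for illustration only */
const exampleUrl = buildFilterUrl('https://example.invalid/bad-word-filter', 'user', 'key', 'hello & goodbye');
console.log(exampleUrl);
```

With helpers like these, the request callback could publish the parsed text when parseCensoredContent returns a string and fall back to the existing error message otherwise.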

Try running your server again with node server.js and visit http://localhost:3000/. Now type in some text in the text field and click the “Filter my bad language” button. You should see a response from Neutrino appear within the time it takes for Neutrino to filter the profanities from your text.

See this step in GitHub

We’ve covered a lot of ground in this tutorial, including token authentication, realtime pub/sub and message queues. We strongly recommend you review the next steps below for essential further reading to help you understand how to use Ably Queues in more detail.

Try this tutorial in one click on Heroku

The simplest way to try out this demo is to install it now on Heroku for free. When installing with Heroku, you will automatically be provisioned with a free Ably account, and the steps you need to take to configure Neutrino and the queue rules will be shown to you after the Heroku app is provisioned.

Download tutorial source code

git clone https://github.com/ably/tutorials.git

Check out the tutorial branch:

git checkout queue-neutrino-profanity-nodejs

Then run the demo locally: add your Ably API key and your Neutrino User ID and API key to server.js, set up the queue and queue rule as described in this tutorial, and start the demo with node server.js.

Next steps

1. Find out more about Ably Queues
2. Find out when you should use message queues as opposed to pub/sub channels
3. Read an article on why Message Queues solve the problem of consuming realtime data on your servers
4. Find out more about Ably Integrations
5. Learn more about other Ably features by stepping through our other Ably tutorials
6. Gain a good technical overview of how the Ably realtime platform works
7. Get in touch if you need help