Ably Queues

Ably Queues are traditional message queues that provide a mechanism for you to consume, process, store or reroute data from Ably to your servers. Queues provide an asynchronous machine-to-machine protocol, with each machine assuming one, or both, roles:

Producers
Publish messages to a queue
Consumers
Retrieve messages from the queue.

Ably Queues guarantee at least once delivery using a message acknowledgement protocol. Ably also provides reliable ordering of messages by channel. For example, if messages published in a single channel are republished to a queue, and there is only one consumer for that queue, then the consumer will receive the messages in the order they were published. However, if you have more than one consumer, reliable ordering is not possible. Equally, if you have messages from multiple channels, reliable ordering is only supported per channel not across all channels.

Messages enter a queue through the following process:

  1. You publish a message to a channel, not directly to a queue.
  2. If a queue rule matches the name of the channel the message was published to, Ably applies the rule and identifies the corresponding queue.
  3. Ably copies the message into the designated queue, making it available for consumption.

Messages published to a queue go through the following lifecycle:

  • Ably stores each message by placing it at the back of the queue.
  • Ably retrieves the oldest message from the front of the queue and delivers it to a consumer.
  • Ably enforces a strict first-in, first-out (FIFO) policy to maintain message order.
  • Ably ensures that each message is delivered to only one consumer, even if multiple consumers are connected.

The following diagram illustrates the lifecycle of a message in an Ably Queue:

Ably Queues overview

Queues decouple producers and consumers in your system:

  • Producers can publish messages without waiting for consumers.
  • You can increase throughput capacity by adding more consumers, as each one pulls messages independently.
  • Ably stores messages until a consumer acknowledges successful processing, so if a consumer crashes or disconnects, no data is lost.

The following are some common use cases for Ably Queues:

  • Your servers process each published message by performing a unit of work, for example, generating an image and attaching it to an email.
  • Messages need to be delivered to only one consumer, no matter how many consumers listen.
  • You need to scale your architecture horizontally by adding more consumer worker servers to handle realtime data processing.
  • Your servers need to consume realtime data from channels without maintaining state; they don’t track which channels or clients are active or share state between them.
  • You want a backlog of messages if your consumers fall behind or go offline, ensuring no data is lost.
  • You need to provision the necessary queues ahead of time, for example, one queue for chat messages and another for analytics events.

To get started you need to provision a physical queue and set up a queue rule to move the published messages from a channel into the queue.

You can provision one or more queues for your app, however by default each new app is provisioned without any queues.

The following steps explain how to provision an Ably Queue:

  1. Go to the Queues tab of an app in your Ably dashboard.
  2. Click Provision a new Queue.
  3. In the New Queue section, fill in the following fields:
    • Name – Enter a unique name for your queue. This name, combined with your app ID, forms the full queue identifier.
    • Region – Select the physical region where the queue will be hosted, for example, US East (Virginia). All queues are replicated across two datacenters in that region for high availability.
    • TTL (minutes) – Set the time-to-live for messages. The default and maximum is 60 minutes. Messages that are not consumed within the TTL will be moved to the Dead Letter Queue.
    • Max length – Define the maximum number of messages the queue can retain. The default and maximum is 10,000. When the queue is full, the oldest message is moved to the Dead Letter Queue to make space.
  4. Click Create to finish provisioning the queue.

After you provision a Queue, create one or more Queue rules to republish messages, presence events, or channel events from channels into that queue.

The following steps explain how to set up a queue rule:

  1. Go to the Integrations tab of an app in your Ably dashboard.
  2. Click New Integration Rule.
  3. In the Select your rule type section, choose Queue.
  4. In the Choose queue dropdown, select the queue you want to send data to.
  5. (Optional) Add custom Headers as key:value pairs to include metadata.
    • Click Another header to add additional headers.
  6. In the Source dropdown, select which type of Ably event should trigger the rule.
  7. (Optional) Add a Channel Filter using a regular expression to apply the rule to specific channels. Leave empty to apply to all.
  8. Choose an Encoding format.
  9. Click Create to finish setting up the rule.

Provisioned queues are visible in your app dashboard and provide realtime stats for the current state of each queue.

The following table describes the sections of the queue stats:

Section Purpose
Name Displays the unique identifier for the queue.
State Indicates the current operational state of the queue.
Region Specifies the physical data center location of the queue.
Server endpoint Provides the connection endpoint for the queue.
Vhost Specifies the virtual host associated with the queue.
Messages ready Shows the number of messages waiting to be consumed.
Messages unacked Displays the number of messages acknowledged by consumers.
Total messages Indicates the total number of messages currently in the queue.
Incoming message rates Shows the rate at which messages are added to the queue (per second).
Deliver/get message rates Shows the rate at which messages are delivered or fetched.
Ack message rates Displays the rate of message acknowledgments.
TTL (minutes) Indicates how long messages remain in the queue before expiring.
Max length Defines the maximum number of messages allowed in the queue.
Action Allows for deletion of the queue.

The queue stats show the current state of your queue. Your app and account dashboard provide up-to-date live and historical stats for all messages published to your queues.

The following table describes the sections of the stats:

Section Purpose
Messages total Shows the total number of messages processed during the specified time period.
Messages published (REST & Realtime) Displays the total number of messages sent to Ably.
Messages received (Realtime) Shows the number of messages delivered to subscribers through Realtime channels.
Messages retrieved (history) Indicates how many historical messages were retrieved from storage.
Messages persisted (history) Tracks the number of messages saved in Ably’s message persistence system.
Presence events (REST & Realtime) Displays the number of presence events (e.g. users joining or leaving channels).
Reactor webhook / Function Shows the total number of webhooks or serverless functions triggered by Ably events.
Reactor Queue Displays the number of messages published to queues for asynchronous processing.
Reactor Firehose Tracks the number of messages streamed via the Firehose integration to external systems.
Push & Smart Notifications Indicates the number of push notifications or smart notifications sent.
Peak connections Shows the maximum number of simultaneous connections to the Ably platform.
Peak channels Tracks the highest number of active channels during the specified period.
Data transferred Displays the amount of data transferred through the Ably system during the specified period.
Avg. message size Indicates the average size of messages processed during the specified time frame.

You can verify that messages are reaching your queue by consuming them from the command line using the AMQP protocol.

The following installs the amqp-consume-cli package globally:

npm install amqp-consume-cli -g
Copied!

Once installed, go to your Ably dashboard and retrieve an API key that has the capability to subscribe to queues.

The following example shows how to consume from a queue using the CLI:

amqp-consume --queue-name [Name] \ --host [Server endpoint host] --port [Server endpoint port] \ --ssl --vhost shared --creds [your API key]
Copied!

The following is an example of the amqp-consume output you will see when a message is published to the queue:

Message received Attributes: { contentType: 'application/json', headers: {}, deliveryMode: 1, timestamp: 1485914937984 } Data: { "source":"channel.message", "appId":"ael724", "channel":"foo", "site":"eu-west-1-A", "ruleId":"cOOo9g", <a href="[">messages</a> { "id":"vjzxPR-XK3:3:0", "name":"event", "connectionId":"vjzxPR-XK3", "timestamp":1485914937909, "data":"payload" } ] }
Copied!

The AMQP protocol provides a rich set of functionality to amongst other things bind to exchanges, provision queues and configure routing. This functionality exists so that queues can be dynamically provisioned by clients and messages can be routed to these queues as required.

Unlike Ably channels, queues are pre-provisioned and all routing is handled by queue rules. As such, when subscribing to messages from the provisioned queues, you must not attempt to bind to an exchange or declare a queue as these requests will be rejected. Instead, you should subscribe directly to the queue you wish to consume messages from.

It is possible to consume multiple queues from a single connection, and also consume more than one message at a time. Refer to your AMQP library’s documentation to enable these capabilities.

The following steps explain how to consume messages from an Ably Queue using the AMQP protocol:

Queue name
For example: UATwBQ:example-queue, formed from your app ID and the name you gave the queue.
Host
For example: us-east-1-a-queue.ably.io
Port
5671, the TLS-only port supported for secure AMQP consumption.
Vhost
Always shared
Username
The part before the colon in your API key. For example, APPID.KEYID from APPID.KEYID:SECRET.
Password
The part after the colon in your API key. For example, SECRET from APPID.KEYID:SECRET.
AMQP Client
Connect using any AMQP 0.9.1 compatible client that supports TLS.

The following example shows how to consume from a queue in Node.js:

Node.js v2.11
const url = 'amqps://APPID.KEYID:[email protected]/shared' amqp.connect(url, (err, conn) => { if (err) { return handleError(err) } /* Opens a channel for communication. The word channel is overloaded and this has nothing to do with pub/sub channels */ conn.createChannel((err, ch) => { if (err) { return handleError(err) } /* Wait for messages published to the Ably Queue */ ch.consume('UATwBQ:example-queue', (item) => { let decodedEnvelope = JSON.parse(item.content) /* The envelope messages attribute will only contain one message. However, in future versions, we may allow optional bundling of messages into a single queue message and as such this attribute is an Array to support that in future */ let messages = Ably.Realtime.Message.fromEncodedArray(decodedEnvelope.messages) messages.forEach((message) => { actionMessage(message) }) /* ACK (success) so that message is removed from queue */ ch.ack(item) }) }) })
Copied!

The Node.js example above shows how to consume messages from a queue using AMQP. Take note of the following:

  • The queue rule wraps each message in an envelope (default behavior). The first step is to parse the envelope JSON.
  • The Message.fromEncodedArray method decodes the payload into standard Message objects.
  • While messages is always an Array, each envelope currently contains only one message.

The STOMP protocol is a simple text-based protocol designed for working with message-oriented middleware. It provides an interoperable wire format that allows STOMP clients to talk with any message broker that supports the STOMP protocol and as such is a good fit for use with Ably Queues

The following steps explain the details you will need to use the STOMP protocol:

Queue name
For UATwBQ:example-queue, formed from your app ID and the name you gave the queue.
Host
For example: us-east-1-a-queue.ably.io
Port
61614, the TLS-only STOMP port (different from AMQP‘s port).
Vhost
Always shared
Username
The part before the colon in your API key. For example, APPID.KEYID from APPID.KEYID:SECRET.
Password
The part after the colon in your API key. For example, SECRET from APPID.KEYID:SECRET.
STOMP Client
Connect using any STOMP client that supports TLS.

The following is a simple Node.js example using the stomp/stompjs SDK:

Node.js v2.11
const connectOptions = { 'host': 'us-east-1-a-queue.ably.io', 'port': 61614, /* STOMP TLS port */ 'ssl': true, 'connectHeaders':{ 'host': 'shared', 'login': 'APPID.KEYID', 'passcode': 'SECRET' } } Stompit.connect(connectOptions, (error, client) => { if (err) { return handleError(err) } const subscribeHeaders = { /* To subscribe to an existing queue, /amq/queue prefix is required */ 'destination': '/amq/queue/UATwBQ:example-queue', 'ack': 'client-individual' /* each message requires an ACK to confirm it has been processed */ } /* Wait for messages published to the Ably Queue */ client.subscribe(subscribeHeaders, (error, message) => { if (err) { return handleError(err) } /* STOMP is a text-based protocol so decode UTF-8 string */ message.readString('utf-8', (error, body) => { if (err) { return handleError(err) } let decodedEnvelope = JSON.parse(item.content) /* The envelope messages attribute will only contain one message. However, in future versions, we may allow optional bundling of messages into a single queue message and as such this attribute is an Array to support that in future */ let messages = Ably.Realtime.Message.fromEncodedArray(decodedEnvelope.messages) messages.forEach((message) => { actionMessage(message) }) client.ack(message) }) }) })
Copied!

The node.js example above shows how to consume messages from a queue using STOMP. Take note of the following:

  • The queue rule wraps each message in an envelope (default setting). The first step is to parse the envelope JSON.
  • Use the Message.fromEncodedArray method to decode the message into standard Message objects.
  • While messages is always an Array, each envelope currently contains only one message.

When you configure a queue rule, Ably gives you the option to envelope messages. This option is enabled by default. In most cases, using envelopes adds flexibility. Ably includes additional metadata in a portable format, such as the clientId of the publisher and the channel the message came from.

If performance is your priority, you can disable envelopes. In that case, Ably publishes only the message payload. This removes one layer of parsing but requires you to decode the raw payload yourself.

By default, Ably encodes messages sent to queues in JSON. However, you can choose to use MsgPack, a binary format, when configuring your queue rules.

The following example shows the data sent to the queue when a message is published without an envelope (no headers are required):

{ "source": "channel.message", "appId":"ael724", "channel": "foo", "site": "eu-west-1-A", "ruleId": "cOOo9g", "messages": [ { "id": "vjzxPR-XK3:3:0", "name": "event", "connectionId": "vjzxPR-XK3", "timestamp": 1485914937909, "data": "textPayload" } ] }
Copied!

The following shows the headers sent to the queue when a message is published without an envelope:

X-ABLY-ENVELOPE-SOURCE
channel.message
X-ABLY-ENVELOPE-APPID
ael724
X-ABLY-ENVELOPE-CHANNEL
foo
X-ABLY-ENVELOPE-SITE
eu-west-1-A
X-ABLY-ENVELOPE-RULE-ID
wYge7g
X-ABLY-MESSAGE-ID
vjzxPR-XK3:3:0
X-ABLY-MESSAGE-TIMESTAMP
1485914937909
X-ABLY-MESSAGE-CONNECTION-ID
vjzxPR-XK3

The following example shows the data sent to the queue when a message is published without an envelope:

{ "source": "channel.message", "appId":"ael724", "channel": "foo", "site": "eu-west-1-A", "ruleId": "cOOo9g", "messages": [ { "id": "vjzxPR-XK3:3:0", "name": "event", "connectionId": "vjzxPR-XK3", "timestamp": 1485914937909, "data": "textPayload" } ] }
Copied!

The following example shows the data sent to the queue when a presence message is published without an envelope (no headers are required):

{ "source": "channel.presence", "appId":"ael724", "channel": "foo", "site": "eu-west-1-A", "ruleId": "z8R85g", "presence": [ { "id": "vjzxPR-XK3:5:0", "clientId": "bob", "connectionId": "vjzxPR-XK3", "timestamp": 1485916832961, "action": "enter", "data": "clientData" } ] }
Copied!

The following shows the headers sent to the queue when a presence message is published without an envelope:

X-ABLY-ENVELOPE-SOURCE
channel.presence
X-ABLY-ENVELOPE-APPID
ael724
X-ABLY-ENVELOPE-CHANNEL
foo
X-ABLY-ENVELOPE-SITE
eu-west-1-A
X-ABLY-ENVELOPE-RULE-ID
wYge7g
X-ABLY-MESSAGE-ID
vjzxPR-XK3:5:0
X-ABLY-MESSAGE-TIMESTAMP
1485914937909
X-ABLY-MESSAGE-CONNECTION-ID
vjzxPR-XK3
X-ABLY-MESSAGE-CLIENT-ID
bob
X-ABLY-MESSAGE-ACTION
enter

The following example shows the data sent to the queue when a message is published without an envelope:

{ "source": "channel.presence", "appId":"ael724", "channel": "foo", "site": "eu-west-1-A", "ruleId": "z8R85g", "presence": [ { "id": "vjzxPR-XK3:5:0", "clientId": "bob", "connectionId": "vjzxPR-XK3", "timestamp": 1485916832961, "action": "enter", "data": "clientData" } ] }
Copied!

When you provision a Queue, Ably also provisions a Dead Letter Queue (DLQ) automatically. This special queue stores messages that fail processing or expire before being consumed.

It is recommended that you consume messages from the Dead Letter Queue so you can monitor failures, expirations, and unprocessable messages in your system.

Ably moves messages into the Dead Letter Queue in the following cases:

  • The message is rejected; basic.reject or basic.nack, with requeue=false.
  • The message exceeds its TTL and expires.
  • The Queue reaches its maximum length and a new message is published. In this case, Ably removes the oldest message from the queue and places it in the Dead Letter Queue to make room.

Ably deletes any message in the Dead Letter Queue if it later meets one of these conditions. For example, if it expires due to TTL. These messages are not recoverable.

Ably names the Dead Letter Queue using the reserved format APPID:deadletter, where APPID is your app’s ID. Each app with one or more queues has exactly one Dead Letter Queue, and you’ll see it listed in the Queues dashboard. You can subscribe to it like any other queue.

Ably Queues are offered in two flavors; multi-tenanted and dedicated.

The multi-tenanted service is provided as part of the core platform to all customers. The queues are provided in a high availability configuration with data stored in at least two data centers with automatic failover capabilities. The multi-tenanted queue service is designed for low to medium volumes of messages and has a guideline limit of no more than 200 messages per second, per account.

For customers with more demanding requirements of up to millions of messages per second Ably has two solutions:

  • Dedicated queue clusters that scale to millions of messages, for enterprise customers only.
  • An outbound streaming integration to stream your realtime data directly into your own streaming or queueing service.
Select...