Ably Queues
Ably Queues are traditional message queues that provide a reliable and straightforward mechanism for consuming, processing, storing, augmenting or rerouting data from our realtime platform efficiently on your servers.
What are Ably Queues?
Ably Queues provide an asynchronous machine-to-machine communication protocol that follows a traditional message queueing pattern. At a high level, each machine participates in one or both roles: producers (Ably channels) publish messages (data) to a queue; consumers retrieve messages from the queue.
The queue service is responsible for the following functions:
- Storing published messages by placing them at the back of the queue.
- Retrieving the oldest messages from the front of the queue and passing them to a consumer.
- Ensuring a FIFO or ‘first in, first out’ policy is followed.
- Ensuring messages are successfully consumed by only one of the consumers.
This messaging pattern provides the following advantages:
- Decoupling: publishers can publish without waiting for consumers.
- Scalability: adding more consumers increases throughput capacity.
- Resilience: messages are stored until a consumer has acknowledged the message has been processed successfully.
Ably’s platform also provides channels for realtime data distribution using the pub/sub messaging pattern. Unlike queues, pub/sub channels provide fan-out so that every message published on a channel is received by all devices subscribed to that data. When delivered with connection state recovery, this pattern provides a decoupled, resilient and scalable means of publishing realtime data to any number of devices.
No single pattern is better than the other, both have their merits and valid use cases. Take for example a delivery van driving through a city publishing its location periodically. Any number of customers waiting for their parcel can subscribe for updates and thus a pub/sub channel is well suited due to its inherent fan-out capability. However, emails may need to be triggered when the van is nearing its destination as well.
A message queueing pattern is a better fit here as multiple worker servers can share the workload by consuming the location messages from the queue and performing work for each message without having to share any state.
The message queue ensures that work is distributed evenly to the pool of servers, work is not duplicated (resulting for example in more than one email notification being sent) and the system is resilient to crashes or spikes in load (messages are stored until a consumer is ready to retrieve them).
Ably combines both pub/sub and queueing functionality in its platform as illustrated in the diagram below:
Note: It’s not a requirement for this scenario, but Ably has the Asset Tracking SDK, which can provide a complete and convenient asset tracking solution. You can read more about what’s provided in the Asset Tracking SDK docs.
Ably Queues are more appropriate where:
- “Work” needs to be distributed between your servers for each published message. For example, “work” could be to generate an image and attach it to an email when a message is published.
- Messages should be delivered to only one consumer regardless of how many consumers are listening for new messages.
- You need an architectural design to process realtime data that scales horizontally by simply adding more consumer “worker” servers.
- You want to consume realtime data from channels on your servers statelessly, that is, you do not want to keep track of which channels or clients are active or share state between your servers.
- You want a backlog of messages to build up if the consumers cannot process data quickly enough or if the consumers go offline.
- You can provision the queues you need in advance. For example, you may have one queue for chat messages and another for analytics events.
Note: with the Ably platform, all realtime data originates from pub/sub channels; that is, you never publish directly to a queue, you publish to a channel. If a queue rule exists that matches the channel name, then the published message will automatically be republished into the designated queue. Therefore, if you need to publish and consume data, you will have to publish data to channels over the REST or Realtime protocols, and consume your data using an AMQP or STOMP client library.
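For example, here is a minimal sketch of publishing to a channel with the callback-style ably-js REST client; the channel name foo, the event name and the placeholder API key are illustrative only, and the message will only reach a queue if a matching queue rule exists:
/* Publish to a pub/sub channel; a matching queue rule (if any) republishes
   the message into the designated queue. Requires the ably package. */
const Ably = require('ably')

const rest = new Ably.Rest('APPID.KEYID:SECRET') /* placeholder API key */
const channel = rest.channels.get('foo')         /* illustrative channel name */

channel.publish('event', 'payload', (err) => {
  if (err) { return console.error('Publish failed:', err) }
  console.log('Published; any matching queue rule will enqueue this message')
})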
Using the Ably Queues
All Ably accounts have access to Ably Queue functionality; however, to get started you need to provision a physical queue and set up a queue rule to move data from channels into that queue.
Provisioning Ably Queues
You can provision one or more queues for your app; however, by default each new app is provisioned without any queues. Once a queue is configured, you will need to set up a queue rule that enables messages to be republished from pub/sub channels into a physical queue.
Unlike pub/sub channels that can exist in any datacenter and are provisioned on-demand by clients, queues need to be provisioned in advance and exist in one region.
Ably Queues are set up within your app dashboard, where you need to configure:
- A unique name for the queue. This name, automatically prefixed with your app ID, is used when consuming the queue from your queue client libraries.
- The region the queue is physically located in. Note that all queues exist across two datacenters in each region for high availability.
- The TTL (time-to-live) for your messages. If the TTL period passes and a message has not been consumed from the queue, then the message is moved to the dead letter queue.
- The maximum length for your queue, which is the total number of messages that your queue can retain in memory and/or on disk. When the queue is considered full based on the max length, a message published to the queue will still be accepted; however, the oldest message on that queue will be moved into the dead letter queue to make room for the new message.
Please note that the total number of queues, the TTL and the max length for each queue are limited based on your account type. Find out more about account and package limits.
The procedure to provision a queue is as follows:
- In your Ably dashboard click on the Queues tab.
- Click the Provision a new Queue button:
- Fill out information for your queue in the New Queue dialog, and click Create:
- Confirm your queue has been provisioned. Your new queue will appear in the Queues tab along with realtime metrics for your queue.
You will see that a dead letter queue has automatically been provisioned for you. A dead letter queue is used to store messages that failed to be consumed, have passed their TTL, or were discarded due to a queue being full.
Setting up queue rules
Once you have provisioned a physical queue, you need to set up one or more queue rules to republish messages, presence events or channel events from pub/sub channels into a queue. Queue rules can either be used to publish to internal queues (hosted by Ably) or to external streams or queues (such as Amazon Kinesis and RabbitMQ). Publishing to external streams or queues is part of our Ably Firehose service.
Ably Queue rules are set up in the Integrations tab of your app dashboard. For internal queue rules you will need to configure:
- The source for the realtime data, which is one of: Messages – messages are enqueued as soon as they are published on a channel; Presence events – when clients enter, update their data, or leave channels, the presence event is enqueued; or Channel lifecycle events – when a channel is opened (following the first client attaching to this channel) or closed (when there are no more clients attached to the channel), the lifecycle event is enqueued.
- An optional channel filter that allows you to filter which channels produce messages or events for your queues. Regular expressions are supported, such as ^click_.*_mouse$ (see the sketch after this list).
- The encoding for your messages, which is either JSON (the default text format) or MsgPack (a binary format).
- Whether messages published to the queue are wrapped in an envelope or not. The default envelope that wraps all messages published to queues provides additional metadata such as the channel, appId, site, and ruleId. Non-enveloped messages contain only the payload (the data element of the message), and some metadata is provided in the message headers. See examples of enveloped and non-enveloped messages below.
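As an illustration of the channel filter above, the sketch below shows which channel names the example regular expression would select; the channel names are hypothetical and the actual matching is performed by Ably when the rule is evaluated:
/* Illustrative only: how a channel filter such as ^click_.*_mouse$ selects channels */
const filter = /^click_.*_mouse$/

const channels = ['click_left_mouse', 'click_right_mouse', 'clicks', 'mouse_click']
channels.forEach((channelName) => {
  console.log(channelName, filter.test(channelName) ? 'matches' : 'does not match')
})
/* click_left_mouse and click_right_mouse match; the other names do not */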
The procedure to create a queue rule is as follows:
- In your Ably dashboard click on the Integrations tab.
- Click the New Integration Rule button:
- Now choose an Ably Queue:
- Fill out information as detailed previously in this section, and then click Create.
Queue dashboards and stats
Provisioned queues are visible in your app dashboard and provide near-realtime stats for the current state of each queue. See an example screenshot below:
Whilst the queue dashboard stats show the current state of your queue, your app and account dashboard provide up-to-date live and historical stats for all messages published to your queues. See an example screenshot from an app dashboard below:
Testing your queue rules
Once your Ably Queue is provisioned and your Ably Queue rules are configured, we recommend the following ways to debug the configured rules and queues:
Use the dev console to generate messages or events that match your queue rule. You can confirm messages are being delivered if the “Messages ready” count in your queue dashboard increases (see above). Note that the messages ready count won’t increase if you have a client consuming messages from this queue.
Install a command line tool for consuming messages using the AMQP protocol to check that messages published on channels (using the dev console or from any other source) are being pushed into the queues based on the queue rules.
You can install Node AMQP Consume CLI with:
npm install amqp-consume-cli -g
You will need to retrieve an API key from your app dashboard that has access to the queues (your root key will typically have access to subscribe to all queues). Then check the server endpoint, vhost and queue name (which is always prefixed with a scope, your app ID) from the queue dashboard (see above) and issue a command such as:
amqp-consume --queue-name [Name] \
--host [Server endpoint host] --port [Server endpoint port] \
--ssl --vhost shared --creds [your API key]
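For example, using the queue name, host, port and vhost from the AMQP example later on this page (the API key is a placeholder), the command might look like:
amqp-consume --queue-name UATwBQ:example-queue \
  --host us-east-1-a-queue.ably.io --port 5671 \
  --ssl --vhost shared --creds APPID.KEYID:SECRET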
Whenever a message is published to the queue you are subscribing to, the amqp-consume tool will output the message details such as:
Message received
Attributes: { contentType: 'application/json',
headers: {},
deliveryMode: 1,
timestamp: 1485914937984 }
Data: {
"source":"channel.message",
"appId":"ael724",
"channel":"foo",
"site":"eu-west-1-A",
"ruleId":"cOOo9g",
"messages": [
{
"id":"vjzxPR-XK3:3:0",
"name":"event",
"connectionId":"vjzxPR-XK3",
"timestamp":1485914937909,
"data":"payload"
}
]
}
Note: the messages attribute is an Array so that future envelope options may allow messages to be bundled into a single envelope (Webhooks can batch messages). However, with the current queue rule design, an envelope will only ever contain one message.
Consuming data from queues
Consuming messages from Ably Queues is mostly the same as consuming from any other queue supporting the AMQP or STOMP protocols. However, there are a few tips below to avoid common pitfalls.
Using AMQP
The AMQP protocol provides a rich set of functionality to, amongst other things, bind to exchanges, provision queues and configure routing. This functionality exists so that queues can be dynamically provisioned by clients and messages can be routed to these queues as required.
However, unlike our pub/sub channels, queues are pre-provisioned via our queue dashboards and all routing is handled by the queue rules. As such, when subscribing to messages from the provisioned queues, you must not attempt to bind to an exchange or declare a queue as these requests will be rejected. Instead, you should subscribe directly to the queue you wish to consume messages from.
Take the following queue as an example:
In order to subscribe to messages from this queue you will need:
- The queue name UATwBQ:example-queue, which is made up of your app ID and the name you assigned to your queue
- The host us-east-1-a-queue.ably.io
- The port 5671, which is the TLS port you consume from. We only support TLS connections for security reasons
- The vhost shared
- The username - the part before the : of an API key that has access to queues. For example, the username for an API key such as APPID.KEYID:SECRET would be APPID.KEYID.
- The password - the part after the : of the API key. For example, the password for an API key such as APPID.KEYID:SECRET would be SECRET.
A simple example of subscribing to this queue in Node.js can be seen below:
/* Requires the amqplib and ably packages (npm install amqplib ably) */
const amqp = require('amqplib/callback_api')
const Ably = require('ably')

const url = 'amqps://APPID.KEYID:SECRET@us-east-1-a-queue.ably.io/shared'
amqp.connect(url, (err, conn) => {
  if (err) { return handleError(err) }

  /* Opens a channel for communication. The word channel is overloaded
     and this has nothing to do with pub/sub channels */
  conn.createChannel((err, ch) => {
    if (err) { return handleError(err) }

    /* Wait for messages published to the Ably Queue */
    ch.consume('UATwBQ:example-queue', (item) => {
      let decodedEnvelope = JSON.parse(item.content)

      /* The envelope messages attribute will only contain one message. However,
         in future versions, we may allow optional bundling of messages into a
         single queue message and as such this attribute is an Array to support
         that in future */
      let messages = Ably.Realtime.Message.fromEncodedArray(decodedEnvelope.messages)
      messages.forEach((message) => {
        actionMessage(message)
      })

      /* ACK (success) so that message is removed from queue */
      ch.ack(item)
    })
  })
})
Please note:
- In the example above, the queue rule has been configured to wrap each message in an envelope (the default setting). Therefore the first step is to parse the envelope JSON. See details on enveloped messages below.
- The Message.fromEncodedArray method is used to decode the message(s) and return an array of Message objects. We strongly recommend you use this method if your client library supports it to ensure messages are decoded correctly and are portable across all platforms.
- Whilst the code above can handle multiple messages per envelope, we currently only support one message per envelope. The messages attribute is an Array so that in future we could optionally support message bundling.
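If a message cannot be processed, you can negatively acknowledge it rather than ACK it. The sketch below reuses ch, Ably and actionMessage from the example above and rejects the failed message with requeue=false; whether you dead-letter or requeue failed messages is an application decision:
/* Sketch: NACK a message that cannot be processed so it is not redelivered */
ch.consume('UATwBQ:example-queue', (item) => {
  try {
    let decodedEnvelope = JSON.parse(item.content)
    let messages = Ably.Realtime.Message.fromEncodedArray(decodedEnvelope.messages)
    messages.forEach((message) => actionMessage(message))
    ch.ack(item)                /* processed successfully; remove from queue */
  } catch (err) {
    ch.nack(item, false, false) /* allUpTo=false, requeue=false: rejected messages are dead-lettered */
  }
})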
See our tutorials section for a few step-by-step examples using an Ably Queue with AMQP »
Using STOMP
The STOMP protocol is a simple text-based protocol designed for working with message-oriented middleware. It provides an interoperable wire format that allows STOMP clients to talk with any message broker that supports the STOMP protocol and as such is a good fit for use with Ably Queues.
Assuming the following queue has been set up, we’ll show you a simple example of subscribing to a STOMP queue:
In order to subscribe to messages from this queue you will need:
- The queue name UATwBQ:example-queue, which is made up of your app ID and the name you assigned to your queue
- The host us-east-1-a-queue.ably.io
- The port 61614, which is the STOMP TLS port you consume from (the port in the screenshot above is for AMQP). We only support TLS connections for security reasons
- The vhost shared
- The username - the part before the : of an API key that has access to queues. For example, the username for an API key such as APPID.KEYID:SECRET would be APPID.KEYID.
- The password - the part after the : of the API key. For example, the password for an API key such as APPID.KEYID:SECRET would be SECRET.
A simple example of subscribing to this queue in Node.js can be seen below:
/* Requires the stompit and ably packages (npm install stompit ably) */
const Stompit = require('stompit')
const Ably = require('ably')

const connectOptions = {
  'host': 'us-east-1-a-queue.ably.io',
  'port': 61614, /* STOMP TLS port */
  'ssl': true,
  'connectHeaders': {
    'host': 'shared',
    'login': 'APPID.KEYID',
    'passcode': 'SECRET'
  }
}

Stompit.connect(connectOptions, (error, client) => {
  if (error) { return handleError(error) }

  const subscribeHeaders = {
    /* To subscribe to an existing queue, the /amq/queue prefix is required */
    'destination': '/amq/queue/UATwBQ:example-queue',
    'ack': 'client-individual' /* each message requires an ACK to confirm it has been processed */
  }

  /* Wait for messages published to the Ably Queue */
  client.subscribe(subscribeHeaders, (error, message) => {
    if (error) { return handleError(error) }

    /* STOMP is a text-based protocol so decode the UTF-8 string */
    message.readString('utf-8', (error, body) => {
      if (error) { return handleError(error) }
      let decodedEnvelope = JSON.parse(body)

      /* The envelope messages attribute will only contain one message. However,
         in future versions, we may allow optional bundling of messages into a
         single queue message and as such this attribute is an Array to support
         that in future */
      let messages = Ably.Realtime.Message.fromEncodedArray(decodedEnvelope.messages)
      messages.forEach((msg) => {
        actionMessage(msg)
      })

      client.ack(message)
    })
  })
})
Please note:
- In the example above, the queue rule has been configured to wrap each message in an envelope (the default setting). Therefore the first step is to parse the envelope JSON. See details on enveloped messages below.
- The Message.fromEncodedArray method is used to decode the message(s) and return an array of Message objects. We strongly recommend you use this method if your client library supports it to ensure messages are decoded correctly and are portable across all platforms.
- Whilst the code above can handle multiple messages per envelope, we currently only support one message per envelope. The messages attribute is an Array so that in future we could optionally support message bundling.
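As with AMQP, a STOMP consumer can negatively acknowledge a message it cannot process. The sketch below reuses client, subscribeHeaders, Ably, actionMessage and handleError from the example above; stompit exposes client.nack alongside client.ack:
/* Sketch: ACK on success, NACK on failure (ack mode 'client-individual') */
client.subscribe(subscribeHeaders, (error, message) => {
  if (error) { return handleError(error) }
  message.readString('utf-8', (error, body) => {
    if (error) { return handleError(error) }
    try {
      let decodedEnvelope = JSON.parse(body)
      let messages = Ably.Realtime.Message.fromEncodedArray(decodedEnvelope.messages)
      messages.forEach((msg) => actionMessage(msg))
      client.ack(message)  /* processed successfully */
    } catch (err) {
      client.nack(message) /* could not process this message */
    }
  })
})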
See our tutorials section for step-by-step examples using an Ably Queue with STOMP »
Enveloped and non-enveloped messages
When you configure a queue rule, you are given the option to envelope messages, which is enabled by default. In most cases, we believe an enveloped message provides more flexibility as it contains additional metadata in a portable format that can be useful, such as the clientId of the publisher or the channel name the message originated from.
However, where performance is a primary concern, you may choose not to envelope messages and instead have only the message payload (the data element) published. This has the advantage of requiring one less parsing step; however, decoding of the raw payload in the published message will be your responsibility.
Note that messages published to queues are by default encoded as JSON (a text format), however you can choose to have messages encoded with MsgPack (a binary format) in your queue rules.
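For example, here is a sketch of a consumer for a queue rule with enveloping disabled and the default JSON encoding, reusing the amqplib channel ch from the AMQP example above; metadata is read from the message headers shown in the non-enveloped examples below, and decoding the raw payload is your responsibility:
/* Sketch: consuming non-enveloped messages; the body is the raw data element */
ch.consume('UATwBQ:example-queue', (item) => {
  const headers = item.properties.headers
  const channelName = headers['X-ABLY-ENVELOPE-CHANNEL']
  const messageId = headers['X-ABLY-MESSAGE-ID']

  /* The payload is the message data only; parse it yourself if it is JSON */
  const payload = item.content.toString()
  console.log('Message ' + messageId + ' from channel ' + channelName + ':', payload)

  ch.ack(item)
})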
Enveloped message example
Headers: none
Data:
{
"source": "channel.message",
"appId":"ael724",
"channel": "foo",
"site": "eu-west-1-A",
"ruleId": "cOOo9g",
"messages": [
{
"id": "vjzxPR-XK3:3:0",
"name": "event",
"connectionId": "vjzxPR-XK3",
"timestamp": 1485914937909,
"data": "textPayload"
}
]
}
Note: the messages attribute is an Array so that future envelope options may allow messages to be bundled into a single envelope (Webhooks can batch messages). However, with the current queue rule design, an envelope will only ever contain one message.
Non-enveloped message example
Headers:
X-ABLY-ENVELOPE-SOURCE: channel.message
X-ABLY-ENVELOPE-APPID: ael724
X-ABLY-ENVELOPE-CHANNEL: foo
X-ABLY-ENVELOPE-SITE: eu-west-1-A
X-ABLY-ENVELOPE-RULE-ID: wYge7g
X-ABLY-MESSAGE-ID: vjzxPR-XK3:3:0
X-ABLY-MESSAGE-TIMESTAMP: 1485914937909
X-ABLY-MESSAGE-CONNECTION-ID: vjzxPR-XK3
Data:
textPayload
Enveloped presence message example
Headers: none
Data:
{
"source": "channel.presence",
"appId":"ael724",
"channel": "foo",
"site": "eu-west-1-A",
"ruleId": "z8R85g",
"presence": [
{
"id": "vjzxPR-XK3:5:0",
"clientId": "bob",
"connectionId": "vjzxPR-XK3",
"timestamp": 1485916832961,
"action": "enter",
"data": "clientData"
}
]
}
Note: the presence attribute is an Array so that future envelope options may allow presence messages to be bundled into a single envelope (Webhooks can batch messages). However, with the current queue rule design, an envelope will only ever contain one presence message.
Non-enveloped presence message example
Headers:
X-ABLY-ENVELOPE-SOURCE: channel.presence
X-ABLY-ENVELOPE-APPID: ael724
X-ABLY-ENVELOPE-CHANNEL: foo
X-ABLY-ENVELOPE-SITE: eu-west-1-A
X-ABLY-ENVELOPE-RULE-ID: wYge7g
X-ABLY-MESSAGE-ID: vjzxPR-XK3:5:0
X-ABLY-MESSAGE-TIMESTAMP: 1485914937909
X-ABLY-MESSAGE-CONNECTION-ID: vjzxPR-XK3
X-ABLY-MESSAGE-CLIENT-ID: bob
X-ABLY-MESSAGE-ACTION: enter
Data:
clientData
Dead letter queues
When you provision a queue, Ably automatically provisions a “special” dead letter queue at the same time. This dead letter queue holds messages that have failed to be processed correctly or have expired. It is advisable to consume messages from the dead letter queue so that you can keep track of failed, expired or unprocessable messages. Messages are moved into your dead letter queue when:
- The message is rejected (basic.reject or basic.nack) with requeue=false;
- The TTL for the message expires; or
- The queue is full (the max length limit is reached) and a new message is published to the queue. In this case, the oldest message in the queue is removed and placed in the dead letter queue to make room for the new message.
Please note that messages already in the dead letter queue that subsequently meet any of the above criteria are deleted. For example, if the TTL for a message in the dead letter queue passes, the message is deleted forever.
A dead letter queue uses the reserved queue name APPID:deadletter, where APPID is the app ID in which your queues are provisioned. You will have exactly one dead letter queue per app if you have one or more Ably Queues, and this queue will appear in your queues dashboard. You can subscribe to a dead letter queue just like any other queue, as shown in the sketch below.
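For example, here is a minimal sketch of consuming the dead letter queue over AMQP, reusing the connection and handleError from the AMQP example above (APPID is a placeholder for your app ID, and the rule is assumed to be enveloped and JSON-encoded as in the examples above):
/* Sketch: the dead letter queue is consumed like any other queue */
conn.createChannel((err, ch) => {
  if (err) { return handleError(err) }
  ch.consume('APPID:deadletter', (item) => {
    /* Assumes enveloped, JSON-encoded messages as in the examples above */
    let decodedEnvelope = JSON.parse(item.content)
    console.warn('Dead-lettered message:', decodedEnvelope)
    /* Log, alert or re-process as appropriate, then ACK to remove it */
    ch.ack(item)
  })
})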
Download a client library
For a list of popular AMQP and STOMP client libraries you can use across a wide range of platforms, please see our client library download page.
Ably Queue considerations
When using Ably Queues, please bear in mind that:
- Our message queues guarantee at-least-once delivery using a message acknowledgement protocol (exactly-once delivery is not practically achievable).
- Ably provides reliable ordering for your messages by channel. For example, if messages published in a single channel are republished to a queue, and there is only one consumer for that queue, then the consumer will receive the messages in the order they were published. However, if you have more than one consumer, reliable ordering is not possible. Equally, if you have messages from multiple channels, reliable ordering is only supported per channel, not across all channels.
- Rate limits apply to queues depending on your account type. Please see the complete list of account limits.
- There is a default TTL (time-to-live) applied to all messages that is configured when you provision your queue. If a message has not been consumed from a queue within this period, it will be moved to the deadletter queue. If the TTL of the deadletter queue passes, the message is discarded. See account limits.
- There is a max message limit configured when you provision your queue. If the max message limit is reached for your queue, new messages will be moved to the deadletter queue. Once the deadletter queue reaches its max message limit, new messages will be discarded. See account limits.
- With the AMQP protocol, it is possible to consume multiple queues from a single connection, and also to consume more than one message at a time. You will need to refer to your client library’s documentation to enable these capabilities. See this Stack Overflow answer as a good starting point.
- Unlike our Ably pub/sub channels, which are implicitly global and distributed, our message queues are provisioned in a single physical region. You can choose the region you want your queue to exist in when provisioning your queue. Typically you will want to provision a queue closest to your servers to keep latency as low as possible.
- The AMQP connection may be disconnected at any point as part of regular maintenance. Most AMQP clients will automatically reconnect, and retry with backoff and jitter in the event that the reconnect fails; but some do not. If your AMQP client does not have this behaviour built-in, you should implement it in your application code (see the sketch after this list).
- Each message published to the queue will count towards your monthly message quota. See billing info for more details.
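Below is a minimal sketch of reconnecting with exponential backoff and jitter using amqplib; the url and the startConsuming helper are placeholders for your own connection details and consumer logic:
/* Sketch: reconnect with exponential backoff plus jitter for AMQP clients
   that do not reconnect automatically */
const amqp = require('amqplib/callback_api')

function connectWithRetry (url, attempt = 0) {
  amqp.connect(url, (err, conn) => {
    if (err) {
      /* Exponential backoff capped at 30s, plus up to 1s of random jitter */
      const delay = Math.min(30000, 1000 * Math.pow(2, attempt)) + Math.random() * 1000
      return setTimeout(() => connectWithRetry(url, attempt + 1), delay)
    }
    /* Reconnect if the connection drops later, e.g. during routine maintenance */
    conn.on('close', () => connectWithRetry(url, 0))
    conn.on('error', () => { /* a 'close' event will follow and is handled above */ })
    startConsuming(conn) /* placeholder: create a channel and consume as shown earlier */
  })
}

connectWithRetry('amqps://APPID.KEYID:SECRET@us-east-1-a-queue.ably.io/shared')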
The Ably Queue service is offered in two flavours: multi-tenanted and dedicated.
Our multi-tenanted queue service is provided as part of the core platform to all customers. The queues are provided in a high availability configuration (your data is stored in at least two datacenters with automatic fail-over capabilities). Our multi-tenanted queue service is designed for low to medium volumes of messages and has a guideline limit of no more than 200 messages per second per account.
For customers with more demanding requirements (up to millions of messages per second), Ably has two solutions:
- Dedicated queue clusters that scale to millions of messages. Enterprise customers only.
- Firehose for streaming your realtime data directly into your own streaming or queueing service.
Get in touch if you’d like to find out more about our Enterprise offering.
Billing info
Each message published by a rule to a queue counts as one message towards your message quota. For example, if you publish a message on a channel that is in turn republished to a Queue, that will count as two messages. Find out more about how messages are counted.