Advanced pub-sub
Once you’ve understood the basics of subscribing to a channel and publishing messages to it, you can explore the more advanced concepts and features. This can help you to build more complex and efficient applications.
Subscribing to channels
There are more concepts to understand and more features you can utilize once you’ve explored the basics of subscribing to channels.
As a reminder, you can subscribe to all messages on a channel:
const realtime = new Ably.Realtime('<loading API key, please wait>');
const channel = realtime.channels.get('lab-gum-all');
await channel.subscribe((message) => {
alert('Received: ' + message.data);
});
Demo OnlyCopyCopied!
Or you can subscribe to messages with a specific name:
await channel.subscribe('myEvent', (message) => {
console.log('message received for event ' + message.name);
console.log('message data:' + message.data);
});
CopyCopied!
Unsubscribe from a channel
Unsubscribing from a channel removes previously registered listeners that were added when subscribing to it. You can remove all listeners, or listeners that were registered for only a single event.
Use the unsubscribe()
method to remove previously registered listeners:
/* remove the listener registered for a single event */
channel.unsubscribe('myEvent', myListener);
/* remove the listener registered for all events */
channel.unsubscribe(myListener);
CopyCopied!
Attaching versus subscribing
Messages are streamed to clients as soon as they attach to a channel, as long as they have the subscribe
capability for it. This is independent of whether or not they have subscribed to the channel.
Subscribing to a channel only registers a listener, or function, client-side that is called each time a message is received. This means that Ably is unaware of whether or not a client is subscribed to a channel.
Channels are not pre-configured or provisioned by Ably in advance. They are created on demand when clients attach to them, and remain active until there are no remaining clients attached. Attaching to a channel is an action that happens implicitly when a client subscribes to it.
The following is an example of implicitly attaching to a channel and then publishing a message:
const channel = realtime.channels.get('chatroom');
await channel.subscribe('action', (message) => { // implicit attach
console.log('Message received ' + message.data);
});
await channel.publish('action', 'boom!');
CopyCopied!
As subscribing to a channel implicitly attaches a client, it is important to understand that if a client subscribes to and then unsubscribes from a channel, the client remains attached. The client will continue to be sent published messages until they detach()
from the channel.
It is also important to understand the difference between between detaching and unsubscribing from a channel, and that messages will continue to be sent to clients if they only call the unsubscribe()
method
The detach()
method detaches a client from a channel. A client will no longer receive any messages published to the channel once they detach. unsubscribe()
only removes message listeners for a channel and is a client-side operation. To reiterate, Ably is unaware of whether or not a client has subscribed or unsubscribed from a channel. Messages will continue to be streamed to the client until detach()
is called.
As subscribe()
implicitly attaches a client to a channel, be aware that if you call subscribe()
followed by unsubscribe()
, the client remains attached to the channel and will continue to be streamed messages from Ably.
Server subscriptions
Subscribing to events server-side using the pub-sub pattern can be disadvantageous as it can increase latency, or duplicate events between multiple servers.
Message queues are more appropriate to use in this instance, as multiple worker servers enable Ably to distribute the load of messages received. This ensures that each message is only processed once, by any one of your worker servers.
Subscription filters
Subscription filters enable you to subscribe to a channel and only receive messages that satisfy a filter expression.
Messages are streamed to clients as soon as they attach to a channel, if they have the subscribe
capability for it. Subscription filters apply server-side filtering to messages, meaning that a client will only ever receive the messages that they subscribe to.
Subscription filters are currently in preview status.
Create a filter expression
Filter expressions should be written using JMESPath. They can be constructed using the message name and message.extras.headers
fields.
message.extras.headers
optionally provides ancillary metadata to a message, as Ably can’t inspect message payloads themselves. Adding suitable key-value pairs to messages will enable more complicated filter expressions to be constructed resulting in more effective message filtering.
The following is an example of publishing a message with additional metadata:
const channel = realtime.channels.get('scoops-kiosk');
await channel.publish({
name: 'ice-cream',
data: '...',
extras: {
headers: {
flavor: 'strawberry',
cost: 35,
temp: 3
}
}
});
CopyCopied!
Be aware that message.extras.headers
must be a flat object. It can’t contain any further nesting or arrays.
The following is an example of a filter expression subscribing to messages with the name “ice-cream”, a flavor of “strawberry” and a cost of less than 50:
name == `"ice-cream"` && headers.flavor == `"strawberry"` && headers.cost < `50`
CopyCopied!
The following is an example of a filter expression subscribing to messages with a flavor of either “strawberry” or “chocolate”:
headers.flavor == `"strawberry"` || headers.flavor == `"chocolate"`
CopyCopied!
Subscribe with a filter
In order to subscribe to a channel with a filter expression, you obtain a channel instance using the getDerived()
method. This accepts a filter expression as a parameter.
The following is an example of subscribing to a channel using one of the previous example filters:
const channel = realtime.channels.getDerived('scoops-kiosk', {
filter: 'name == `"ice-cream"` && headers.flavor == `"strawberry"` && headers.cost < `50`'
})
await channel.subscribe(...);
CopyCopied!
The following example demonstrates publishing to a channel, but subscribing to only a subset of messages on it:
// Connect to Ably
const realtime = new Ably.Realtime({'<loading API key, please wait>'});
// Create a channel instance to publish to
const pubChannel = realtime.channels.get('scoops-kiosk');
// Create a channel instance using the filter qualifier
const subChannel = realtime.channels.getDerived('scoops-kiosk', {
filter: 'name == `"ice-cream"` && headers.flavor == `"strawberry"` && headers.cost < `50`'
});
// Subscribe to the channel using the filtered subscription
await subChannel.subscribe((message) => {
alert('Ice cream update: ' + message.data);
});
// Publish to the unfiltered channel instance
await pubChannel.publish({
name: 'ice-cream',
data: '...',
extras: {
headers: {
flavor: 'strawberry',
cost: 35,
temp: 3
}
});
});
Demo OnlyCopyCopied!
Clients require the subscribe capability for one of the following resources in order to receive messages from a subscription filter:
[filter]<channel name>
[*]<channel name>
[*]*
A client may also attach to the unfiltered instance of a channel for other operations, such as to subscribe to the presence set. Be aware that if clients attach to the unfiltered instance, and have the subscribe capability for the channel itself, they will be sent all messages by Ably. This is because of the difference between attaching and subscribing to a channel.
The following features are not supported using subscription filters:
Publish
There are several more advanced concepts involved in publishing messages once you’ve understood the basics of publishing messages.
As a reminder, to publish a message to a channel:
const realtime = new Ably.Realtime('<loading API key, please wait>');
const channel = realtime.channels.get('lab-gum-all');
await channel.publish('example', 'message data');
Demo OnlyCopyCopied!
Publish to multiple channels
To publish a single message to multiple channels, make multiple publish()
requests using the realtime interface of an SDK. These concurrent requests can be in-flight simultaneously, ensuring that a publish on one channel does not delay operations in other channels.
To publish to multiple channels in a single call, use the batch publish feature.
Echoing messages
By default, clients will receive their own messages if they are also subscribed to the channel. This is known as echoing.
Set the echoMessages
property of ClientOptions
to false
to disable this behavior. This will stop clients from receiving the messages that they published themselves, but they will continue to receive messages published by others.
This property is only available using the realtime interface of an SDK, as it isn’t possible to subscribe to messages using the REST interface.
Transient publishing
Transient publishing is when a client publishes messages without attaching to a channel. This is a feature of the realtime interface of certain Ably SDKs. Transient publishing can be beneficial if you intend to publish to many channels as it removes the need to attach to a channel each time you publish. It also avoids a client subscribing to messages which avoids messages being sent to it redundantly.
The following is an example of publishing without attaching to a channel:
const channel = realtime.channels.get('chatroom');
// The publish below will not attach you to the channel
await channel.publish('action', 'boom!');
CopyCopied!
Idempotent publishing
Idempotency ensures that multiple publishes of the same message cannot result in duplicate messages.
It is possible that a client publishing a message using the REST interface may not receive acknowledgement of receipt from Ably, due to issues such as network failure outside of Ably’s control. Clients will internally attempt to re-publish messages in these instances.
When idempotent publishing is enabled, the Ably SDK will internally assign a unique ID to each message which ensures that subsequent retry attempts cannot result in duplicate messages. Idempotent publishing is enabled by default in all latest Ably SDKs. It can be disabled by setting the idempotentRestPublishing
ClientOptions
to false
.
Note that Ably can only detect duplicate messages within a 2-minute window after the original message, with the same ID, is published. If a message with the same ID is published after this 2-minute window, it will be treated as a new message.
You can also specify message IDs externally. The following is an example of how you might do this:
const rest = new Ably.Rest('<loading API key, please wait>');
const channel = rest.channels.get('lab-gum-all');
await channel.publish([{data: 'payload', id: 'unique123'}]);
Demo OnlyCopyCopied!
If manually specifying message IDs, it is important to be aware of how messages are published when calling the publish() method with an array of messages. See this FAQ for further information.
You can use the REST interface of an Ably SDK to publish messages on behalf of a realtime connection.
To publish on behalf of a realtime connection, the REST publisher requires the connectionKey
of the realtime client. The connectionKey
is a secret of the client unless explicitly shared. The REST publisher can then set the connectionKey
in the root of the published message.
If the realtime connection is identified by being bound to a clientId
, then the REST publish must include that same clientId
. This can be included in the message itself to apply to only that message, in the case that the REST client is able to assume any clientId
, or using a REST client bound to that specific clientId
.
The publish attempt will fail in the following scenarios:
- the
connectionKey
is invalid - the
connectionKey
belongs to a connection that has since been closed - the REST publisher is using a different Ably application to the realtime client
- the
clientId
s don’t match between the realtime connection and the REST publish