Frontend data models
The Models SDK extends the capability of the LiveSync Postgres database connector with optimistic and confirmed state updates, support for loading the initial model data from the backend, and managing state updates.
How it works
The Models SDK is a standalone SDK built on Ably’s JavaScript SDK. It sits in your frontend applications and helps manage the state updates based on events streamed by the Postgres database connector via Ably channels.
A model in the Models SDK is a data model representation of a specific part of the frontend application. Each frontend client can have multiple data models within the same application.
When creating a new model using the Models SDK you provide two functions to the model: a sync() function and a merge() function.
- The sync() function is used by the SDK to retrieve the current state of the model from your backend.
- The merge() function is used by the SDK to merge state change events published by the Postgres database connector with the existing frontend state in the model.
The following diagram provides a simplified overview of the Models SDK:
Authenticate
An API key is required for your frontend app to authenticate with Ably. API keys are used either to authenticate directly with Ably using basic authentication, or to generate tokens for untrusted clients using token authentication.
API keys and tokens have a set of capabilities assigned to them that specify which operations, such as subscribe or publish, can be performed on which resources. To use the Models SDK, the API key requires the following two capabilities. Set both for now; they are covered in more detail further down the page:
- subscribe for the channels you intend to subscribe to.
- history if you intend to sync from historical messages.
Install
The Models SDK requires a realtime client created using the Ably JavaScript SDK to interact with the Ably service.
Install the Ably JavaScript SDK and the Models SDK from NPM:
npm install ably @ably-labs/models
Import the SDKs into your project:
import ModelsClient from '@ably-labs/models';
import { Realtime } from 'ably';
Instantiate a realtime client using the Ably JavaScript SDK and pass the generated client into the Models constructor:
const ably = new Realtime({ key: '...' });
const modelsClient = new ModelsClient({ ably });
ClientOptions
In addition to the underlying Ably realtime client, you can provide a number of other ClientOptions to configure the default behavior of the Models SDK. Customizing these options is not required, especially if you are using this library for the first time.
- syncOptions - configures how the model state is synchronised via the sync function.
  - historyPageSize - the limit used when querying for paginated history, which is used to subscribe to changes from the correct point in the channel.
  - messageRetentionPeriod - the message retention period configured on the channel. This is used to determine whether the model state can be brought up to date from message history rather than via a re-sync.
  - retryStrategy - defines a retry strategy, an integer of milliseconds between retries, to use if calling the sync function throws an error. Defaults to -1 to stop retry attempts.
- eventBufferOptions - configures the in-memory sliding-window buffer used for reordering and deduplication.
- optimisticEventOptions - configures how optimistic events are applied.
- logLevel - configures the log level used to control the verbosity of log output. One of fatal, error, warn, info, debug, or trace.
The following is an example of setting ClientOptions when instantiating the Models SDK:
const modelsClient = new ModelsClient({
  ably,
  logLevel,
  syncOptions: {
    historyPageSize,
    messageRetentionPeriod,
    retryStrategy,
  },
  eventBufferOptions: {
    bufferMs,
    eventOrderer,
  },
  optimisticEventOptions: {
    timeout,
  },
});
Create or retrieve a model
Instantiate a model using the models.get() method on the models client. A model is identified by the channel it is subscribed to. If there is already a model subscribed to the given channel, that model will be returned:
const model = modelsClient.models.get({
  channelName: 'posts:123',
  sync,
  merge,
});
Sync function
The sync function is used by the Models SDK to fetch the latest data from your backend. The SDK will automatically call this function when it is initialized, and when the SDK detects that the latest data is no longer available on the Ably channel.
The function can contain any custom logic, but it needs to return a promise that resolves to an object containing two values. The first is a sequenceId, which is used to identify the point in the stream of change events that corresponds to the current version of the database’s state. The second is an object representing the initial data for your data model.
The following is an example sync function that fetches the latest model state from your database by calling the /api/posts/123 endpoint in your backend:
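async function sync() {
  // Call your backend web server to retrieve the current state of the post with ID 123 from your database.
  const response = await fetch('/api/posts/123');
  // fetch() resolves to a Response, so parse the JSON body before destructuring it.
  const { sequenceId, data } = await response.json();
  return { sequenceId, data };
}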
The following is an example output from the sync function, that is, the response returned directly by your endpoint:
{
  // The latest sequenceId corresponding to the current
  // version of the database state.
  "sequenceId": "1",
  // The initial data of the model.
  "data": {
    "id": 123,
    "text": "Hello World",
    "comments": []
  }
}
The SDK automatically handles retries of the sync function in the case of an error, as specified by the syncOptions.retryStrategy defined in your ClientOptions. To configure a retryStrategy, set an integer value of milliseconds for the time between retry attempts. Set this value to -1 to stop automatic retries.
sequenceId
Your backend should return the largest value of the sequence_id column from your outbox table. This lets the Models SDK identify the point in the stream of change events that corresponds to the current version of the database state.
Internally, the SDK replays change events starting from the point indicated by the sequenceId to update the model state with any realtime changes that have occurred since the state was read from the database.
The sequenceId is automatically maintained within the outbox table. You can safely read the sequenceId and the initial state of your model within a database transaction using the read committed transaction isolation level (or greater). This guarantees a consistent view of the state across the database and Ably channel. Postgres uses read committed by default.
-- Start a database transaction
BEGIN;
-- Query your model state
SELECT * FROM …;
-- Query the latest sequenceId
SELECT COALESCE(MAX(sequence_id), 0) FROM outbox;
-- Commit the database transaction
COMMIT;
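As a rough sketch, a backend handler could run the two queries above inside one transaction and return the shape the sync function expects. The Queryable interface and the readModelState helper are hypothetical names used only for illustration, and db is assumed to represent a single database connection rather than a pool. The model query is passed in by the caller because it depends on your own schema.

```typescript
// Hypothetical minimal database interface; a real Postgres client would
// expose something similar on a single connection.
interface Queryable {
  query(sql: string): Promise<Array<Record<string, unknown>>>;
}

interface SyncResult {
  sequenceId: string;
  data: Array<Record<string, unknown>>;
}

// Reads the model rows and the latest sequence_id inside one transaction,
// mirroring the SQL statements shown above.
async function readModelState(db: Queryable, modelQuery: string): Promise<SyncResult> {
  await db.query('BEGIN');
  try {
    const data = await db.query(modelQuery);
    const rows = await db.query(
      'SELECT COALESCE(MAX(sequence_id), 0) AS sequence_id FROM outbox',
    );
    await db.query('COMMIT');
    // The example sync response on this page uses a string sequenceId.
    return { sequenceId: String(rows[0]?.sequence_id ?? 0), data };
  } catch (err) {
    // Abandon the transaction if any statement fails.
    await db.query('ROLLBACK');
    throw err;
  }
}
```

The endpoint called by your sync function could then serialize this result as its JSON response.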
Merge function
The merge function is used by the Models SDK to ‘merge’ updates received as messages on an Ably channel with the existing state of the model.
When a message is received on the channel, the merge function is called to merge that new message into the existing model state. The merge function is called with the existing model state and the newly received channel message, and it should return the updated model state (with the new message merged in).
The merge function must be pure and deterministic. It should not depend on any external state. The merge will often be called multiple times and in different orders with different states (for example when replaying events after rewinding to a sequenceId, or when rebasing optimistic events on top of confirmed events).
The following example is a merge function, invoked for all events received on the channel name specified in the model:
function merge(state: Post, event: OptimisticEvent | ConfirmedEvent) {
  if (event.name === 'addComment') {
    return {
      ...state,
      comments: state.comments.concat([event.data]),
    };
  }
  // handle other event types
  // Return the state unchanged for events this function does not handle.
  return state;
}
The merge function needs to be registered when a model is instantiated:
const model = modelsClient.models.get({
  channelName: 'posts:123',
  sync,
  merge,
});
The event passed to the merge function can be either confirmed or optimistic:
- Confirmed - change events received from your backend. They describe the result of a change to the data which has been committed to your database.
- Optimistic - events that describe mutations that have happened locally, but have not yet been confirmed by your backend.
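Because the merge function may be replayed many times, it helps to check that it never mutates its inputs. The following is a minimal, self-contained sketch of that property. The Post and PostEvent shapes are simplified assumptions, not the SDK's own types, and the removeComment event name is hypothetical.

```typescript
interface Post {
  id: number;
  text: string;
  comments: string[];
}

// Simplified event shape for illustration only.
interface PostEvent {
  name: string;
  data: string;
}

// Pure merge: builds a new state object and leaves its arguments untouched.
function mergePost(state: Post, event: PostEvent): Post {
  switch (event.name) {
    case 'addComment':
      return { ...state, comments: [...state.comments, event.data] };
    case 'removeComment': // hypothetical event name
      return { ...state, comments: state.comments.filter((c) => c !== event.data) };
    default:
      // Unknown events leave the state unchanged.
      return state;
  }
}

const initialPost: Post = { id: 123, text: 'Hello World', comments: [] };
const postEvents: PostEvent[] = [
  { name: 'addComment', data: 'First!' },
  { name: 'addComment', data: 'Second' },
  { name: 'removeComment', data: 'First!' },
];

// Replaying the same events from the same starting state always gives the
// same result, and initialPost itself is never modified.
const replayedPost = postEvents.reduce(mergePost, initialPost);
```

Here replayedPost.comments ends up containing only 'Second', while initialPost.comments stays empty, so the same events can safely be replayed again from initialPost.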
Optimistic updates
The Models SDK supports optimistically updating the model state. Optimistic updates will be reflected in the state immediately, and allow for a snappier user experience while the request to update the backend state is being made.
With optimistic updates enabled, the Models SDK will ‘merge’ the update into the optimistic state immediately. The SDK expects to receive an update on the model’s channel that will ‘confirm’ that optimistic update. Confirming the update marks it as no longer optimistic. Optimistic updates that are not confirmed will be automatically rolled back after a timeout (optimisticEventOptions.timeout).
Use the model.optimistic() method to apply an optimistic update to your model:
- Call model.optimistic() on your model with the optimistic event.
- Apply the corresponding change to your backend.
- Await the confirmation of the optimistic update from the backend, or optionally cancel the optimistic update.
The model.optimistic() function returns a promise resolving to two values:
- A confirmation promise that resolves when the backend confirms or rejects the optimistic update.
- A function to explicitly cancel the optimistic update.
The following demonstrates an optimistic update to implement changes in the model:
// optimistically apply the changes to the model
const [confirmation, cancel] = await model.optimistic({
  mutationId: 'my-mutation-id',
  name: 'addComment',
  data: 'New comment!',
});
try {
  // apply the changes in your backend
  await updatePost('my-mutation-id', 'New comment!');
  // wait for the optimistic event to be confirmed
  await confirmation;
} catch (err) {
  // something went wrong, cancel the optimistic update
  cancel();
}
An optimistic update can be confirmed or rejected by your backend. This allows your backend and database to be the source of truth and your data models to automatically react to the changing data received on the Ably channel.
mutationId
The SDK needs to identify when a change event received from the backend matches an optimistic event that has already been applied locally. To match confirmed and optimistic events the SDK uses a mutationId. This ID can be any string, though it’s commonly a UUID. The mutationId is generated in your frontend code and assigned to the optimistic event when you call model.optimistic. You then need to send this mutationId to your backend as part of your API calls to mutate the model. Finally, the mutationId should be included in the confirmed event that’s written to the outbox table.
If your backend decides to reject the mutation, you should write the mutationId to the outbox with the rejected column set to true. This will cause the optimistic update to automatically roll back in the frontend client that triggered it. This rollback will be reflected in a rejected confirmation promise returned from the call to model.optimistic.
Optimistic mutations that are not confirmed or rejected within the timeout optimisticEventOptions.timeout defined in ClientOptions (default 5 seconds) will be automatically rolled back in the model state.
BEGIN;
-- mutate your data, e.g.:
INSERT INTO comments (comment) VALUES ('New comment!');
-- write change event to outbox, e.g.:
INSERT INTO outbox (mutation_id, channel, name, ...) VALUES ('my-mutation-id', 'posts:123', 'addComment', ...);
COMMIT;
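To make the role of the mutationId concrete, the following is a simplified illustration of matching backend events against pending optimistic events. It is not the SDK's implementation. All type and function names here (PendingEvent, BackendEvent, Tracker and the helpers) are hypothetical, and the model state is reduced to a list of comments.

```typescript
type CommentState = string[];

interface PendingEvent {
  mutationId: string;
  name: string;
  data: string;
}

interface BackendEvent extends PendingEvent {
  rejected: boolean;
}

interface Tracker {
  confirmed: CommentState; // state built from backend events only
  pending: PendingEvent[]; // optimistic events awaiting confirmation
}

function merge(state: CommentState, event: PendingEvent): CommentState {
  return event.name === 'addComment' ? [...state, event.data] : state;
}

// The optimistic view is the confirmed state with pending events merged on top.
function optimisticState(t: Tracker): CommentState {
  return t.pending.reduce(merge, t.confirmed);
}

function applyOptimistic(t: Tracker, event: PendingEvent): Tracker {
  return { ...t, pending: [...t.pending, event] };
}

// A backend event with a matching mutationId settles the pending event:
// a confirmation merges it into the confirmed state, a rejection drops it.
function applyBackendEvent(t: Tracker, event: BackendEvent): Tracker {
  const pending = t.pending.filter((p) => p.mutationId !== event.mutationId);
  const confirmed = event.rejected ? t.confirmed : merge(t.confirmed, event);
  return { confirmed, pending };
}

const comment = { mutationId: 'my-mutation-id', name: 'addComment', data: 'New comment!' };
const tracker = applyOptimistic({ confirmed: [], pending: [] }, comment);
const afterConfirm = applyBackendEvent(tracker, { ...comment, rejected: false });
const afterReject = applyBackendEvent(tracker, { ...comment, rejected: true });
```

In this sketch the comment is visible in optimisticState(tracker) straight away, remains visible after confirmation, and disappears after rejection, which is the rollback behavior described above.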
Subscribe to the model
The subscribe callback will be called with the updated model state when the state changes. This is useful for updating elements in your application on change, for example the UI.
The following example subscribes to model state changes:
model.subscribe((err, post) => { /* model state updated! */ });
Subscriptions have optimistic updates enabled by default. The callback is triggered whenever the optimistic or confirmed state changes.
To disable optimistic updates and rely only on confirmed updates, set the optimistic option to false:
model.subscribe((err, post) => { /* ... */ }, { optimistic: false });
Model lifecycle
A model instance can be in one of the following lifecycle states:
- initialized - the model has been initialized but has not yet attached to the underlying channel.
- syncing - the model is synchronizing its state from the backend.
- ready - the model is attached to the channel and processing realtime updates.
- paused - the user has paused the model.
- errored - the model has errored processing data from the sync, or from the stream.
- disposed - the model has been disposed, either by the user disposing it or an unrecoverable error.
Listen for model state change events on a model instance:
model.on('paused', () => { /* model paused*/ });
model.on('ready', () => { /* model resumed */ });
model.on('disposed', () => { /* model disposed */ });
Pause and Resume
You can pause a model to prevent the model from consuming realtime updates, while reserving the ability to resume it at some point in the future. Pausing can be useful in instances such as when the UI rendering the model data is temporarily out-of-view.
The following pauses the model. New events will not be processed and subscription callbacks will not be invoked:
await model.pause();
The following example resumes the model. Event processing will resume and changes will be made available to subscribers:
await model.resume();
Dispose
When a model is no longer needed, you can dispose of it to free up resources:
await model.dispose();
In depth
SequenceId history replay
The SDK will initialise model state using the sync function, and capture the sequenceId you return from your sync function. The SDK then connects to the model’s channel, and will rewind backwards through history until it finds the correct point in the stream of channel messages (based on the sequenceId). You can configure how many messages are requested in each page of history when rewinding using the parameter syncOptions.historyPageSize defined in your ClientOptions.
After the SDK executes the sync function, it internally connects to the channel and begins subscribing to live, yet unprocessed, messages. It then paginates backward through the channel’s message history, starting from the attachment point, using untilAttach. The number of items on each page can be configured via the syncOptions.historyPageSize defined in your ClientOptions.
When a message is published to the channel with a new sequenceId, the model processes subsequent messages to maintain continuity with live messages. If the target sequenceId is unreachable because there is insufficient message history to resume from the correct point, the SDK will attempt a re-sync by calling the sync function again to acquire a newer version of the model state.
The amount of history available to query on the channel is determined by your message storage configuration on the channel. This configuration must match the syncOptions.messageRetentionPeriod defined in your ClientOptions. The SDK uses this configuration option as a hint as to whether to resynchronize via the sync function and skip paginating through history when messages are expected to have expired.
If you cache the state obtained from the backend endpoint used by the sync function, the cached model state must not be older than the message retention period set on the channel, so that the SDK can still bring it up to date from message history. If no previous messages exist on the channel, the model is assumed to be new, with no modifications to replay.
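The replay-or-resync decision described in this section can be sketched as a small pure function. This is only an illustration of the idea, not the SDK's internal code. It assumes each message id holds a numeric sequenceId and that history pages arrive newest first. The ChannelMessage type and messagesSince function are hypothetical names.

```typescript
interface ChannelMessage {
  id: string; // assumed to hold the numeric sequenceId of the change event
  name: string;
}

// Walks backwards through history pages (newest first) looking for the point
// identified by sequenceId. Returns the newer messages in oldest-first order,
// or null when that point cannot be reached and a re-sync is needed.
function messagesSince(pages: ChannelMessage[][], sequenceId: string): ChannelMessage[] | null {
  const target = Number(sequenceId);
  const newer: ChannelMessage[] = [];
  for (const page of pages) {
    for (const message of page) {
      if (Number(message.id) <= target) {
        // Reached state already included in the sync response.
        return newer.reverse();
      }
      newer.push(message);
    }
  }
  // An empty history means nothing has changed since the sync. Otherwise
  // the history ran out before the target was found.
  return newer.length === 0 ? [] : null;
}

const historyPages: ChannelMessage[][] = [
  [{ id: '5', name: 'addComment' }, { id: '4', name: 'addComment' }],
  [{ id: '3', name: 'addComment' }, { id: '2', name: 'addComment' }],
];
const toReplay = messagesSince(historyPages, '3'); // messages 4 and 5, in that order
const needsResync = messagesSince(historyPages, '1') === null; // true
```

The comparison uses less-than-or-equal rather than strict equality because the largest sequence_id in the outbox table may belong to an event for a different channel.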
Event buffer
Change events from your backend are delivered to the SDK as messages over the channels. By default, messages are processed by the SDK in the order in which they are received.
The SDK supports buffering change events for a short time, which enables short-term reordering and de-duplication. By default, this feature is not enabled. To activate it, set eventBufferOptions.bufferMs in the ClientOptions to a non-zero value.
When using the event buffer, change events are de-duplicated and ordered according to their message.id, which corresponds to the change event’s sequenceId.
By default, the events in the buffer are ordered numerically if the message.id can be coerced to a number. Otherwise, events will be ordered lexicographically by their message.id. You can specify a custom ordering based on any part of the message via the eventBufferOptions.eventOrderer option in ClientOptions.
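The default ordering rule can be approximated as follows. This is a sketch of the behavior described above under simplifying assumptions, not the SDK's actual comparator, and it does not show the signature expected by eventOrderer. The BufferedEvent type and both function names are hypothetical.

```typescript
interface BufferedEvent {
  id: string;
  name: string;
}

// Compares numerically when both ids can be coerced to numbers,
// and lexicographically otherwise.
function compareIds(a: string, b: string): number {
  const na = Number(a);
  const nb = Number(b);
  if (!Number.isNaN(na) && !Number.isNaN(nb)) {
    return na - nb;
  }
  return a < b ? -1 : a > b ? 1 : 0;
}

// Drops events whose id has already been seen, then sorts the remainder.
function orderAndDeduplicate(events: BufferedEvent[]): BufferedEvent[] {
  const seen = new Set<string>();
  const unique = events.filter((event) => {
    if (seen.has(event.id)) return false;
    seen.add(event.id);
    return true;
  });
  return unique.sort((x, y) => compareIds(x.id, y.id));
}

const ordered = orderAndDeduplicate([
  { id: '10', name: 'addComment' },
  { id: '9', name: 'addComment' },
  { id: '10', name: 'addComment' },
  { id: '2', name: 'addComment' },
]);
// ordered ids: '2', '9', '10' (a purely lexicographic sort would give '10', '2', '9')
```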
Best practice
Channels and models
In general, when designing your channels and models, aim for a 1:1:1 relationship between channels, models, and your domain model objects. Channels facilitate data partitioning and access control. For instance, in a task management app, each task can be represented by a model with its own channel, ensuring updates are specific to each task. Ably’s capabilities are applied on a per channel basis allowing fine-grained access control for each data model in your application.
For infrequently updated models, grouping them on the same channel may be viable, while frequently changing models benefit from individual channels. Starting with one channel per model and one model per domain object is a good baseline, adjusting as needed.
Outbox messages
When transmitting data over Ably, it is essential to convey only the changes made, known as “deltas.” This approach ensures easier synchronization across all clients, as each can simply update their existing model with the new state rather than recalculating it. Initially, you may opt to include the entire model state in each message, but as the model grows, this may exceed message size limits. Therefore, using deltas is recommended, enabling frontend applications to react to specific changes efficiently.
For example, in a weather application, updates to temperature might include the new value and its corresponding location:
{"temperature": 25, "city": "London"}
Alternatively, you can use a standardized schema such as JSONPatch:
[
  {
    "op": "replace",
    "path": "/weather/London/temperature",
    "value": 25
  }
]
While JSONPatch requires more effort to calculate delta events, it simplifies the merge function code, as patches can be applied automatically using libraries like fast-json-patch.
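To show why patches simplify the merge function, here is a hand-rolled sketch that applies only the replace operation to plain objects, without mutating the previous state. It is deliberately incomplete: it ignores arrays, the other JSONPatch operations and path escaping, all of which a library would handle. The names Json, ReplaceOp, setIn, applyReplace and mergeWeather are hypothetical.

```typescript
type Json = null | boolean | number | string | Json[] | { [key: string]: Json };

interface ReplaceOp {
  op: 'replace';
  path: string;
  value: Json;
}

// Returns a copy of node with the value at the given key path replaced.
function setIn(node: Json, keys: string[], value: Json): Json {
  if (keys.length === 0) return value;
  if (node === null || typeof node !== 'object' || Array.isArray(node)) {
    throw new Error('path does not exist');
  }
  const head = keys[0] as string;
  const child = node[head];
  if (child === undefined) {
    // replace requires the target location to exist already
    throw new Error('path does not exist');
  }
  return { ...node, [head]: setIn(child, keys.slice(1), value) };
}

function applyReplace(state: Json, op: ReplaceOp): Json {
  // '/weather/London/temperature' becomes ['weather', 'London', 'temperature']
  return setIn(state, op.path.split('/').slice(1), op.value);
}

// A merge function that treats the event data as a list of replace operations.
function mergeWeather(state: Json, event: { data: ReplaceOp[] }): Json {
  return event.data.reduce(applyReplace, state);
}

const weatherState: Json = {
  weather: { London: { temperature: 20 }, Paris: { temperature: 18 } },
};
const updatedWeather = mergeWeather(weatherState, {
  data: [{ op: 'replace', path: '/weather/London/temperature', value: 25 }],
});
```

After the merge, updatedWeather holds 25 for London while weatherState is left untouched, so the function stays pure in the way the merge function requires.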
API Reference
An API reference is available that contains a full list of calls, parameters and responses.