Frontend data models

The Models SDK extends the capability of the LiveSync Postgres database connector with optimistic and confirmed state updates, support for loading the initial model data from the backend, and management of state updates.

The Models SDK is a standalone SDK built on Ably’s JavaScript SDK. It sits in your frontend applications and helps manage the state updates based on events streamed by the Postgres database connector via Ably channels.

A model in the Models SDK is a data model representation of a specific part of the frontend application. Each frontend client can have multiple data models within the same application.

When creating a new model using the Models SDK, you provide two functions to the model: a sync() function and a merge() function.

  • the sync() function is used by the SDK to retrieve the current state of the model from your backend,
  • the merge() function is used by the SDK to merge state change events published by the Postgres database connector with the existing frontend state in the model.

The following diagram provides a simplified overview of the Models SDK:

A flow diagram showing how all of the livesync models components tie in together

An API key is required for your frontend app to authenticate with Ably. API keys are used either to authenticate directly with Ably using basic authentication, or to generate tokens for untrusted clients using token authentication.

API keys and tokens have a set of capabilities assigned to them that specify which operations, such as subscribe or publish, can be performed on which resources. To use the Models SDK, the API key requires two capabilities. Both are discussed further down the page; for the time being, set them and review them later.

The Models SDK requires a realtime client created using the Ably JavaScript SDK to interact with the Ably service.

Install the Ably JavaScript SDK and the Models SDK from NPM:

npm install ably @ably-labs/models

Import the SDKs into your project:

import ModelsClient from '@ably-labs/models';
import { Realtime } from 'ably';

Instantiate a realtime client using the Ably JavaScript SDK and pass the generated client into the Models constructor:

const ably = new Realtime({ key: '...' });
const modelsClient = new ModelsClient({ ably });

In addition to the underlying Ably realtime client, you can provide a number of other ClientOptions to configure the default behavior of the Models SDK. Customization of these options is not a requirement, especially if using this library for the first time.

syncOptions
is used to configure how the model state is synchronised via the sync function.
historyPageSize
is the limit used when querying for paginated history used to subscribe to changes from the correct point in the channel.
messageRetentionPeriod
is the message retention period configured on the channel. This is used to determine whether the model state can be brought up to date from message history rather than via a re-sync.
retryStrategy
defines a retry strategy, an integer of milliseconds between retries, to use if calling the sync function throws an error. Defaults to -1 to stop retry attempts.
eventBufferOptions
is used to configure the in-memory sliding-window buffer used for reordering and deduplication.
optimisticEventOptions
is used to configure how optimistic events are applied.
logLevel
configures the log level used to control the verbosity of log output. One of fatal, error, warn, info, debug, or trace.

The following is an example of setting ClientOptions when instantiating the Models SDK:

const modelsClient = new ModelsClient({
  ably,
  logLevel,
  syncOptions: {
    historyPageSize,
    messageRetentionPeriod,
    retryStrategy,
  },
  eventBufferOptions: {
    bufferMs,
    eventOrderer,
  },
  optimisticEventOptions: {
    timeout,
  },
});

Instantiate a model using the models.get() method on the models client. A model is identified by the channel it is subscribed to. If a model is already subscribed to the given channel, that model is returned:

const model = modelsClient.models.get({
  channelName: 'posts:123',
  sync,
  merge,
});

The sync function is used by the Models SDK to fetch the latest data from your backend. The SDK will automatically call this function when it is initialized, and when the SDK detects that the latest data is no longer available on the Ably channel.

The function can contain any custom logic within it, but needs to return a promise that resolves to two values. The first is a sequenceId, which is used to identify the point in the stream of change events that corresponds to the current version of the database’s state. The second is an object representing the initial data for your data model.

The following is an example sync function that fetches the latest model state from your database by calling the /api/posts/123 endpoint in your backend:

async function sync() {
  // Call your backend web server to retrieve the current state of the
  // post with the ID 123 within your database.
  const response = await fetch('/api/posts/123');
  // fetch() resolves to a Response, so parse the JSON body before destructuring.
  const { sequenceId, data } = await response.json();
  return { sequenceId, data };
}

The following is an example output from the sync function i.e. a direct result returned from your endpoint:

{
  // The latest sequenceId corresponding to the current
  // version of the database state.
  "sequenceId": "1",
  // The initial data of the model.
  "data": {
    "id": 123,
    "text": "Hello World",
    "comments": []
  }
}

The SDK automatically handles retries of the sync function in the case of an error, as specified by the syncOptions.retryStrategy defined in your ClientOptions. To configure a retryStrategy, set an integer value of milliseconds for the time between retry attempts. Set this value to -1 to stop automatic retries.

Your backend should return the largest value of the sequence_id column from your outbox table. This lets the Models SDK identify the point in the stream of change events that corresponds to the current version of the database state.

Internally, the SDK replays change events starting from the point indicated by the sequenceId to update the model state with any realtime changes occurring since the state was read from the database.
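The replay step described above can be pictured with a small, self-contained sketch. It is not the SDK's internal implementation: the ChangeEvent and PostState types, the mergeEvent and replayFrom functions, and the use of numeric sequenceId values are all assumptions made for illustration.

```typescript
// Illustrative types only; these are not Models SDK types.
interface ChangeEvent {
  sequenceId: number; // assumption: numeric so events can be compared directly
  name: string;
  data: string;
}

interface PostState {
  id: number;
  text: string;
  comments: string[];
}

// A pure merge function in the style used on this page.
function mergeEvent(state: PostState, change: ChangeEvent): PostState {
  if (change.name === 'addComment') {
    return { ...state, comments: [...state.comments, change.data] };
  }
  return state;
}

// Apply only the events that are newer than the sequenceId returned by
// sync(), oldest first, on top of the state returned by sync().
function replayFrom(
  synced: { sequenceId: number; data: PostState },
  events: ChangeEvent[],
): PostState {
  return events
    .filter((e) => e.sequenceId > synced.sequenceId)
    .sort((a, b) => a.sequenceId - b.sequenceId)
    .reduce(mergeEvent, synced.data);
}
```

Events at or below the synced sequenceId are skipped because the state read from the database already reflects them.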

The sequenceId is automatically maintained within the outbox table. You can safely read the sequenceId and the initial state of your model within a database transaction using the read committed transaction isolation level (or greater). This guarantees a consistent view of the state across the database and Ably channel. Postgres uses read committed by default.

-- Start a database transaction
BEGIN;
-- Query your model state
SELECT * FROM …;
-- Query the latest sequenceId
SELECT COALESCE(MAX(sequence_id), 0) FROM outbox;
-- Commit the database transaction
COMMIT;

The merge function is used by the Models SDK to ‘merge’ updates received as messages on an Ably channel with the existing state of the model.

When a message is received on the channel, the merge function is called to merge that new message into the existing model state. The merge function is called with the existing model state and the newly received channel message, and it should return the updated model state (with the new message merged in).

The merge function must be pure and deterministic. It should not depend on any external state. The merge function will often be called multiple times and in different orders with different states (for example when replaying events after rewinding to a sequenceId, or when rebasing optimistic events on top of confirmed events).

The following example is a merge function, invoked for all events received on the channel name specified in the model:

function merge(state: Post, event: OptimisticEvent | ConfirmedEvent) {
  if (event.name === 'addComment') {
    return {
      ...state,
      comments: state.comments.concat([event.data]),
    };
  }
  // handle other event types
  // Return the state unchanged for events this function does not recognise.
  return state;
}

The merge function needs to be registered when a model is instantiated:

const model = modelsClient.models.get({
  channelName: 'posts:123',
  sync,
  merge,
});

The event passed to the merge function can be either confirmed or optimistic:

Confirmed
change events received from your backend. They describe the result of a change to the data which has been committed to your database.
Optimistic
events that describe mutations that have happened locally, but have not yet been confirmed by your backend.
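To see why a pure merge function matters when both kinds of event are in play, the following sketch rebases a pending optimistic event on top of a confirmed state that changes underneath it. The PostView and CommentEvent types and the rebaseOptimistic helper are hypothetical names for illustration, and the SDK's own bookkeeping may differ.

```typescript
// Illustrative types only; the SDK's own event types carry more fields.
interface PostView {
  comments: string[];
}

interface CommentEvent {
  mutationId: string;
  name: string;
  data: string;
}

// A pure merge: returns a new object and never mutates its inputs.
function mergeComment(state: PostView, change: CommentEvent): PostView {
  if (change.name === 'addComment') {
    return { ...state, comments: state.comments.concat([change.data]) };
  }
  return state;
}

// Rebase: start from the confirmed state and re-apply every pending
// optimistic event on top of it.
function rebaseOptimistic(confirmed: PostView, pending: CommentEvent[]): PostView {
  return pending.reduce(mergeComment, confirmed);
}

const confirmedBefore: PostView = { comments: ['first'] };
const pendingEvents: CommentEvent[] = [
  { mutationId: 'm-2', name: 'addComment', data: 'mine (unconfirmed)' },
];

// A confirmed event from another client arrives while ours is still pending.
const confirmedAfter = mergeComment(confirmedBefore, {
  mutationId: 'm-1',
  name: 'addComment',
  data: 'theirs',
});

// The optimistic view is recomputed on top of the new confirmed state.
const optimisticView = rebaseOptimistic(confirmedAfter, pendingEvents);
```

Because mergeComment never mutates its input, the same pending events can be re-applied every time the confirmed state changes without corrupting either view.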

The Models SDK supports optimistically updating the model state. Optimistic updates will be reflected in the state immediately, and allow for a snappier user experience while the request to update the backend state is being made.

With optimistic updates enabled, the Models SDK will ‘merge’ the update into the optimistic state immediately. The SDK expects to receive an update on the model’s channel that will ‘confirm’ that optimistic update. Confirming the update marks it as no longer optimistic. Optimistic updates that are not confirmed will be automatically rolled back after a timeout (optimisticEventOptions.timeout).

Use the model.optimistic() method to apply an optimistic update to your model.

  1. Call model.optimistic() on your model with the optimistic event.
  2. Apply the corresponding change to your backend.
  3. Await the confirmation of the optimistic update from the backend, or optionally cancel the optimistic update.

The model.optimistic() function returns a promise resolving to two values:

  • A confirmation promise that resolves when the backend confirms or rejects the optimistic update.
  • A function to explicitly cancel the optimistic update.

The following demonstrates an optimistic update to implement changes in the model:

// optimistically apply the changes to the model
const [confirmation, cancel] = await model.optimistic({
  mutationId: 'my-mutation-id',
  name: 'addComment',
  data: 'New comment!',
});

try {
  // apply the changes in your backend
  await updatePost('my-mutation-id', 'New comment!');
  // wait for the optimistic event to be confirmed
  await confirmation;
} catch (err) {
  // something went wrong, cancel the optimistic update
  cancel();
}

An optimistic update can be confirmed or rejected by your backend. This allows your backend and database to be the source of truth and your data models to automatically react to the changing data received on the Ably channel.

The SDK needs to identify when a change event received from the backend matches an optimistic event that has already been applied locally. To match confirmed and optimistic events, the SDK uses a mutationId. This ID can be any string, though it’s commonly a UUID. The mutationId is generated in your frontend code and assigned to the optimistic event when you call model.optimistic. You then need to send this mutationId to your backend as part of your API calls to mutate the model. Finally, the mutationId should be included in the confirmed event that’s written to the outbox table.

If your backend decides to reject the mutation, you should write the mutationId to the outbox with the rejected column set to true. This will cause the optimistic update to automatically roll back in the frontend client that triggered it. This rollback will be reflected in a rejected confirmation promise returned from the call to model.optimistic.
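The matching rule described above can be sketched as a small function that settles a pending optimistic mutation when an event with the same mutationId arrives. This is an illustration of the idea rather than the SDK's code: PendingMutation, IncomingChange, Settlement, and settleMutation are hypothetical names, and the rejected flag mirrors the rejected column in the outbox table.

```typescript
// Illustrative shapes only; not Models SDK types.
interface PendingMutation {
  mutationId: string;
  name: string;
  data: string;
}

interface IncomingChange {
  mutationId: string;
  rejected: boolean; // mirrors the rejected column in the outbox table
}

type Settlement = 'confirmed' | 'rejected' | 'unmatched';

// Match an incoming change against the locally pending optimistic
// mutations and report what should happen to the matching one.
function settleMutation(
  pending: PendingMutation[],
  incoming: IncomingChange,
): { settlement: Settlement; stillPending: PendingMutation[] } {
  const matched = pending.some((p) => p.mutationId === incoming.mutationId);
  if (!matched) {
    // A change made elsewhere: nothing pending locally to settle.
    return { settlement: 'unmatched', stillPending: pending };
  }
  const stillPending = pending.filter((p) => p.mutationId !== incoming.mutationId);
  return {
    settlement: incoming.rejected ? 'rejected' : 'confirmed',
    stillPending,
  };
}
```

In this sketch a 'rejected' settlement corresponds to rolling the optimistic update back, while 'confirmed' means the event is no longer treated as optimistic.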

Optimistic mutations that are not confirmed or rejected within the timeout optimisticEventOptions.timeout defined in ClientOptions (default 5 seconds) will be automatically rolled back in the Model state.

BEGIN;
-- mutate your data, e.g.:
INSERT INTO comments (comment) VALUES ('New comment!');
-- write change event to outbox, e.g.:
INSERT INTO outbox (mutation_id, channel, name, ...) VALUES ('my-mutation-id', 'posts:123', 'addComment', ...);
COMMIT;

The subscribe callback will be called with the updated model state when the state changes. This is useful for updating elements in your application on change, for example the UI.

The following example subscribes to model state changes:

model.subscribe((err, post) => { /* model state updated! */ });

Subscriptions have optimistic updates enabled by default. The callback is triggered whenever the optimistic or confirmed state changes.

To disable optimistic updates and rely only on confirmed updates, set the optimistic option to false:

model.subscribe((err, post) => { /* ... */ }, { optimistic: false });

A model instance can be in one of the following lifecycle states:

initialized
the model has been initialized but has not yet attached to the underlying channel.
syncing
the model is synchronizing its state from the backend.
ready
the model is attached to the channel and processing realtime updates.
paused
the user has paused the model.
errored
the model has errored processing data from the sync, or from the stream.
disposed
the model has been disposed, either by the user disposing it or an unrecoverable error.

Listen for model state change events on a model instance:

model.on('paused', () => { /* model paused */ });
model.on('ready', () => { /* model resumed */ });
model.on('disposed', () => { /* model disposed */ });

You can pause a model to prevent the model from consuming realtime updates, while reserving the ability to resume it at some point in the future. Pausing can be useful in instances such as when the UI rendering the model data is temporarily out-of-view.

The following pauses the model. New events will not be processed and subscription callbacks will not be invoked:

await model.pause();

The following example resumes the model. Event processing will resume and changes will be made available to subscribers:

await model.resume();

When a model is no longer needed, you can dispose it to free up resources:

await model.dispose();

The SDK will initialise model state using the sync function, and capture the sequenceId you return from your sync function. The SDK then connects to the model’s channel, and will rewind backwards through history until it finds the correct point in the stream of channel messages (based on the sequenceId). You can configure how many messages are requested in each page of history when rewinding using the parameter syncOptions.historyPageSize defined in your ClientOptions.

After the SDK executes the sync function, it internally connects to the channel and begins subscribing to live, yet unprocessed, messages. It then paginates backward through the channel’s message history, starting from the attachment point, using untilAttach. The number of items on each page can be configured via the syncOptions.historyPageSize defined in your ClientOptions.

When a message is published to the channel with a new sequenceId, the model processes subsequent messages to maintain continuity with live messages. If the target sequenceId is unreachable because there is insufficient message history to resume from the correct point, the SDK attempts a re-sync by calling the sync function again to acquire a newer version of the model state.
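The rewind-then-resync behaviour can be sketched with in-memory pages standing in for paginated channel history. This is a simplified model under stated assumptions, not the SDK's implementation: ChannelMessage and collectSince are hypothetical names, pages are assumed to arrive newest first (as a backwards history query returns them), and sequenceId values are assumed to be numeric.

```typescript
// Illustrative shape only; real channel messages carry more fields.
interface ChannelMessage {
  sequenceId: number;
  name: string;
  data: string;
}

// Walk backwards through pages of history (newest page first, newest
// message first within a page) until the target sequenceId is reached.
// Returns the newer messages oldest-first, or 'resync' when history runs
// out before the target is found.
function collectSince(
  pages: ChannelMessage[][],
  targetSequenceId: number,
): ChannelMessage[] | 'resync' {
  // No previous messages on the channel: treat the model as unmodified.
  if (pages.every((page) => page.length === 0)) return [];

  const newer: ChannelMessage[] = [];
  for (const page of pages) {
    for (const message of page) {
      if (message.sequenceId <= targetSequenceId) {
        // Reached the point already covered by the sync() result.
        return newer.reverse();
      }
      newer.push(message);
    }
  }
  // Insufficient history to reach the target: fall back to sync().
  return 'resync';
}
```

Each inner array plays the role of one page of history, so its length corresponds to syncOptions.historyPageSize in this simplified picture.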

The amount of history available to query on the channel is determined by your message storage configuration on the channel. This configuration must match the syncOptions.messageRetentionPeriod defined in your ClientOptions. The SDK uses this configuration option as a hint as to whether to resynchronize via the sync function and skip paginating through history when messages are expected to have expired.

You can cache the state obtained from the backend endpoint used by the sync function. To ensure the model’s state can still be brought up to date, the model state returned from this endpoint must not be older than the message retention period set on the channel. If no previous messages exist on the channel, the model is assumed to be a new state without any modifications.

Change events from your backend are delivered to the SDK as messages over the channels. By default, messages are processed by the SDK in the order in which they are received.

The SDK supports short-term buffering of change events, which allows for short-term reordering and de-duplication. By default, this feature is not enabled. To activate it, set eventBufferOptions.bufferMs in your ClientOptions to a non-zero value.

When using the event buffer, change events are de-duplicated and ordered according to their message.id, which corresponds to the change event’s sequenceId.

By default, the events in the buffer are ordered numerically if the message.id can be coerced to a number. Otherwise, events will be ordered lexicographically by their message.id. You can specify a custom ordering based on any part of the message via the eventBufferOptions.eventOrderer ClientOption.
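The default ordering rule described above can be approximated as follows. This is a sketch of the stated behaviour, not the SDK's buffer: BufferedMessage, asNumber, compareIds, and orderAndDedupe are hypothetical names, and the sketch sorts a whole array at once rather than maintaining a sliding window.

```typescript
// Illustrative shape only; real channel messages carry more fields.
interface BufferedMessage {
  id: string;
  name: string;
}

// Returns the numeric value of an id, or null when it is not a number.
function asNumber(id: string): number | null {
  if (id.trim() === '') return null;
  const value = Number(id);
  return Number.isNaN(value) ? null : value;
}

// Numeric comparison when both ids can be coerced to numbers,
// lexicographic comparison otherwise.
function compareIds(a: string, b: string): number {
  const na = asNumber(a);
  const nb = asNumber(b);
  if (na !== null && nb !== null) return na - nb;
  if (a < b) return -1;
  if (a > b) return 1;
  return 0;
}

// Drop messages whose id has already been seen, then order the rest.
function orderAndDedupe(messages: BufferedMessage[]): BufferedMessage[] {
  const seen = new Set<string>();
  const unique = messages.filter((m) => {
    if (seen.has(m.id)) return false;
    seen.add(m.id);
    return true;
  });
  return unique.sort((x, y) => compareIds(x.id, y.id));
}
```

Numeric comparison is what places an id of "10" after "2"; a purely lexicographic sort would put it first. A custom eventOrderer would take the place of compareIds in this picture.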

In general, when designing your channels and models, aim for a 1:1:1 relationship between channels, models, and your domain model objects. Channels facilitate data partitioning and access control. For instance, in a task management app, each task can be represented by a model with its own channel, ensuring updates are specific to each task. Ably’s capabilities are applied on a per channel basis allowing fine-grained access control for each data model in your application.

For infrequently updated models, grouping them on the same channel may be viable, while frequently changing models benefit from individual channels. Starting with one channel per model and one model per domain object is a good baseline, adjusting as needed.

An example of a website where people can make posts and others can comment on or react to them. On the right is an explanation of where the LiveSync data models are used within the post.

When transmitting data over Ably, it is essential to convey only the changes made, known as “deltas.” This approach ensures easier synchronization across all clients, as each can simply update their existing model with the new state rather than recalculating it. Initially, you may opt to include the entire model state in each message, but as the model grows, this may exceed message size limits. Therefore, using deltas is recommended, enabling frontend applications to react to specific changes efficiently.

For example, in a weather application, updates to temperature might include the new value and its corresponding location:

{"temperature": 25, "city": "London"}
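As a rough illustration, a merge function for a delta like this only needs to update the affected entry. The WeatherState and WeatherDelta types and the mergeWeather function are assumptions for this sketch; the shape of your own model state will differ.

```typescript
// Illustrative types only: model state keyed by city name.
type WeatherState = Record<string, { temperature: number }>;

interface WeatherDelta {
  temperature: number;
  city: string;
}

// Apply a delta by replacing only the affected city's temperature.
// Returns a new object so the function stays pure.
function mergeWeather(state: WeatherState, delta: WeatherDelta): WeatherState {
  return {
    ...state,
    [delta.city]: { ...state[delta.city], temperature: delta.temperature },
  };
}
```

Other cities in the state are carried over untouched, which is what keeps the message small as the model grows.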

Alternatively, you can use a standardized schema such as JSONPatch:

[ { "op": "replace", "path": "/weather/London/temperature", "value": 25 } ]

While JSONPatch requires more effort to calculate delta events, it simplifies the merge function code, as patches can be applied automatically using libraries like fast-json-patch.

An API reference is available that contains a full list of calls, parameters and responses.
