Postgres database connector
Use the Postgres database connector to distribute changes from your Postgres database to end users at scale. It uses the outbox pattern to distribute records to large numbers of subscribing clients, in realtime, as the changes occur.
How it works
The Postgres database connector is enabled through an integration rule. It consumes outbox table records from your Postgres database and publishes them to channels in Ably.
The integration rule exists as a “database connector” component that is entirely provisioned and managed by Ably. The database connector is fault tolerant by default, and integration rules automatically fail over to a new worker node in the case of failure.
When your backend server processes a request to update data in your database, it should also write a corresponding change event to the outbox table within the same transaction as the in-flight request. The Postgres database connector listens for changes in the outbox table and publishes a message to Ably for each record. Messages retain the order in which they were written. The Postgres database connector automatically deletes records from the table once they are successfully published.
By using the outbox table, you can specify which channel each record should be published to, and make changes to your other database tables transactionally with the publish. This mitigates the problem of transactionally writing to two different systems: Ably and the database. You gain exactly-once, in-order delivery over Ably, transactionally with the other data you modify in your database.
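For example, the following sketch writes a change to a hypothetical documents table and the corresponding outbox record in a single transaction (the outbox columns match the schema shown in the Database setup section below):
BEGIN;

-- Apply the change to your own application table (hypothetical)
UPDATE documents SET body = 'Lorem ipsum' WHERE id = 123;

-- Write the corresponding change event to the outbox in the same transaction
INSERT INTO outbox (mutation_id, channel, name, data, headers)
VALUES (
  '680f3f78-21ec-49a0-be99-25f89a84f232', -- correlates with an optimistic update applied on the client
  'documents',                            -- channel to publish the message to
  'edit',                                 -- message event name
  '{"id": 123, "body": "Lorem ipsum"}',   -- message payload
  '{"type": "document"}'                  -- optional headers in the message extras
);

COMMIT;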
The Postgres database connector automatically retries failed publishes while maintaining ordering of messages on each channel.
The database connector can be self-hosted.
Integration rule
Creating the rule
There are two ways to create a Postgres integration rule:
- Using the Ably Dashboard.
- Using the Control API.
To create a rule in your dashboard:
- Log in and select the application you wish to use.
- Click the Integrations tab.
- Click the New Integration Rule button.
- Choose Postgres from the list.
Rule configuration
Use the following fields to configure your Postgres rule:
Option | Description |
---|---|
URL | The URL for your Postgres database, for example postgres://user:password@example.com:5432/your-database-name. The associated user must have the correct privileges. |
Outbox table schema | Schema for the outbox table in your database which allows for the reliable publication of an ordered sequence of change event messages over Ably. |
Outbox table name | Name for the outbox table. |
Nodes table schema | Schema for the nodes table in your database to allow for operation as a cluster to provide fault tolerance. |
Nodes table name | Name for the nodes table. |
SSL mode | Determines the level of protection provided by the SSL connection. Options are: prefer, require, verify-ca, verify-full; the default value is prefer. |
SSL root certificate | Optional. Specifies the SSL certificate authority (CA) certificates. Required if SSL mode is verify-ca or verify-full. |
Primary site | The primary data center in which to run the integration rule. |
You can optionally test that your database connector is correctly configured using the curl requests provided in the integration rule of your application in the Ably dashboard.
Database setup
The Postgres database connector relies on the outbox table, nodes table, and trigger function to exist in your database. Use the following to create them:
CREATE TABLE IF NOT EXISTS nodes (
  id TEXT PRIMARY KEY,
  expiry TIMESTAMP WITHOUT TIME ZONE NOT NULL
);

CREATE TABLE IF NOT EXISTS outbox (
  sequence_id serial PRIMARY KEY,
  mutation_id TEXT NOT NULL,
  channel TEXT NOT NULL,
  name TEXT NOT NULL,
  rejected boolean NOT NULL DEFAULT false,
  data JSONB,
  headers JSONB,
  locked_by TEXT,
  lock_expiry TIMESTAMP WITHOUT TIME ZONE,
  processed BOOLEAN NOT NULL DEFAULT false
);

CREATE OR REPLACE FUNCTION public.outbox_notify()
RETURNS trigger AS $$
BEGIN
  -- notify the database connector that new outbox records are available
  PERFORM pg_notify('ably_adbc'::text, ''::text);
  RETURN NULL;
EXCEPTION
  -- ensure this function can never throw an uncaught exception
  WHEN others THEN
    RAISE WARNING 'unexpected error in outbox_notify: %', SQLERRM;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER public_outbox_trigger
AFTER INSERT ON public.outbox
FOR EACH STATEMENT EXECUTE PROCEDURE public.outbox_notify();
You can modify the table names and schemas to match your database design. Use the table names when configuring the rule.
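For example, a minimal sketch that keeps the connector's tables under a hypothetical ably schema with renamed tables; the outbox table, its notify function, and its trigger follow the same pattern as above:
-- Hypothetical example: a dedicated schema and table names for the connector
CREATE SCHEMA IF NOT EXISTS ably;

CREATE TABLE IF NOT EXISTS ably.connector_nodes (
  id TEXT PRIMARY KEY,
  expiry TIMESTAMP WITHOUT TIME ZONE NOT NULL
);

-- Create ably.connector_outbox, its notify function, and its trigger in the same
-- way as the statements above, then enter these schema and table names when
-- configuring the integration rule.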
Tables
The Postgres database connector makes use of two tables in your database that you will need to create: an outbox table and a nodes table.
Outbox table
The Postgres database connector automatically listens for new records written to the outbox table and publishes them as messages over channels.
The following describes the key columns in the outbox table:
- channel: the name of the channel the outbox record will be published to.
- name: the name given to the message.
- data: the data carried in the message.
- mutation_id: the identifier for the mutation. Required for use with frontend data models and optimistic updates.
The following table describes the full schema of the outbox table in your database:
Field | Type | Required on write? | Example Value | Description |
---|---|---|---|---|
sequence_id | Serial integer (primary key) | No (automatically assigned) | 1 | Monotonically increasing identifier that determines publish order within the scope of the channel. |
mutation_id | String | Yes | 680f3f78-21ec-49a0-be99-25f89a84f232 | The ID of the mutation, used for correlating the outbox event with an optimistic event applied locally on the client. |
channel | String | Yes | documents | The channel name on which to publish this change event. |
name | String | Yes | edit | The message event name to use when publishing the message. |
rejected | Boolean | No (defaults to FALSE) | FALSE | True if the event rejects a client side change, false to confirm the change. Defaults to false (confirming the change). |
data | JSON | Yes | { "read": false, "data": { "timestamp": 1674744488658, "body": "Lorem ipsum" } } | The message payload to use when publishing the message. |
headers | JSON | No (optional) | { "id": 123, "type": "document", "author": "socrates", "pages": [1, 5, 7] } | A set of message attributes, provided under the headers key in the message extras. These are optional properties that may contain metadata and/or ancillary payloads. |
locked_by | String | No (implementation detail) | 0d6c0277-e88a-4dba-a854-e80a4bd75317 | The ID of the node that has locked this record. This is an implementation detail of how the Database Connector processes records and you should not set a value for this column when inserting an outbox record. |
lock_expiry | Timestamp | No (implementation detail) | 2023-06-12 16:24:27 | The timestamp at which the lock will expire. This is an implementation detail of how the Database Connector processes records and you should not set a value for this column when inserting an outbox record. |
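processed | Boolean | No (implementation detail; defaults to FALSE) | FALSE | Marks whether the record has been processed by the database connector. Like locked_by and lock_expiry, this is an implementation detail and you should not set a value for this column when inserting an outbox record. |

As an illustration, a sketch of a rejection event written by your backend when a client-side change should not be confirmed (the mutation_id and payload shown are hypothetical):
-- Reject, rather than confirm, an optimistic client-side change
INSERT INTO outbox (mutation_id, channel, name, rejected, data)
VALUES (
  '680f3f78-21ec-49a0-be99-25f89a84f232', -- the mutation being rejected
  'documents',
  'edit',
  TRUE,
  '{"reason": "edit conflicts with a newer version"}'
);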
Change detection
The Postgres database connector uses a poll-on-change strategy to query for new records to process when they are inserted into the outbox. This is achieved through the use of a trigger configured on the outbox table. The trigger invokes a function which uses NOTIFY to broadcast a notification to the Postgres database connector, which it receives using LISTEN.
Internally, the Database Connector debounces notifications within a time window. This approach avoids imposing additional load on the database due to polling when there are no new records to process.
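You can observe this notification mechanism from a psql session, assuming the trigger from the Database setup section is installed (the channel name ably_adbc comes from the outbox_notify function):
-- Subscribe to the notification channel used by the outbox trigger
LISTEN ably_adbc;

-- Inserting an outbox record (for example, from another session) causes psql to
-- report an asynchronous notification on the ably_adbc channel after commit
INSERT INTO outbox (mutation_id, channel, name, data)
VALUES ('test-mutation', 'documents', 'edit', '{"test": true}');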
Nodes table
The Postgres database connector operates as a cluster to provide fault tolerance. Work is automatically shared across available nodes.
To partition outbox records and process them across available nodes, each node must know about all other available nodes in the cluster. These nodes discover each other through the nodes database table.
The nodes table must exist in your database, but is purely for the operation of the connector. You do not need to interact with this table directly.
The following table describes the schema of the nodes table in your database:
Field | Type | Description |
---|---|---|
id | Text (primary key) | UUID for the node in the table. |
expiry | Timestamp | Timestamp after which the node is considered no longer active. |
Node discovery
Nodes discover one another using a nodes table that contains a row for each node in the cluster.
When a node starts up, it generates a unique ID and adds itself to the nodes table. When the node shuts down, it removes itself from the table.
Each node in the table includes an expiry timestamp, set to a time in the future. Periodically, each node sends a heartbeat to the database by updating its expiry timestamp in the nodes table. If a node cannot communicate with the database, its entry will eventually expire. The Postgres database connector automatically removes expired nodes from the table.
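For instance, you can inspect the cluster membership directly, assuming the default nodes table name:
-- List the nodes currently registered in the cluster; rows whose expiry is in
-- the past are pending removal by the connector
SELECT id, expiry FROM nodes;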
Privileges
The Postgres database connector only needs to interact with the outbox and nodes tables and does not rely on any other state in the database. This simplifies the security model as the connector can be given a reduced set of permissions.
The Postgres user used by the connector requires SELECT, UPDATE, INSERT, and DELETE privileges on the outbox and nodes tables, as well as usage of the sequence backing the outbox table's sequence_id column.
The following is an example of creating a role with the necessary privileges against each table:
-- Create a dedicated login role for the database connector
CREATE ROLE your_user LOGIN PASSWORD 'your_password' VALID UNTIL 'infinity';
GRANT CONNECT ON DATABASE your_database TO your_user;
GRANT SELECT, UPDATE, INSERT, DELETE ON public.outbox TO your_user;
GRANT SELECT, UPDATE, INSERT, DELETE ON public.nodes TO your_user;
-- Allow the connector to read the sequence backing outbox.sequence_id
GRANT USAGE, SELECT ON outbox_sequence_id_seq TO your_user;
Frontend data models
The Postgres database connector and outbox table are designed to be easy to use directly, but also to work with the Models SDK. The SDK provides support for optimistic updates, and makes it easier to load the initial state into your application before streaming updates directly from your database.
Self hosted
The Postgres database connector can also be self-hosted. See the project README.