Replicate data to Materialize
Learn how to replicate data from Neon to Materialize
Neon's logical replication feature allows you to replicate data from your Neon Postgres database to external destinations.
Materialize is a data warehouse for operational workloads, purpose-built for low-latency applications. You can use it to process data at speeds and scales not possible in traditional databases, but without the cost, complexity, or development time of most streaming engines.
In this guide, you will learn how to stream data from your Neon Postgres database to Materialize using the Materialize PostgreSQL source.
Prerequisites
- A Materialize account.
- A Neon account.
- Optionally, you can install the psql command line utility for running commands in both Neon and Materialize. Alternatively, you can run commands from the Neon SQL Editor and Materialize SQL Shell, which require no installation or setup.
- Read the important notices about logical replication in Neon before you begin.
Enable logical replication
Important
Enabling logical replication modifies the PostgreSQL wal_level configuration parameter, changing it from replica to logical for all databases in your Neon project. Once the wal_level setting is changed to logical, it cannot be reverted. Enabling logical replication also restarts all computes in your Neon project, meaning that active connections will be dropped and have to reconnect.
To enable logical replication in Neon:
- Select your project in the Neon Console.
- On the Neon Dashboard, select Settings.
- Select Logical Replication.
- Click Enable to enable logical replication.
You can verify that logical replication is enabled by running the following query:
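One way to run this check, given that enabling logical replication sets wal_level to logical, is to read the parameter back from the Neon SQL Editor or a psql client. This is a minimal sketch using the standard Postgres SHOW command:

```sql
-- Read back the current WAL level for this Neon compute.
SHOW wal_level;
```

If logical replication is enabled, the value returned should be logical.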
Create a publication
After logical replication is enabled in Neon, the next step is to create a publication for the tables that you want to replicate to Materialize.
- From a psql client connected to your Neon database or from the Neon SQL Editor, set the replica identity to FULL for each table that you want to replicate to Materialize. REPLICA IDENTITY FULL ensures that the replication stream includes the previous data of changed rows, in the case of UPDATE and DELETE operations. This setting allows Materialize to ingest Postgres data with minimal in-memory state.
- Create a publication named mz_source for the specific tables you want to replicate. The mz_source publication will contain the set of change events generated from the specified tables and will later be used to ingest the replication stream. Be sure to include only the tables you need. If the publication includes additional tables, Materialize wastes resources on ingesting and then immediately discarding the data from those tables.
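The two steps above can be sketched as follows. The table names orders and customers are hypothetical placeholders, not part of this guide; only the publication name mz_source comes from the text.

```sql
-- Hypothetical table names: substitute the tables you want to replicate.
ALTER TABLE orders REPLICA IDENTITY FULL;
ALTER TABLE customers REPLICA IDENTITY FULL;

-- Publish only those tables under the publication name used in this guide.
CREATE PUBLICATION mz_source FOR TABLE orders, customers;

-- Optional check: list the tables the publication currently contains.
SELECT * FROM pg_publication_tables WHERE pubname = 'mz_source';
```

Listing the tables explicitly, rather than publishing every table, keeps Materialize from ingesting data it would only discard.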
Create a Postgres role for replication
It is recommended that you create a dedicated Postgres role for replicating data. The role must have the REPLICATION privilege. The default Postgres role created with your Neon project and roles created using the Neon CLI, Console, or API are granted membership in the neon_superuser role, which has the required REPLICATION privilege.
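To confirm that a role can be used for replication, you can inspect the standard pg_roles catalog view. This is a sketch that assumes the role is named replication_user, the example name used later in this guide; substitute your own role name.

```sql
-- rolreplication is true when the role carries the REPLICATION attribute.
SELECT rolname, rolreplication
FROM pg_roles
WHERE rolname = 'replication_user';
```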
Grant schema access to your Postgres role
If your replication role does not own the schemas and tables you are replicating from, make sure to grant access. For example, the following commands grant access to all tables in the public schema to Postgres role replication_user:
GRANT USAGE ON SCHEMA public TO replication_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replication_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO replication_user;
Granting SELECT ON ALL TABLES IN SCHEMA instead of naming the specific tables avoids having to add privileges later if you add tables to your publication.
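To check that the grants took effect, one option is the built-in has_table_privilege function. The sketch below assumes the replication_user role and the public schema from the example above:

```sql
-- For every table in the public schema, report whether replication_user can SELECT from it.
SELECT
  tablename,
  has_table_privilege('replication_user', format('%I.%I', schemaname, tablename), 'SELECT') AS can_select
FROM pg_tables
WHERE schemaname = 'public'
ORDER BY tablename;
```

Any row where can_select is false points at a table that still needs a grant before it can be replicated.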
Allow inbound traffic
If you use Neon's IP Allow feature to limit the IP addresses that can connect to Neon, you will need to allow inbound traffic from Materialize IP addresses. If you are not currently limiting IP address access in Neon, you can skip this step.
- From a psql client connected to Materialize or from the Materialize SQL Shell, run this command to find the static egress IP addresses for the Materialize region you are running in:
    SELECT * FROM mz_egress_ips;
- In your Neon project, add the IPs to your IP Allow list, which you can find in your project's settings. For instructions, see Configure IP Allow.
Create an ingestion cluster
In Materialize, a cluster is an isolated environment, similar to a virtual warehouse in Snowflake. When you create a cluster, you choose the size of its compute resource allocation based on the work you need the cluster to do, whether ingesting data from a source, computing always-up-to-date query results, serving results to clients, or a combination.
In this case, you’ll create one new cluster containing one medium replica for ingesting source data from your Neon Postgres database.
From a psql client connected to Materialize or from the Materialize SQL Shell, run the CREATE CLUSTER command to create the new cluster:
CREATE CLUSTER ingest_postgres SIZE = 'medium';
Materialize recommends starting with a medium size replica or larger. This helps Materialize quickly process the initial snapshot of the tables in your publication. Once the snapshot is finished, you can right-size the cluster.
Start ingesting data
Now that you’ve configured your database network and created an ingestion cluster, you can connect Materialize to your Neon Postgres database and start ingesting data.
- From a psql client connected to Materialize or from the Materialize SQL Shell, use the CREATE SECRET command to securely store the password for the Postgres role you created earlier:
    CREATE SECRET pgpass AS '<PASSWORD>';
  You can access the password for your Neon Postgres role from the Connection Details widget on the Neon Dashboard.
- Use the CREATE CONNECTION command to create a connection object with access and authentication details for Materialize to use:
    CREATE CONNECTION pg_connection TO POSTGRES (
      HOST '<host>',
      PORT 5432,
      USER '<role_name>',
      PASSWORD SECRET pgpass,
      SSL MODE 'require',
      DATABASE '<database>'
    );
  You can find the connection details for your replication role in the Connection Details widget on the Neon Dashboard. A Neon connection string looks like this:
    postgresql://alex:AbC123dEf@ep-cool-darkness-123456.us-east-2.aws.neon.tech/dbname?sslmode=require
  - Replace <host> with your Neon hostname (e.g., ep-cool-darkness-123456.us-east-2.aws.neon.tech)
  - Replace <role_name> with the name of your Postgres role (e.g., alex)
  - Replace <database> with the name of the database containing the tables you want to replicate to Materialize (e.g., dbname)
- Use the CREATE SOURCE command to connect Materialize to your Neon Postgres database and start ingesting data from the publication you created earlier:
    CREATE SOURCE mz_source
      IN CLUSTER ingest_postgres
      FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
      FOR ALL TABLES;
Tips
- To ingest data from specific schemas or tables in your publication, you can use FOR SCHEMAS (<schema1>,<schema2>) or FOR TABLES (<table1>, <table2>) instead of FOR ALL TABLES.
- After creating a source, you can incorporate upstream schema changes for specific replicated tables using the ALTER SOURCE...{ADD | DROP} SUBSOURCE syntax.
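As an illustration of the first tip, a source restricted to specific tables might look like the sketch below. The table names orders and customers are hypothetical; the cluster, connection, and publication names are the ones used in this guide.

```sql
-- Ingest only two (hypothetical) tables from the mz_source publication.
CREATE SOURCE mz_source
  IN CLUSTER ingest_postgres
  FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
  FOR TABLES (orders, customers);
```

This statement is an alternative to the FOR ALL TABLES form, not an addition to it, since both create a source named mz_source.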
Check the ingestion status
Before Materialize starts consuming a replication stream, it takes a snapshot of the tables in your publication. Until this snapshot is complete, Materialize won’t have the same view of your data as your Postgres database.
In this step, you’ll verify that the source is running and then check the status of the snapshotting process.
- From a psql client connected to Materialize or from the Materialize SQL Shell, use the mz_source_statuses table to check the overall status of your source:
    WITH source_ids AS (SELECT id FROM mz_sources WHERE name = 'mz_source')
    SELECT *
    FROM mz_internal.mz_source_statuses
    JOIN (
      SELECT referenced_object_id
      FROM mz_internal.mz_object_dependencies
      WHERE object_id IN (SELECT id FROM source_ids)
      UNION
      SELECT id FROM source_ids
    ) AS sources
    ON mz_source_statuses.id = sources.referenced_object_id;
  For each subsource, make sure the status is running. If you see stalled or failed, there’s likely a configuration issue for you to fix. Check the error field for details and fix the issue before moving on. If the status of any subsource is starting for more than a few minutes, contact Materialize support.
- Once the source is running, use the mz_source_statistics table to check the status of the initial snapshot:
    WITH source_ids AS (SELECT id FROM mz_sources WHERE name = 'mz_source')
    SELECT sources.object_id, bool_and(snapshot_committed) AS snapshot_committed
    FROM mz_internal.mz_source_statistics
    JOIN (
      SELECT object_id, referenced_object_id
      FROM mz_internal.mz_object_dependencies
      WHERE object_id IN (SELECT id FROM source_ids)
      UNION
      SELECT id, id FROM source_ids
    ) AS sources
    ON mz_source_statistics.id = sources.referenced_object_id
    GROUP BY sources.object_id;

     object_id | snapshot_committed
    -----------|--------------------
     u144      | t
    (1 row)
  Once snapshot_committed is t, move on to the next step. Snapshotting can take from a few minutes to several hours, depending on the size of your dataset and the size of the cluster replica you chose for your ingest_postgres cluster.
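If you only want to see sources and subsources that need attention, a narrower query against the same mz_internal.mz_source_statuses table can help. This sketch assumes the name, type, status, and error columns and reports on every source in the environment, not just mz_source:

```sql
-- List any source or subsource that is not currently in the running state.
SELECT name, type, status, error
FROM mz_internal.mz_source_statuses
WHERE status <> 'running';
```

An empty result suggests that everything is running; otherwise the error column is the first place to look.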
Right-size the cluster
After the snapshotting phase, Materialize starts ingesting change events from the Postgres replication stream. For this work, Materialize generally performs well with an xsmall replica, so you can resize the cluster accordingly.
- From a psql client connected to Materialize or from the Materialize SQL Shell, use the ALTER CLUSTER command to downsize the cluster to xsmall:
    ALTER CLUSTER ingest_postgres SET (SIZE 'xsmall');
  Behind the scenes, this command adds a new xsmall replica and removes the medium replica.
- Use the SHOW CLUSTER REPLICAS command to check the status of the new replica:
    SHOW CLUSTER REPLICAS WHERE cluster = 'ingest_postgres';

         cluster     | replica |  size  | ready
    -----------------+---------+--------+-------
     ingest_postgres | r1      | xsmall | t
    (1 row)
- Going forward, you can verify that your new replica size is sufficient as follows:
  a. From a psql client connected to Materialize or from the Materialize SQL Shell, get the replication slot name associated with your Postgres source from the mz_internal.mz_postgres_sources table:
      SELECT
        d.name AS database_name,
        n.name AS schema_name,
        s.name AS source_name,
        pgs.replication_slot
      FROM mz_sources AS s
      JOIN mz_internal.mz_postgres_sources AS pgs ON s.id = pgs.id
      JOIN mz_schemas AS n ON n.id = s.schema_id
      JOIN mz_databases AS d ON d.id = n.database_id;
  b. From a psql client connected to your Neon database or from the Neon SQL Editor, check the replication slot lag, using the replication slot name from the previous step:
      SELECT pg_size_pretty(pg_current_wal_lsn() - confirmed_flush_lsn) AS replication_lag_bytes
      FROM pg_replication_slots
      WHERE slot_name = '<slot_name>';
  The result of this query is the amount of data your Postgres cluster must retain in its replication log because of this replication slot. Typically, this means Materialize has not yet communicated back to your Neon Postgres database that it has committed this data. A high value can indicate that the source has fallen behind and that you might need to scale up your ingestion cluster.
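If you would rather not look up the slot name first, a variation on the lag check is to list every logical replication slot on the Neon side with its lag. This sketch uses the standard pg_wal_lsn_diff function and is run against Neon, not Materialize:

```sql
-- Show all logical replication slots, largest retained WAL first.
SELECT
  slot_name,
  active,
  pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS replication_lag
FROM pg_replication_slots
WHERE slot_type = 'logical'
ORDER BY pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) DESC;
```

A slot whose lag keeps growing over repeated runs is a stronger signal than a single large reading.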
Next steps
With Materialize ingesting your Postgres data into durable storage, you can start exploring the data, computing real-time results that stay up-to-date as new data arrives, and serving results efficiently.
- Explore your data with SHOW SOURCES and SELECT.
- Compute real-time results in memory with CREATE VIEW and CREATE INDEX or in durable storage with CREATE MATERIALIZED VIEW.
- Serve results to a Postgres-compatible SQL client or driver with SELECT or SUBSCRIBE or to an external message broker with CREATE SINK.
- Check out the tools and integrations supported by Materialize.
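As a rough illustration of the first two steps, the following sketch explores a replicated table and keeps an aggregate up to date in memory. The table orders and its customer_id column are hypothetical, as are the view and index names:

```sql
-- List the source and its subsources, then peek at a replicated table.
SHOW SOURCES;
SELECT * FROM orders LIMIT 10;

-- Define a view and index it so results are maintained incrementally in memory.
CREATE VIEW order_counts AS
  SELECT customer_id, count(*) AS order_count
  FROM orders
  GROUP BY customer_id;
CREATE INDEX order_counts_idx ON order_counts (customer_id);

-- Query the up-to-date results.
SELECT * FROM order_counts;
```

Run these in a cluster sized for querying rather than in ingest_postgres if you want to keep ingestion isolated from other work.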
Need help?
Join our Discord Server to ask questions or see what others are doing with Neon. Users on paid plans can open a support ticket from the console. For more detail, see Getting Support.