
# Replicate CDC Data to Multiple Targets in Snowflake

{% hint style="success" %}
This guide uses **PostgreSQL** as the example source for the CDC data. However, the principles are the same for **Microsoft SQL Server**, **MongoDB**, and **MySQL**.
{% endhint %}

## One script, multiple targets

Replication jobs enable you to copy data from your source database and replicate it to multiple targets, so you don't need to write a separate job for each schema you want to write your data to.

These special jobs use **replication groups**, so you can define the source and ingest the data once, and then create a replication group with its own requirements for each target. Although the groups share the same publication, each one operates independently: when you define a replication job that includes multiple targets, Upsolver automatically creates a sub-job for each target.

In this guide, you will learn how to create a replication job that writes to two different schemas in Snowflake: one used for production reporting, and the other for development. Both groups share the source data, but write to their targets at different intervals and perform different manipulations on the data.

## Monitor your jobs

You will also learn how to use the system tables to monitor your new jobs and ensure that data is streaming as expected. System tables are an important part of job creation and administration: they expose metadata that enables you to monitor performance and handle issues.

## Prerequisites

### Create a publication

Upsolver requires a publication of the replicated tables so that it can subscribe to change events. The publication can be on all tables or on specific tables. A superuser can create a publication of all tables using the following command:

```sql
CREATE PUBLICATION sales_publication FOR ALL TABLES;
```

Creating a publication on specific tables will not automatically include newly created tables. For instructions on how to create and manage publications for specific tables, see the [PostgreSQL documentation](https://www.postgresql.org/docs/current/sql-alterpublication.html).
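If you prefer a publication on specific tables instead, the statements below are a minimal sketch using standard PostgreSQL syntax. The table names **orders.order\_items**, **web\_orders.web\_order\_items**, and **orders.order\_returns** are hypothetical placeholders; substitute the tables you want to replicate. The final query uses the `pg_publication_tables` catalog view to list the tables the publication currently covers.

```sql
-- Publication limited to specific tables
-- (orders.order_items and web_orders.web_order_items are hypothetical names)
CREATE PUBLICATION sales_publication
  FOR TABLE orders.order_items, web_orders.web_order_items;

-- Tables created later are not picked up automatically and must be added
-- (orders.order_returns is a hypothetical name)
ALTER PUBLICATION sales_publication ADD TABLE orders.order_returns;

-- List the tables currently included in the publication
SELECT schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'sales_publication';
```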

### Create a Heartbeat table

Follow these steps to create a heartbeat table:

1. Create a heartbeat table:

```sql
CREATE TABLE IF NOT EXISTS <schema_name>.<table_name>
  (key int primary key, value timestamp);
```

2\. Grant the necessary permissions for Upsolver to use the heartbeat table:

```sql
GRANT INSERT, UPDATE, SELECT ON TABLE <schema_name>.<table_name> TO <user>;
```

3\. If the publication used in the data source was created for specific tables (rather than all tables), add the heartbeat table to the publication. See the [PostgreSQL documentation](https://www.postgresql.org/docs/current/sql-alterpublication.html).

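4\. Add the heartbeat table to the data source in the data source creation wizard or in the data source properties. When you create the job in SQL, as in this guide, specify the heartbeat table with the `HEARTBEAT_TABLE` source option.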
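For the job in Step 3, which uses a heartbeat table named **sales.heartbeat**, the steps above might look like the following sketch. The role name **upsolver\_user** is a hypothetical placeholder for the user Upsolver connects with, and the `ALTER PUBLICATION` statement is needed only when **sales\_publication** was created for specific tables.

```sql
-- 1. Create the heartbeat table referenced by HEARTBEAT_TABLE in Step 3
CREATE TABLE IF NOT EXISTS sales.heartbeat
  (key int PRIMARY KEY, value timestamp);

-- 2. Allow the Upsolver user to write to it (upsolver_user is hypothetical)
GRANT INSERT, UPDATE, SELECT ON TABLE sales.heartbeat TO upsolver_user;

-- 3. Only if the publication covers specific tables rather than all tables
ALTER PUBLICATION sales_publication ADD TABLE sales.heartbeat;
```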

{% hint style="info" %}
Follow the [Enable CDC](/content/how-to-guides-1/connectors/enable-cdc.md) guide for configuring your database for replication.
{% endhint %}
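Before connecting Upsolver, you can run a quick sanity check in PostgreSQL. Logical replication, which CDC relies on, requires the `wal_level` setting to be `logical`; on Amazon RDS this is typically enabled through the `rds.logical_replication` parameter. The second query assumes the publication is named **sales\_publication**, as in the job in Step 3.

```sql
-- Confirm that logical decoding is enabled; the expected result is 'logical'
SHOW wal_level;

-- Confirm that the publication used by the replication job exists
-- (puballtables is true for a FOR ALL TABLES publication)
SELECT pubname, puballtables
FROM pg_publication
WHERE pubname = 'sales_publication';
```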

The steps for creating the job for your replication and performing ongoing monitoring are as follows:

1. Connect to PostgreSQL
2. Connect to Snowflake
3. Create a replication job to ingest data to multiple targets
4. View the snapshot status
5. Monitor your jobs using system tables

## Step 1

### Connect to PostgreSQL

Once you have created your publication in PostgreSQL, the first step is to connect to the database from which you want to ingest your CDC data. You will need the connection string for your PostgreSQL database, along with your username and password.

**Here's the code to connect to PostgreSQL:**

```sql
-- Connect to your source PostgreSQL database
CREATE POSTGRES CONNECTION my_postgres_connection
    CONNECTION_STRING = 
       'jdbc:postgresql://cdc2.c02eyuedjkh9.eu-west-1.rds.amazonaws.com:5432/Sales'
    USER_NAME = 'your username'
    PASSWORD = 'your password';
```

## Step 2

### Connect to Snowflake

Next, create a connection to your target database. You will need your Snowflake connection string, and your username and password.

**Here's the code to connect to Snowflake:**

```sql
-- Create a connection to your target Snowflake database
CREATE SNOWFLAKE CONNECTION my_snowflake_connection
    CONNECTION_STRING = 
       'jdbc:snowflake://snowflakedemo.us-east-1.snowflakecomputing.com?db=REPORTING_DB&warehouse=REPORTING_WH&role=ADMIN'
    USER_NAME = 'your username'
    PASSWORD = 'your password';
```

## Step 3

### Create a replication job to ingest data to multiple targets

Ensure that the target schemas you will use in your replication job exist in your Snowflake database. This example uses the **ORDERS\_PROD** and **ORDERS\_DEV** schemas as targets.
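If the schemas don't exist yet, you can create them in Snowflake with standard DDL. This sketch assumes the role in your connection string (**ADMIN** in this example) has privileges to create schemas in **REPORTING\_DB**.

```sql
-- Run in Snowflake before creating the replication job
CREATE SCHEMA IF NOT EXISTS REPORTING_DB.ORDERS_PROD;
CREATE SCHEMA IF NOT EXISTS REPORTING_DB.ORDERS_DEV;

-- Confirm that both schemas are present
SHOW SCHEMAS IN DATABASE REPORTING_DB;
```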

The next step is to create a replication job that copies the CDC data from the **Sales** database in PostgreSQL to the **ORDERS\_PROD** and **ORDERS\_DEV** schemas in the **REPORTING\_DB** database in Snowflake.

**Here's the code to create the replication job:**

```sql
CREATE REPLICATION JOB postgres_replication_to_snowflake
  COMMENT = 'Replicate Postgres CDC data to Snowflake groups'
  COMPUTE_CLUSTER = "Default Compute (Free)"
  INTERMEDIATE_STORAGE_CONNECTION = s3_connection
  INTERMEDIATE_STORAGE_LOCATION = 's3://upsolver-integration-tests/test/'   
FROM my_postgres_connection 
  PUBLICATION_NAME = 'sales_publication' 
  HEARTBEAT_TABLE = 'sales.heartbeat'
WITH REPLICATION GROUP replicate_to_snowflake_prod 
  -- include all tables in the orders and web_orders schemas
  INCLUDED_TABLES_REGEX = ('(orders|web_orders)\..*') 
  EXCLUDED_COLUMNS_REGEX = ('.*\.address') -- exclude address columns 
  LOGICAL_DELETE_COLUMN = "is_deleted"
  REPLICATION_TARGET = my_snowflake_connection
  TARGET_SCHEMA_NAME_EXPRESSION = 'ORDERS_PROD' 
  TARGET_TABLE_NAME_EXPRESSION = $table_name  
  WRITE_INTERVAL = 5 MINUTES
  WRITE_MODE = MERGE 
WITH REPLICATION GROUP replicate_to_snowflake_dev 
  -- include all tables in the orders and web_orders schemas
  INCLUDED_TABLES_REGEX = ('(orders|web_orders)\..*')  
  REPLICATION_TARGET = my_snowflake_connection
  TARGET_SCHEMA_NAME_EXPRESSION = 'ORDERS_DEV' 
  TARGET_TABLE_NAME_EXPRESSION = 'dev_' || $table_name    
  WRITE_INTERVAL = 4 HOURS
  WRITE_MODE = APPEND;
```

Let's understand what this job does.

If you have previously created ingestion and transformation jobs in Upsolver, you may notice that this code uses the **REPLICATION** job type: this keyword instructs Upsolver to create a sub-job for each replication group.

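In this example, we create a replication job named **postgres\_replication\_to\_snowflake**, with an optional `COMMENT` describing what the job does. The `COMPUTE_CLUSTER` option specifies which cluster runs the job when you have more than one; otherwise, the default cluster is used. The `INTERMEDIATE_STORAGE_CONNECTION` and `INTERMEDIATE_STORAGE_LOCATION` options specify the Amazon S3 connection and location where Upsolver stages the data ingested from the source; this example assumes you have already created an S3 connection named **s3\_connection**.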

The job includes two source options. The `PUBLICATION_NAME` option defines the publication to use, which must already exist in your PostgreSQL database; this example uses a publication named **sales\_publication**. The `HEARTBEAT_TABLE` option specifies the heartbeat table to use, in this case **sales.heartbeat**. These source options are shared by both replication groups.

Upsolver creates a sub-job for each replication group, so each group can have a different job configuration. The **ORDERS\_PROD** schema requires fresh data, so its `WRITE_INTERVAL` is set to **5 MINUTES**, whereas the **ORDERS\_DEV** schema is less critical, so its data is updated every **4 HOURS**.

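The production group uses `WRITE_MODE = MERGE`. By default, when a row is deleted in the source, Upsolver permanently deletes it from the target. The `LOGICAL_DELETE_COLUMN` option changes this behavior: Upsolver adds the **is\_deleted** column and uses it to mark deleted rows, so they are logically rather than physically deleted. The development group uses `WRITE_MODE = APPEND`.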

Both groups replicate all tables in the **orders** and **web\_orders** schemas, as specified in the `INCLUDED_TABLES_REGEX` option. However, all **address** columns are excluded from the production target in the `EXCLUDED_COLUMNS_REGEX` option to remove personally identifiable information (PII).
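To preview which source tables and columns these patterns select, you can run equivalent regular expressions against PostgreSQL's `information_schema`. This is a sketch that assumes Upsolver matches `INCLUDED_TABLES_REGEX` against the full `schema.table` name and `EXCLUDED_COLUMNS_REGEX` against the full qualified column name, as the patterns' format suggests. The `^` and `$` anchors are added because PostgreSQL's `~` operator matches anywhere in the string.

```sql
-- Tables that INCLUDED_TABLES_REGEX = ('(orders|web_orders)\..*') would select,
-- assuming the pattern must match the whole schema.table name
SELECT table_schema, table_name
FROM information_schema.tables
WHERE table_type = 'BASE TABLE'
  AND (table_schema || '.' || table_name) ~ '^(orders|web_orders)\..*$';

-- Columns that EXCLUDED_COLUMNS_REGEX = ('.*\.address') would remove
-- from the production target, under the same full-match assumption
SELECT table_schema, table_name, column_name
FROM information_schema.columns
WHERE (table_schema || '.' || table_name) ~ '^(orders|web_orders)\..*$'
  AND column_name = 'address';
```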

In the development group, the `TARGET_TABLE_NAME_EXPRESSION` option adds the prefix **dev\_** to each table name. This means that Upsolver creates each target table using the name of the source table with this prefix, making it clear that the target tables in this group are used for development.

If you wanted to extend this code, you could add a third replication group, for example for a UAT schema.
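A third group might look like the following fragment, which reuses only options that appear in the job above. The group name **replicate\_to\_snowflake\_uat**, the **ORDERS\_UAT** schema (which must already exist), the **uat\_** prefix, and the 30-minute interval are illustrative assumptions. Add the clause after the last replication group in the `CREATE REPLICATION JOB` statement, and move the statement's closing semicolon to the end of the new group.

```sql
-- Hypothetical third replication group for a UAT schema
WITH REPLICATION GROUP replicate_to_snowflake_uat
  -- same tables as the other groups
  INCLUDED_TABLES_REGEX = ('(orders|web_orders)\..*')
  -- remove address columns (PII), as in the production group
  EXCLUDED_COLUMNS_REGEX = ('.*\.address')
  REPLICATION_TARGET = my_snowflake_connection
  TARGET_SCHEMA_NAME_EXPRESSION = 'ORDERS_UAT'
  TARGET_TABLE_NAME_EXPRESSION = 'uat_' || $table_name
  WRITE_INTERVAL = 30 MINUTES
  WRITE_MODE = MERGE;
```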

## Step 4

### View the snapshot status

When you create your job, Upsolver immediately begins snapshotting the tables in your source database. You can monitor the snapshotting progress and observe the number of rows ingested as follows:

1. After executing the `CREATE REPLICATION JOB` statement, click **Jobs** in the main menu in Upsolver, where you will see your new job listed. If you have many existing jobs, click **Filters** to search for your job, or click **Date Created** to sort by most recent. In this example, Upsolver creates the job **postgres\_replication\_to\_snowflake**. This is the master job for the replication, and the **From**-**To** icon confirms that data streams from PostgreSQL to Snowflake.
2. Click the job name to open the **Jobs** page. Notice that Upsolver has created an output sub-job for each replication group in the job, named in the format **\<job name> - \<replication group name> output job**. There is also a **copy job** that performs the replication:
   * **postgres\_replication\_to\_snowflake copy job**
   * **postgres\_replication\_to\_snowflake - replicate\_to\_snowflake\_prod output job**
   * **postgres\_replication\_to\_snowflake - replicate\_to\_snowflake\_dev output job**
3. Click the **copy job** to open the job status page. This job ingests the CDC data from PostgreSQL into a staging table in the Upsolver lake. Here you can monitor progress as Upsolver takes a snapshot of each table included in the job. The status of each table is shown as **Pending**, **Snapshotting**, or **Streaming**. When you first open this page, it may take a few minutes to update while Upsolver collects the monitoring information.
4. Below the tables are the job metrics, shown in the **Summary** view by default. This view displays **Rows Pending Processing** and **Rows written (completed executions)** to provide insight into the data ingestion process. To see all metrics, use the display toggle to switch from **Summary** to **All**. Click a metric to expand its information. You can then copy the code from the **See All Events** section into a worksheet and run it at any time without opening the job page.
5. Click **Job Details** to open the **Information** screen, which displays the **SQL** syntax that created the job, the job **Options** and **Details**, and the **Snapshot Status** for each CDC table.
6. Click the close icon to return to the job status page, then click **Back** to return to the **Jobs** page.
7. You can then click into each of the sub-jobs and explore their job metrics.

## Step 5

### Monitor your jobs using system tables

The system tables provide information about your jobs so that you can monitor progress and diagnose issues. To view information about your replication job, query the **jobs** system table.

**Here's the code:**

```sql
SELECT * 
FROM system.information_schema.jobs
WHERE name LIKE 'postgres_replication_to_snowflake%';
```

This returns information about the master job, the copy job, and the sub-job created for each replication group. The **job\_type** column indicates the role of each job:

* **replication**: the master job
* **copy**: the copy job that performs the CDC replication into the staging table
* **merge**: the `WRITE_MODE` option for the replication group is **MERGE**
* **insert**: the `WRITE_MODE` option for the replication group is **APPEND**
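To focus on just the job names and their roles, you can select only the **name** and **job\_type** columns instead of all columns. This sketch uses the same filter as the query above.

```sql
-- One row for the master job, the copy job, and each output sub-job
SELECT name, job_type
FROM system.information_schema.jobs
WHERE name LIKE 'postgres_replication_to_snowflake%';
```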

The **cdc\_status** system table lists each table in your replication copy job along with its status, e.g. **Streaming**. If you included a heartbeat table in your job, it is also listed.

**Here's the code to view the CDC status of each table:**

```sql
SELECT * 
FROM system.monitoring.cdc_status 
WHERE job_name LIKE 'postgres_replication_to_snowflake%';
```

***

## Conclusion

In this guide you learned how to replicate your change data capture tables from PostgreSQL into multiple target schemas in your Snowflake database. You saw how easy it is to create a single job with multiple replication groups that support different options in the target schema. Then you learned how to view the snapshotting process and ensure your CDC data was replicating as expected, and you queried the system tables to perform ongoing job monitoring.

## Try it yourself

To ingest your CDC data from PostgreSQL into multiple targets in Snowflake:

1. Connect to your CDC-enabled PostgreSQL database
2. Connect to your target Snowflake database
3. Create a replication job with a replication group for each target schema
4. Monitor the snapshotting process and observe data ingestion
5. Monitor your jobs using the job metrics and system tables

***

{% hint style="success" %}
**Learn More**

For more information and examples for your database platform, please refer to the Replication job command reference.
{% endhint %}


