
# Replication

Replication jobs copy change data capture (CDC) data from CDC-enabled databases into one or more target schemas in the following supported targets:

* Amazon Redshift
* AWS Glue Data Catalog
* Polaris Catalog
* Snowflake

The source data is ingested into a staging table, and each replication group reads from this table to update its target. Although they share the same source data, replication groups can be configured individually, each with its own commit interval and options, to write to a separate schema. As a result, you create a single entity, and Upsolver automatically generates a job for each replication group, managing the freshness of data in each target based on the commit interval that you specify.

{% hint style="info" %}
Please note:

* Target tables in **Snowflake** are created in **upper case**.
* Target tables in **Amazon Redshift** are created in **lower case**.
  {% endhint %}

## Syntax

```sql
CREATE REPLICATION JOB <job_identifier>
    [{ job_options }]  
FROM <source_connection_name> 
    [{ source_options }]
WITH REPLICATION GROUP <group_name>
    [{ replication_group_options }]
[ WITH REPLICATION GROUP ... ];
```

### Job identifier

This is the name of your job, which should match the following format:

```sql
identifier = "([^"]|"")*"|[A-Za-z_][A-Za-z0-9_]*;
```
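To illustrate the format, the names below are hypothetical examples checked against the pattern above: the unquoted form must start with a letter or underscore, while the quoted form accepts any characters, with a literal double quote written as two double quotes. The quoted cluster name `"Default Compute (Free)"` used later on this page follows the same quoted form.

```sql
-- Valid unquoted identifiers: a letter or underscore, then letters, digits, or underscores
replicate_orders
_orders_v2

-- Valid quoted identifiers: any characters; "" stands for one literal double quote
"replicate orders (prod)"
"orders ""v2"" replication"

-- Invalid unless quoted: starts with a digit / contains a hyphen
-- 2024_orders
-- orders-replication
```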

***

## Job options

The following job options can be applied to all sources:

```sql
[ COMMENT = '<comment>' ]
[ COMPUTE_CLUSTER = <cluster_identifier> ]
[ INTERMEDIATE_STORAGE_CONNECTION = <connection_identifier> ]
[ INTERMEDIATE_STORAGE_LOCATION = '<location>' ]
[ SNAPSHOT_PARALLELISM = <integer> ]
```

**Jump to**

* [`COMMENT`](#comment-editable)
* [`COMPUTE_CLUSTER`](#compute_cluster-editable)
* [`INTERMEDIATE_STORAGE_CONNECTION`](#intermediate_storage_connection)
* [`INTERMEDIATE_STORAGE_LOCATION`](#intermediate_storage_location)
* [`SNAPSHOT_PARALLELISM`](#snapshot_parallelism)

#### `COMMENT` — editable

Type: `text`

(Optional) A description or comment regarding this job.

#### `COMPUTE_CLUSTER` — editable

Type: `identifier`

Default: The sole cluster in your environment

(Optional) The compute cluster that runs this job.

This option can be omitted only when there is a single cluster in your environment. If you have more than one compute cluster, use this option to specify which one runs the job.

#### `INTERMEDIATE_STORAGE_CONNECTION`

Type: `text`

The name of the connection you are using to stage the data ingested from your CDC source.

#### `INTERMEDIATE_STORAGE_LOCATION`

Type: `text`

The location, such as an Amazon S3 path, that you are using to stage the data ingested from your CDC source.

#### `SNAPSHOT_PARALLELISM`

Type: `int`

Default: `1`

(Optional) Configures how many snapshots are performed concurrently. The more snapshots performed concurrently, the sooner the tables start streaming. However, running more snapshots in parallel increases the load on the source database.
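For illustration, the sketch below sets every job option in one statement. It mirrors the full example at the end of this page; the job, connection, bucket, and target names are hypothetical placeholders, and `SNAPSHOT_PARALLELISM = 4` is an arbitrary value chosen to show the trade-off between faster initial snapshots and extra load on the source database.

```sql
CREATE REPLICATION JOB replicate_orders
  COMMENT = 'Replicate the orders schema'
  -- Needed only when the environment has more than one cluster
  COMPUTE_CLUSTER = "Default Compute (Free)"
  -- Hypothetical connection and path used to stage the ingested CDC data
  INTERMEDIATE_STORAGE_CONNECTION = s3_connection
  INTERMEDIATE_STORAGE_LOCATION = 's3://my-bucket/replication-staging/'
  -- Run up to 4 snapshots concurrently (default is 1)
  SNAPSHOT_PARALLELISM = 4
FROM my_database_connection
WITH REPLICATION GROUP replicate_to_prod
  INCLUDED_TABLES_REGEX = ('orders\..*')
  REPLICATION_TARGET = my_target_connection
  TARGET_SCHEMA_NAME_EXPRESSION = 'ORDERS'
  TARGET_TABLE_NAME_EXPRESSION = $table_name;
```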

***

## Source options

The following source options apply to **PostgreSQL**:

```sql
[ HEARTBEAT_TABLE = '<heartbeat_name>' ]
[ PUBLICATION_NAME = '<publication_name>' ]
```

PostgreSQL source options:

* [`HEARTBEAT_TABLE`](#heartbeat_table)
* [`PUBLICATION_NAME`](#publication_name)

#### `HEARTBEAT_TABLE`

Type: `string`

(Optional) The name of the heartbeat table to use, as described in [Setting up a Heartbeat Table](broken://spaces/uPkz5BRCP7SmzEQhzoOC/pages/HUfpXK1LBglvQvlKL7T7#setting-up-a-heartbeat-table).

If it is not set, no heartbeat table is used. Using a heartbeat table is recommended to prevent the replication slot from growing indefinitely when no CDC events are captured for the subscribed tables.

#### `PUBLICATION_NAME`

Type: `text`

Adds a new publication to the current database. The publication name must be distinct from the name of any existing publication in the current database. DDL will be filtered.
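As an illustration, the PostgreSQL source options go directly after the `FROM` clause, as in the full example at the end of this page. The publication name, heartbeat table, connections, and bucket below are hypothetical, and the heartbeat table is assumed to have been set up as described in the linked guide.

```sql
CREATE REPLICATION JOB replicate_orders
  INTERMEDIATE_STORAGE_CONNECTION = s3_connection
  INTERMEDIATE_STORAGE_LOCATION = 's3://my-bucket/replication-staging/'
FROM my_postgres_connection
  -- PostgreSQL-only source options (hypothetical names)
  PUBLICATION_NAME = 'orders_publication'
  HEARTBEAT_TABLE = 'orders.heartbeat'
WITH REPLICATION GROUP replicate_to_prod
  INCLUDED_TABLES_REGEX = ('orders\..*')
  REPLICATION_TARGET = my_target_connection
  TARGET_SCHEMA_NAME_EXPRESSION = 'ORDERS'
  TARGET_TABLE_NAME_EXPRESSION = $table_name;
```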

***

## Replication group options

A replication group must include either [`INCLUDED_TABLES_REGEX`](#included_tables_regex) (and optionally [`EXCLUDED_COLUMNS_REGEX`](#excluded_columns_regex)) or [`INCLUDED_SCHEMA_DEFINITION`](#included_schema_definition) (and optionally [`EXCLUDED_SCHEMA_DEFINITION`](#excluded_schema_definition)).

The following replication group options apply to all data sources, except where noted in the option descriptions below.

```sql
{
    INCLUDED_TABLES_REGEX = ('<included_tables_regex>'[, ...])
    [ EXCLUDED_COLUMNS_REGEX = ('<excluded_columns_regex>'[, ...]) ]
        |
    INCLUDED_SCHEMA_DEFINITION = (<included_table_pattern>)
    [ EXCLUDED_SCHEMA_DEFINITION = (<excluded_table_pattern>) ]
}
[ COLUMN_TRANSFORMATIONS = (<column_name> = <expression> [, ...]) ]
[ COMMIT_INTERVAL = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]
[ LOGICAL_DELETE_COLUMN = <column_identifier> ]     
[ OPERATION_TYPE_COLUMN = <column_identifier> ]   
[ PRIMARY_KEY_COLUMN = <column_identifier> ]                
[ REPLICATION_TARGET = <target_connection_identifier> ]
[ TARGET_SCHEMA_NAME_EXPRESSION = '<schema_name>' ]
[ TARGET_TABLE_NAME_EXPRESSION = '<table_prefix_name>' || $table_name ]
[ UPSOLVER_EVENT_TIME_COLUMN = <column_identifier> ] 
[ WRITE_MODE = { MERGE | APPEND } ]
```

**Jump to**

* [`INCLUDED_TABLES_REGEX`](#included_tables_regex)
* [`EXCLUDED_COLUMNS_REGEX`](#excluded_columns_regex)
* [`INCLUDED_SCHEMA_DEFINITION`](#included_schema_definition)
* [`EXCLUDED_SCHEMA_DEFINITION`](#excluded_schema_definition)
* [`COLUMN_TRANSFORMATIONS`](#column_transformations)
* [`COMMIT_INTERVAL`](#commit_interval)
* [`LOGICAL_DELETE_COLUMN`](#logical_delete_column)
* [`OPERATION_TYPE_COLUMN`](#operation_type_column)
* [`PRIMARY_KEY_COLUMN`](#primary_key_column)
* [`REPLICATION_TARGET`](#replication_target)
* [`TARGET_SCHEMA_NAME_EXPRESSION`](#target_schema_name_expression)
* [`TARGET_TABLE_NAME_EXPRESSION`](#target_table_name_expression)
* [`UPSOLVER_EVENT_TIME_COLUMN`](#upsolver_event_time_column)
* [`WRITE_MODE`](#write_mode)

#### `COMMIT_INTERVAL`

Type: `<integer> { MINUTE[S] | HOUR[S] | DAY[S] }`

Default: `1 MINUTE`

(Optional) Defines how often the job loads and commits data to the target. The interval must divide evenly into the 24 hours of a day.
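To make the divisibility rule concrete, each line below is an alternative setting for a single replication group (they are not meant to be used together). The commit counts assume a 24-hour day, consistent with the `WRITE_INTERVAL` example further down this page.

```sql
-- Valid: a day divides evenly into each of these intervals
COMMIT_INTERVAL = 5 MINUTES   -- 288 commits per day
COMMIT_INTERVAL = 2 HOURS     -- 12 commits per day
COMMIT_INTERVAL = 1 DAY       -- 1 commit per day

-- Invalid: 24 is not evenly divisible by 5
-- COMMIT_INTERVAL = 5 HOURS
```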

#### `INCLUDED_TABLES_REGEX`

Values: `('regexFilter1', 'regexFilter2')`

One or more regular expressions that define which tables should be included in the replication.

For example, `('dbo.prod_.*')` includes all tables in the **dbo** schema prefixed with **prod\_**.

#### `EXCLUDED_COLUMNS_REGEX`

Values: `('regexFilter1', 'regexFilter2')`

(Optional) Can only be included when [`INCLUDED_TABLES_REGEX`](#included_tables_regex) is supplied. One or more regular expressions that define which columns should be excluded from the replication.

For example, `('dbo.*.address_')` excludes columns prefixed with **address\_** in all tables in the **dbo** schema.
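For illustration, the group below combines both options with several patterns each. The table and column names (`sales.customers`, `creditcard`, `ssn`) are hypothetical, and the escaped dots (`\.`) follow the convention of the full example at the end of this page; whether a pattern must match the whole name or only part of it is not stated here, so the `.*` wildcards simply mirror the page's own examples.

```sql
WITH REPLICATION GROUP replicate_to_prod
  -- Include every table in the orders schema plus one hypothetical sales.customers table
  INCLUDED_TABLES_REGEX = ('orders\..*', 'sales\.customers')
  -- Drop hypothetical PII columns named creditcard or ssn from every included table
  EXCLUDED_COLUMNS_REGEX = ('.*\.creditcard', '.*\.ssn')
  REPLICATION_TARGET = my_target_connection
  TARGET_SCHEMA_NAME_EXPRESSION = 'ORDERS'
  TARGET_TABLE_NAME_EXPRESSION = $table_name
```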

#### `INCLUDED_SCHEMA_DEFINITION`

Type: `text`

One or more expressions that define which tables, or columns within tables, should be included in the replication.

For example, `*`, `schema.*`, `schema.table.*`, or `schema.table.column`.

#### `EXCLUDED_SCHEMA_DEFINITION`

Type: `text`

(Optional) Can only be included when [`INCLUDED_SCHEMA_DEFINITION`](#included_schema_definition) is supplied. One or more expressions that define which tables should be excluded from the replication.

For example, `schema.*` or `schema.table`.
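As a sketch, the group below includes a whole schema and excludes one table from it. It assumes the unquoted pattern form shown in the syntax block above; check whether your environment expects the patterns to be quoted. The `orders.audit_log` table is hypothetical.

```sql
WITH REPLICATION GROUP replicate_to_prod
  -- Include every table in the orders schema...
  INCLUDED_SCHEMA_DEFINITION = (orders.*)
  -- ...except a hypothetical audit table
  EXCLUDED_SCHEMA_DEFINITION = (orders.audit_log)
  REPLICATION_TARGET = my_target_connection
  TARGET_SCHEMA_NAME_EXPRESSION = 'ORDERS'
  TARGET_TABLE_NAME_EXPRESSION = $table_name
```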

#### `COLUMN_TRANSFORMATIONS`

Values: `( <column> = <expression>, ...)`

(Optional) Use this option to transform data during ingestion, before it lands in your target. When ingesting into a data lake, it is recommended that you only apply essential transformations, such as protecting PII: data that remains in its raw state is easier to amend or correct later, and you can use a transformation job to apply further modifications. As a general rule, only transform data that must be modified before it reaches the target.

However, transformations provide the flexibility to shape your data before it lands in the target. You can use all the functions and operators supported by Upsolver to create calculated fields within your job. New columns can be added to your target, and existing column data can be transformed. You can perform actions such as converting data types, formatting string values, and concatenating columns to create a new column.

If you need to mask sensitive or personally identifiable information (PII) before loading it into your staging tables, or when performing direct ingestion into your target destination, you can use hashing functions to prevent the data from being exposed downstream.

{% hint style="info" %}
See the [Functions](/content/reference-1/functions-and-operators/functions.md) and [Operators](/content/reference-1/functions-and-operators/operators.md) references for a full list of options that you can apply.
{% endhint %}
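For illustration, the group below masks one column and adds a calculated one. It assumes that `MD5` is among the hashing functions listed in the Functions reference (substitute another hashing function if it is not) and uses the `||` concatenation operator shown elsewhere on this page. The `customers` schema and the `email`, `first_name`, `last_name`, and `full_name` columns are hypothetical.

```sql
WITH REPLICATION GROUP replicate_customers
  INCLUDED_TABLES_REGEX = ('customers\..*')
  COLUMN_TRANSFORMATIONS = (
      -- Hash an existing PII column in place (assumes an MD5 function)
      email = MD5(email),
      -- Add a new calculated column by concatenating two source columns
      full_name = first_name || ' ' || last_name
  )
  REPLICATION_TARGET = my_target_connection
  TARGET_SCHEMA_NAME_EXPRESSION = 'CUSTOMERS'
  TARGET_TABLE_NAME_EXPRESSION = $table_name
```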

#### `LOGICAL_DELETE_COLUMN`

Type: `text`

(Optional) By default, Upsolver permanently deletes rows from the target. To logically delete rows instead, specify the name of the column in which Upsolver marks them as deleted.

The column is named **UPSOLVER\_IS\_DELETED** unless specified otherwise.

This option is valid only when using [`WRITE_MODE`](#write_mode) = `MERGE`.

#### `OPERATION_TYPE_COLUMN`

Type: `text`

(Optional) When you choose to append data in the target, Upsolver creates a column to store the type of operation that occurred on the source database, e.g. insert, update, or delete.

By default, this column is named **UPSOLVER\_RECORD\_TYPE**. To rename this column, use the `OPERATION_TYPE_COLUMN` option and specify the column name.

This option is valid only when using [`WRITE_MODE`](#write_mode) = `APPEND`.

#### `PRIMARY_KEY_COLUMN`

Type: `text`

Values: `<column_name>`

(Optional) Defines the column name to be created and used to store the original primary key value of the source tables. If not specified, the column name **UPSOLVER\_PRIMARY\_KEY** will be used.

#### `REPLICATION_TARGET`

Type: `text`

Values: `<connection_name>`

The name of the target connection that the replication group writes data to. The connection must be created before you create the job.

#### `TARGET_SCHEMA_NAME_EXPRESSION`

Type: `expression`

Specify the name of the target schema that the replication group writes to. The following inputs are supported: `$database_name`, `$schema_name`, `$full_table_name`, and `$table_name`. If you are ingesting from MongoDB, you can also use the `$collection_name` input.

In addition to using the exact name of the schema, for example, `TARGET_SCHEMA_NAME_EXPRESSION = 'SALES'`, you can write a `CASE` expression to define the target schema, for example:

```sql
TARGET_SCHEMA_NAME_EXPRESSION = 
    CASE $schema_name 
        WHEN 'sales' THEN 'SALES' 
        ELSE 'ARCHIVE' 
    END
```

Similarly, you can use the name of the database to define the target schema as follows:

```sql
TARGET_SCHEMA_NAME_EXPRESSION = 
    CASE $database_name 
        WHEN 'Suppliers' THEN 'SUPPLIERS' 
        ELSE 'DISTRIBUTORS' 
    END
```

#### `TARGET_TABLE_NAME_EXPRESSION`

Type: `expression`

Define how you want Upsolver to name the target tables. The following inputs are supported: `$database_name`, `$schema_name`, `$full_table_name`, and `$table_name`. If you are ingesting from MongoDB, you can also use the `$collection_name` input.

Optionally, you can include a prefix for the table name. Use it in conjunction with `$full_table_name`, `$table_name`, or `$collection_name` to create the target table using the name of the source table or collection. For example, to create the target table with the prefix **sales\_**, followed by the name of the source table, use the expression:

```sql
TARGET_TABLE_NAME_EXPRESSION = 'sales_' || $table_name::STRING
```

Furthermore, you could include a `CASE` statement to determine the name of the target table based on the source database:

```sql
TARGET_TABLE_NAME_EXPRESSION = 
    CASE $database_name 
        WHEN 'Suppliers' THEN 'supp_' || $table_name::STRING 
        ELSE 'dist_' || $table_name::STRING 
    END
```
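As a further sketch, when tables from several source schemas land in a single target schema, prefixing each table with its source schema name can avoid collisions between tables that share a name. This assumes `$schema_name::STRING` behaves like the `$table_name::STRING` cast shown above; the `REPLICA` schema is hypothetical.

```sql
-- One hypothetical target schema for all replicated tables
TARGET_SCHEMA_NAME_EXPRESSION = 'REPLICA'
-- sales.orders becomes sales_orders (created as SALES_ORDERS in Snowflake)
TARGET_TABLE_NAME_EXPRESSION = $schema_name::STRING || '_' || $table_name::STRING
```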

#### `UPSOLVER_EVENT_TIME_COLUMN`

Type: `text`

Values: `<column_name>`

(Optional) Upsolver uses distributed locking technology to keep events strongly ordered. The ingestion timestamp, which reflects this order, is stored in a dedicated column named `UPSOLVER_EVENT_TIME` by default; use this option to specify a different column name.
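For illustration, the group below renames both metadata columns, which can help when the default `UPSOLVER_PRIMARY_KEY` and `UPSOLVER_EVENT_TIME` names do not fit your naming conventions. The `source_primary_key` name is hypothetical; `event_timestamp` follows the full example at the end of this page.

```sql
WITH REPLICATION GROUP replicate_to_prod
  INCLUDED_TABLES_REGEX = ('orders\..*')
  REPLICATION_TARGET = my_target_connection
  TARGET_SCHEMA_NAME_EXPRESSION = 'ORDERS'
  TARGET_TABLE_NAME_EXPRESSION = $table_name
  -- Stores the original primary key value (default: UPSOLVER_PRIMARY_KEY)
  PRIMARY_KEY_COLUMN = source_primary_key
  -- Stores the ingestion timestamp used for ordering (default: UPSOLVER_EVENT_TIME)
  UPSOLVER_EVENT_TIME_COLUMN = event_timestamp
  WRITE_MODE = MERGE
```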

#### `WRITE_INTERVAL`

Type: `<integer> { MINUTE[S] | HOUR[S] | DAY[S] }`

Determines how often the job writes the CDC data to the [`REPLICATION_TARGET`](#replication_target) specified within the group. Frequent writes provide up-to-date information but may be costly, especially for Snowflake.

The writes take place over a set period of time defined by this interval, which must divide evenly into the 24 hours of a day.

For example, you can set `WRITE_INTERVAL` to 2 hours (the job runs 12 times per day), but trying to set `WRITE_INTERVAL` to 5 hours would fail since 24 hours is not evenly divisible by 5.

#### `WRITE_MODE`

Values: `{ MERGE | APPEND }`

Defines whether the job merges or appends data into the target. Combined with the related column options, this gives you three replication modes:

<table><thead><tr><th width="182">Replication Mode</th><th width="124">Write Mode</th><th>Description</th></tr></thead><tbody><tr><td>Exact replication</td><td>MERGE</td><td><p>Performs an exact replication in the target by using hard updates and deletes, i.e. data is overwritten and deleted in the target exactly as in the source.</p><p></p><p>This option must include the <a href="#upsolver_event_time_column"><code>UPSOLVER_EVENT_TIME_COLUMN</code></a> so that Upsolver can retain the order of source events.</p></td></tr><tr><td>Soft deletion</td><td>MERGE</td><td><p>Applies a soft delete approach by using the <a href="#logical_delete_column"><code>LOGICAL_DELETE_COLUMN</code></a> to flag when a row is deleted in the source database, without deleting it from the target. Updates are applied in the target as in the source.</p><p></p><p>This option must also include the <a href="#upsolver_event_time_column"><code>UPSOLVER_EVENT_TIME_COLUMN</code></a> so that Upsolver can retain the order of source events.</p></td></tr><tr><td>Append only</td><td>APPEND</td><td><p>Changes are appended to the target, so no updates or deletes are applied. The <a href="#operation_type_column"><code>OPERATION_TYPE_COLUMN</code></a> records whether the operation was an insert, update, or delete.</p><p></p><p>This option must also include the <a href="#upsolver_event_time_column"><code>UPSOLVER_EVENT_TIME_COLUMN</code></a> so that Upsolver can retain the order of source events.</p></td></tr></tbody></table>

{% hint style="info" %}
**Note**

At present, **Amazon Redshift** targets only support the `MERGE` write mode.
{% endhint %}
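The three modes in the table can be sketched as three groups in the same job, each following a single `FROM` clause in one `CREATE REPLICATION JOB` statement. The group and schema names are hypothetical, and the third group would be rejected if `my_target_connection` pointed to Amazon Redshift.

```sql
-- 1. Exact replication: updates and deletes are applied in the target as in the source
WITH REPLICATION GROUP exact_copy
  INCLUDED_TABLES_REGEX = ('orders\..*')
  REPLICATION_TARGET = my_target_connection
  TARGET_SCHEMA_NAME_EXPRESSION = 'ORDERS'
  UPSOLVER_EVENT_TIME_COLUMN = event_timestamp
  WRITE_MODE = MERGE
-- 2. Soft deletion: rows deleted in the source are kept and flagged in is_deleted
WITH REPLICATION GROUP soft_delete
  INCLUDED_TABLES_REGEX = ('orders\..*')
  REPLICATION_TARGET = my_target_connection
  TARGET_SCHEMA_NAME_EXPRESSION = 'ORDERS_SOFT'
  UPSOLVER_EVENT_TIME_COLUMN = event_timestamp
  WRITE_MODE = MERGE
  LOGICAL_DELETE_COLUMN = is_deleted
-- 3. Append only: each change becomes a new row (not supported for Amazon Redshift)
WITH REPLICATION GROUP change_log
  INCLUDED_TABLES_REGEX = ('orders\..*')
  REPLICATION_TARGET = my_target_connection
  TARGET_SCHEMA_NAME_EXPRESSION = 'ORDERS_LOG'
  UPSOLVER_EVENT_TIME_COLUMN = event_timestamp
  WRITE_MODE = APPEND
  OPERATION_TYPE_COLUMN = operation_type;
```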

***

## Example

### Replicate CDC data

This example applies to all source and target combinations, with the exception of the `WRITE_MODE = APPEND` replication group option, which is currently unavailable for Redshift (use `MERGE` instead):

```sql
CREATE REPLICATION JOB replicate_orders_to_target
  COMMENT = 'Replicate CDC data to multiple groups'
  COMPUTE_CLUSTER = "Default Compute (Free)"
  INTERMEDIATE_STORAGE_CONNECTION = s3_connection
  INTERMEDIATE_STORAGE_LOCATION = 's3://upsolver-integration-tests/test/' 
FROM my_database_connection 
// Add PostgreSQL source options here:
   // PUBLICATION_NAME = 'orders_publication' 
   // HEARTBEAT_TABLE = 'orders.heartbeat'
WITH REPLICATION GROUP replicate_to_prod 
  INCLUDED_TABLES_REGEX = ('orders\..*')
  EXCLUDED_COLUMNS_REGEX = ('.*\.creditcard') -- exclude creditcard columns 
  COMMIT_INTERVAL = 5 MINUTES
  REPLICATION_TARGET = my_target_connection
  TARGET_SCHEMA_NAME_EXPRESSION = 'ORDERS'
  TARGET_TABLE_NAME_EXPRESSION = $table_name
  UPSOLVER_EVENT_TIME_COLUMN = event_timestamp
  WRITE_MODE = MERGE 
    // The flag column to denote deletion in the source
    LOGICAL_DELETE_COLUMN = is_deleted
WITH REPLICATION GROUP replicate_to_dev 
  INCLUDED_TABLES_REGEX = ('orders\..*')
  COMMIT_INTERVAL = 1 HOUR
  REPLICATION_TARGET = my_target_connection
  TARGET_TABLE_NAME_EXPRESSION = 'history_' || $table_name
  TARGET_SCHEMA_NAME_EXPRESSION = 'ORDERS_DEV'
  UPSOLVER_EVENT_TIME_COLUMN = event_timestamp   
  WRITE_MODE = APPEND -- Use MERGE for Redshift targets 
    // The column that records the change type, e.g. insert, update, delete
    OPERATION_TYPE_COLUMN = operation_type;
```

This example creates a replication job named **replicate\_orders\_to\_target** with two replication groups:

* **replicate\_to\_prod** for production
* **replicate\_to\_dev** for development

To configure your database source, change the **my\_database\_connection** value to the name of your connection. If you are using PostgreSQL, uncomment and set the additional [source options](#source-options).

To set your target, change the `REPLICATION_TARGET` value to the name of your Amazon Redshift, AWS Glue Data Catalog, Snowflake, or Polaris connection.

In the example above, each replication group has its own set of options, configured differently while streaming from the same data source:

* The production replication group has a `COMMIT_INTERVAL` of **5 MINUTES** to keep the target frequently updated, whereas the development group has an interval of **1 HOUR** to update less often. Update frequency may affect cloud compute costs when writing to targets such as Snowflake.
* The production group writes the data using `MERGE`, flagging rows deleted in the source with the **is\_deleted** column. The development group writes the data using `APPEND`, recording in the **operation\_type** column, specified in the `OPERATION_TYPE_COLUMN` option, whether the operation in the source database was an insert, update, or delete.
* Both groups replicate all tables in the **orders** schema, as specified in the `INCLUDED_TABLES_REGEX` option. However, all **creditcard** columns are excluded from the production target by using the `EXCLUDED_COLUMNS_REGEX` option to remove PII.
* In the development group, the `TARGET_TABLE_NAME_EXPRESSION` option adds the prefix **history\_** to the name of each source table, and the tables are written to the **ORDERS\_DEV** schema, so it is clear that the target tables in this group are used for development purposes.

