# Ingest Your Microsoft SQL Server CDC Data to Snowflake

Learn how to ingest data from your Microsoft SQL Server database into a staging table in the data lake, prior to transforming and loading it into Snowflake.

## Prerequisites

Before you ingest data into Upsolver, you must enable change data capture on your SQL Server database. Please [follow this guide](/content/how-to-guides-1/connectors/enable-cdc/microsoft-sql-server.md) if you have not already enabled CDC.
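Before you continue, you may want to confirm that CDC is enabled. The following is a minimal sketch that you run directly against SQL Server (not in Upsolver), using the `sys.databases` and `sys.tables` catalog views. The database name **mydb** and the table names are assumptions taken from the examples used later in this guide, so substitute your own.

```sql
-- Run in SQL Server, not in Upsolver.
-- Assumption: the database is named mydb, as in the connection example in Step 1.
SELECT name, is_cdc_enabled
FROM sys.databases
WHERE name = 'mydb';

-- Assumption: these are the tables you plan to ingest (see Step 3).
SELECT s.name AS schema_name,
       t.name AS table_name,
       t.is_tracked_by_cdc
FROM sys.tables AS t
JOIN sys.schemas AS s
    ON s.schema_id = t.schema_id
WHERE t.name IN ('orders', 'products', 'customers');
```

A value of `1` in `is_cdc_enabled` and `is_tracked_by_cdc` indicates that CDC is enabled for the database and for each table, respectively.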

The steps for ingesting your CDC data are as follows:

1. Connect to SQL Server
2. Create a staging table to store the CDC data
3. Create an ingestion job
4. View the job status to check the snapshotting process
5. View the CDC data in the staging table
6. Connect to Snowflake
7. Create a transformation job

## Step 1

### Connect to SQL Server

The first step is to connect to the database from which you want to ingest your CDC data. You will need the connection string for your SQL Server database, along with the username and password. Ensure your login has the appropriate permissions to read from the change data capture tables.

**Here's the code:**

```sql
CREATE MSSQL CONNECTION my_mssql_connection
  CONNECTION_STRING = 'jdbc:sqlserver://ms-sqlserver-1.myendpoint.us-east-1.rds.amazonaws.com:1433;DatabaseName=mydb'
  USER_NAME = '<user_name>'
  PASSWORD = '<password>';
```

## Step 2

### Create a staging table to store the CDC data

After connecting to your SQL Server source database, the next step is to create a table in the data lake for staging the CDC data.

**Here's the code to create the staging table:**

```sql
CREATE TABLE default_glue_catalog.upsolver_samples.orders_raw_data()
    PARTITIONED BY $event_date;
```

Let's understand what this code does. First, a table named **orders\_raw\_data** is created in the **upsolver\_samples** database. Notice that no columns have been defined for the table. The empty parentheses instruct Upsolver to infer the columns and types during data ingestion. This is helpful if you are unsure of the data in the source and want Upsolver to manage type changes and schema updates.

Upsolver recommends partitioning by the system column `$event_date` or another date column to optimize query performance. The `$event_date` column is added by default as a system column, along with `$event_time`, which is used later when you create your transformation job. To view all the system columns that Upsolver adds to the tables in your default Glue catalog, expand the table name in the **Entities** tree in Upsolver, and then expand **SYSTEM COLUMNS**.

{% hint style="success" %}
**Learn More**

To learn more about table options and data retention, please see [CREATE TABLE](/content/reference-1/sql-commands/tables/create-table.md).
{% endhint %}

## Step 3

### Create an ingestion job

Next, you can create an ingestion job to copy the CDC data into your staging table.

**Here's the code to create the ingestion job:**

```sql
CREATE SYNC JOB load_raw_data_from_mssql
    COMMENT = 'Ingest CDC data from SQL Server'
AS COPY FROM MSSQL my_mssql_connection
    TABLE_INCLUDE_LIST = ('dbo.orders', 'dbo.products', 'dbo.customers')
    COLUMN_EXCLUDE_LIST = ('dbo.orders.credit_card', 'dbo.customers.address_*')
INTO default_glue_catalog.upsolver_samples.orders_raw_data; 
```

Let's take a look at what this code does. A job named **load\_raw\_data\_from\_mssql** is created with an optional comment that you can use to describe the purpose of your job. Other users in your organization can see comments.

An ingestion job uses the `COPY FROM` command to copy source data to the target, in this case, the **orders\_raw\_data** table in the AWS Glue Data Catalog, using the **my\_mssql\_connection** connection.

In this example, the `TABLE_INCLUDE_LIST` source option instructs the job to ingest from the **orders**, **products**, and **customers** tables, and to ignore any other discovered tables. To exclude PII data, the `COLUMN_EXCLUDE_LIST` source option tells Upsolver to ignore the **credit\_card** column in the **orders** table, and all columns in the **customers** table that are prefixed with **address\_**.

## Step 4

### View the job status

When the job is created, Upsolver takes a snapshot of each of the included tables before the streaming process begins. To check the status of the snapshotting process, click **Jobs** in the main menu on the left-hand side of the Upsolver UI. Then click the job you created, e.g. **load\_raw\_data\_from\_mssql**. The job page displays each table and its status, such as **Snapshotting** or **Streaming**. After the snapshot process has completed and all tables are streaming, you can continue to use this page to monitor and troubleshoot your job.

{% hint style="success" %}
**Learn More**

Discover how to monitor the performance of your jobs and troubleshoot problems using the [Job Status](/content/reference-1/monitoring/job-status.md) reference.
{% endhint %}

## Step 5

### View the CDC data in the staging table

During the snapshotting process, Upsolver reads the column names and types from the CDC tables in the source database and creates corresponding columns in the staging table. System information columns are appended to your CDC columns, including the source database, schema, and table names. Additional columns include the log sequence number (LSN) at which the change was committed on the source database and an **$is\_delete** column.

Prior to creating a transformation job to load data into the target, you can check the data in the staging table.

**Here's the code:**

```sql
SELECT * 
FROM default_glue_catalog.upsolver_samples.orders_raw_data
LIMIT 50; -- return 50 rows only to view a small sample of the ingested data 
```

Confirm that your data is as expected before moving on to the next steps, where you create a transformation job to load the data into your target.
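As one way to check that deletes on the source are being captured, you could filter the staging table on the **$is\_delete** column described above. This is only a sketch: it assumes the column is a boolean that can be referenced by this name in an ad hoc query, in the same way that `$event_time` is referenced elsewhere in this guide. Adjust the filter if your table exposes the column differently.

```sql
-- Assumption: $is_delete is a boolean system column on the staging table.
SELECT *
FROM default_glue_catalog.upsolver_samples.orders_raw_data
WHERE $is_delete = true
LIMIT 50; -- return a small sample of delete events only
```

An empty result is expected if no rows have been deleted in the source tables since the job started.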

{% hint style="success" %}
This example uses a Snowflake database as the target. However, the process for writing to other destinations is similar.

Please see the [Transformation](/content/reference-1/sql-commands/jobs/create-job/transformation.md) jobs reference for the relevant syntax and options for your chosen target.
{% endhint %}

## Step 6

### Connect to Snowflake

The next step is to connect to your target database, in this case, Snowflake. You can create a persistent connection that is shared with other users in your organization.

**Here's the code:**

```sql
CREATE SNOWFLAKE CONNECTION my_snowflake_connection
    CONNECTION_STRING = 'jdbc:snowflake://snowflakedemo.us-east-1.snowflakecomputing.com?db=DEMO_DB&warehouse=DEMO_WH&role=ADMIN'
    USER_NAME = '<username>'
    PASSWORD = '<password>';
```

## Step 7

### Create a transformation job

Now that you have a connection to Snowflake, you can load your data using a transformation job. If you haven't already done so, create the target table in Snowflake.

**Here's the code to create a table in Snowflake:**

```sql
CREATE OR REPLACE TABLE DEMO_DB.SALES.ORDERS_TRANSFORMED (
	ORDER_ID VARCHAR(16777216),
	CUSTOMER_ID VARCHAR(16777216),
	CUSTOMER_NAME VARCHAR(16777216),
	NET_TOTAL FLOAT,
	ORDER_DATE DATE NOT NULL
);
```

Next, create a transformation job to replicate your CDC data to the target table.

**Here's the code:**

```sql
CREATE SYNC JOB transform_and_insert_into_snowflake
    START_FROM = BEGINNING
    RUN_INTERVAL = 1 MINUTE
AS INSERT INTO SNOWFLAKE my_snowflake_connection.SALES.ORDERS_TRANSFORMED 
   MAP_COLUMNS_BY_NAME
    SELECT 
        custid AS CUSTOMER_ID,
        orderid AS ORDER_ID,  
        first_name || ' ' || last_name AS CUSTOMER_NAME,
        nettotal AS NET_TOTAL,
        order_date AS ORDER_DATE           
    FROM default_glue_catalog.upsolver_samples.orders_raw_data 
    WHERE $event_time BETWEEN run_start_time() AND run_end_time();
```

Let's understand what this job does.

This code creates a job named **transform\_and\_insert\_into\_snowflake** and includes a couple of job options: `START_FROM` instructs the job to replicate all historical data by specifying the **BEGINNING** parameter, while `RUN_INTERVAL` tells Upsolver that this job should execute every **1 MINUTE**.

The job inserts the data into the **ORDERS\_TRANSFORMED** table in the **SALES** schema. We don't need to specify the database (**DEMO\_DB**) here because it is included in the connection string.

The `MAP_COLUMNS_BY_NAME` option maps each column in the `SELECT` statement to the column with the same name in the target table. As a result, the job does not map columns by ordinal position: if you compare the order of the columns in the script that creates the table with the order of the columns in the `SELECT` statement of the job, you'll notice that **CUSTOMER\_ID** and **ORDER\_ID** are in different positions.

The `SELECT` statement specifies which columns will be loaded into the target, and the alias names enable column mapping by name. A string function has been used to concatenate the customer's first and last names into the **CUSTOMER\_NAME** column.

The `WHERE` clause includes in each load all rows whose `$event_time` falls between the start and end times of that job run. The `$event_time` system column is populated with a timestamp when the data lands in the staging table.
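Once the job has run at least once, you can check the result on the target side. The following sketch is a plain query that you run in Snowflake (not in Upsolver) against the table created earlier in this step. The ordering and the row limit are illustrative choices only.

```sql
-- Run in Snowflake, not in Upsolver.
SELECT ORDER_ID,
       CUSTOMER_ID,
       CUSTOMER_NAME,
       NET_TOTAL,
       ORDER_DATE
FROM DEMO_DB.SALES.ORDERS_TRANSFORMED
ORDER BY ORDER_DATE DESC
LIMIT 50; -- view a small sample of the loaded rows
```

Because the job runs every minute, it may take a short while after creation before rows appear in the target table.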

{% hint style="success" %}
Upsolver includes a wide range of [Functions](/content/reference-1/functions-and-operators/functions.md) and [Operators](/content/reference-1/functions-and-operators/operators.md) that you can use to create advanced use cases with your jobs and customize your data to suit your requirements.
{% endhint %}

***

## Conclusion

In this guide, you learned how to connect to SQL Server and ingest your change data capture tables into a staging table in the data lake. You learned how to check the status of your tables during the snapshotting process, and how to view the ingested data. Then you discovered how to write a transformation job to replicate the data from your staging table to your target.

## Try it yourself

To ingest your CDC data from SQL Server:

1. Create a connection to your CDC-enabled SQL Server database
2. Create a staging table and ingest your change data capture to the data lake
3. Connect to your target data lake or warehouse destination
4. Write a transformation job to replicate the data to your target
5. Monitor your jobs using the job status metrics

***

{% hint style="success" %}
**Learn More**

Please see the SQL command reference for [Microsoft SQL Server](/content/reference-1/sql-commands/jobs/create-job/ingestion/microsoft-sql-server.md) for the full list of job options and examples.
{% endhint %}

