# Real-time Data Ingestion — Amazon Kinesis to ClickHouse

Upsolver's integration with ClickHouse enables customers to reliably ingest large volumes of data in near real-time. With support for the main industry data platforms, Upsolver can ingest from databases, streaming, and file sources into ClickHouse Cloud 50-100X faster than other solutions.

Purpose-built for big data, Upsolver easily moves high-volume streaming data from your operational systems into your analytics target. Whether your application source generates thousands of events per second, or your gaming or ad-tech company creates millions of events per second, Upsolver can ingest your stream into your data lakehouse and target analytics systems with ease.

Furthermore, Upsolver automates schema evolution, maintains performance, and ensures strongly ordered, exactly-once delivery of your data.

<figure><img src="https://lh7-us.googleusercontent.com/i__qP7Z-rPPoLjRedXtcYBTiYauVq1-drz6WUZTLgpoJQBDNCz-S8JMAaVxZJL-HlBQlaxCv_82GskNzjBGos1oWt4mH9s3aHWdW5LGELjEvqLKqpQY_ISTqRyU_tLdPGm0qhwIW8Lf0mX5_xdEWgzY" alt=""><figcaption><p>Ingest high scale, streaming events into ClickHouse with Upsolver. </p></figcaption></figure>

Upsolver automatically detects changes in the source and streams only changed rows to ClickHouse. Orchestration and task scheduling are handled internally, eliminating the need for external tools like Airflow or AWS Lambda.

## What you will learn

Upsolver enables you to deliver high-volume data to ClickHouse in a few simple steps. In this guide, you will learn how to connect to your source and target data platforms, create a simple but powerful job to ingest data from Amazon Kinesis to ClickHouse, and then observe your data to ensure data is delivered as expected.

***

## Prerequisites

Before you begin, please read the guide on [How To Configure Access to Amazon Kinesis](/content/how-to-guides-1/connectors/configure-access/amazon-kinesis.md) to enable permissions for Upsolver to read the data from your Amazon Kinesis source.

The steps for ingesting your data from Amazon Kinesis to ClickHouse are as follows:

1. Connect to Amazon Kinesis
2. Connect to ClickHouse
3. Create an ingestion job
4. Observe your data

***

## Step 1

### Connect to Amazon Kinesis

The first step is to connect to your Amazon Kinesis source. When you create a connection in Upsolver, other users in your organization will be able to use this connection, and it is always on, meaning you don't need to reconnect if you want to create another job, for example.

**Here's the code:**

```sql
CREATE KINESIS CONNECTION my_kinesis_connection
    AWS_ROLE = 'arn:aws:iam::123456789012:role/upsolver-sqlake-role'
    REGION = 'us-east-1'
    COMMENT = 'Connection to all Kinesis streams';
```

Here, a connection named **my\_kinesis\_connection** is created with access to all streams. The optional comment tells other users what this connection does, and is visible in the [connections system table](/content/reference-1/monitoring/system-catalog/information-schema/connections.md).

## Step 2

### Create a connection to ClickHouse

After you have successfully connected to your Amazon Kinesis stream, the next step is to create a connection to your target ClickHouse database. Again, this connection is visible and available to all users within your organization, and always on.

**Here's the code:**

```sql
CREATE CLICKHOUSE CONNECTION my_clickhouse_connection
    CONNECTION_STRING = 'http://x.us-east-1.aws.clickhouse.cloud:8123/sales_db'
    USER_NAME = 'my_username'
    PASSWORD = 'my_password'
    COMMENT = 'Connection to Sales database';
```

In the above example, a connection named **my\_clickhouse\_connection** is created to access the **sales\_db** database. Again, the optional comment tells other users what this connection does, and is visible in the [connections system table](/content/reference-1/monitoring/system-catalog/information-schema/connections.md).

## Step 3

### Create an ingestion job

Now that you have a connection to your source and target, you can create a job to ingest the data. As with the connections you created, the job will be visible to other users in your organization.

**Here's the code to create the ingestion job:**

```sql
CREATE SYNC JOB load_kinesis_to_clickhouse
    COMMENT = 'Ingest sales orders from Kinesis to ClickHouse'
    START_FROM = BEGINNING
    CONTENT_TYPE = AUTO
    EXCLUDE_COLUMNS = ('customer.password') 
    COLUMN_TRANSFORMATIONS = (hashed_email = MD5(customer.email))       
    DEDUPLICATE_WITH = (COLUMNS = (orderid), WINDOW = 4 HOURS)
    COMMIT_INTERVAL = 5 MINUTES
AS COPY FROM KINESIS my_kinesis_connection
    STREAM = 'orders'  
INTO CLICKHOUSE my_clickhouse_connection.sales_db.orders_tbl
    WITH EXPECTATION exp_custid_not_null 
      EXPECT customer.customer_id IS NOT NULL ON VIOLATION DROP
    WITH EXPECTATION exp_taxrate 
      EXPECT taxrate = 0.12 ON VIOLATION WARN;
```

Let's take a look at what this code does. A job named **load\_kinesis\_to\_clickhouse** is created with an optional `COMMENT` to describe its purpose. This is useful for future reference if you or other users in your organization query the [jobs system table](/content/reference-1/monitoring/system-catalog/monitoring/jobs.md) to look up metadata for your jobs.

The job includes options that we can customize for our use case. Using `START_FROM`, we instruct Upsolver to stream events from a point in time. We could specify a date here, but instead are using **BEGINNING** to ingest all events discovered in the source.

The `CONTENT_TYPE` option specifies the format of the data to be ingested. Here it is set to **AUTO** so that Upsolver detects the type automatically.

In this example, we want to perform some data protection operations to ensure PII is not visible in the target. Using the `EXCLUDE_COLUMNS` option, we can specify a list of columns that we don't want Upsolver to write to the target. In this case, the **customer.password** column is excluded from being written to our ClickHouse target.
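To make the effect of excluding a nested column concrete, here is a minimal Python sketch. It is not how Upsolver implements `EXCLUDE_COLUMNS`; the `exclude_columns` helper and the sample event are hypothetical, and the sketch only illustrates removing a dotted path such as `customer.password` from a record before it is written.

```python
import copy


def exclude_columns(record, paths):
    """Return a copy of record with each dotted path removed, if present."""
    result = copy.deepcopy(record)  # leave the source event untouched
    for path in paths:
        *parents, leaf = path.split(".")
        node = result
        # Walk down to the parent of the field to remove.
        for key in parents:
            node = node.get(key) if isinstance(node, dict) else None
            if node is None:
                break
        if isinstance(node, dict):
            node.pop(leaf, None)
    return result


# Hypothetical event shape, for illustration only.
event = {"orderid": "A-1", "customer": {"email": "jo@example.com", "password": "s3cret"}}
cleaned = exclude_columns(event, ["customer.password"])
```

After this runs, `cleaned` still contains `customer.email`, but the `password` field is gone.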

Furthermore, we want to ingest the **customer.email** column but mask the contents. The `COLUMN_TRANSFORMATIONS` option enables us to create a new column named **hashed\_email** in the target, mask the **customer.email** value using the `MD5()` function, and write the hashed value into our new column.
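The masking step can be sketched in Python as follows. This is an illustration only: the `md5_hex` helper is hypothetical, and it assumes the hash is rendered as a 32-character hexadecimal string, which may differ from the exact output format of Upsolver's `MD5()` function.

```python
import hashlib


def md5_hex(value):
    """Hash a string with MD5 and return the 32-character hex digest."""
    return hashlib.md5(value.encode("utf-8")).hexdigest()


# Hypothetical event shape, for illustration only.
event = {"customer": {"email": "jo@example.com"}}
event["hashed_email"] = md5_hex(event["customer"]["email"])
```

Because the same input always produces the same digest, the hashed column can still be used to group or join rows by customer without exposing the address itself.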

We can also prevent duplicates from reaching our ClickHouse target using `DEDUPLICATE_WITH`. This option accepts one or more columns and a time window over which any duplicate rows will be identified and removed. In our job, we instruct Upsolver to ignore any duplicate **orderid** values within a time window of **4 HOURS**, so they don't get written to our target database.
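The idea of windowed deduplication can be sketched in Python as below. This is a simplified model under stated assumptions, not Upsolver's implementation: the `deduplicate` function and the `ts` field are hypothetical, events are assumed to arrive in time order, and the window is measured from the most recent event that was kept for a given key.

```python
from datetime import datetime, timedelta


def deduplicate(events, key, window):
    """Yield events whose key value was not already seen within `window`."""
    last_seen = {}  # key value -> timestamp of the most recent kept event
    for event in events:
        value, ts = event[key], event["ts"]
        previous = last_seen.get(value)
        if previous is not None and ts - previous < window:
            continue  # duplicate inside the window: skip it
        last_seen[value] = ts
        yield event


t0 = datetime(2024, 1, 1, 0, 0)
events = [
    {"orderid": 1, "ts": t0},
    {"orderid": 1, "ts": t0 + timedelta(hours=1)},  # duplicate, skipped
    {"orderid": 2, "ts": t0 + timedelta(hours=2)},
    {"orderid": 1, "ts": t0 + timedelta(hours=5)},  # outside the window, kept
]
kept = list(deduplicate(events, "orderid", timedelta(hours=4)))
```

Here three of the four events are kept: the second event repeats **orderid** 1 within four hours and is skipped, while the last one falls outside the window.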

The last job option specified is `COMMIT_INTERVAL`. This tells Upsolver how frequently the data should be written to the target. It can be more cost efficient to set a longer write time, depending on the target system. In our case, writing the data every **5 MINUTES** is often enough to fulfill downstream business objectives, but you can adjust this to suit requirements for how fresh the data should be, weighed against vendor costs for writing data.
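As a rough way to reason about this trade-off, the short Python sketch below counts how many commits a given interval produces per day. The `commits_per_day` helper is hypothetical, and the figures are simple arithmetic rather than measured Upsolver or ClickHouse behavior.

```python
def commits_per_day(interval_minutes):
    """Number of whole commit intervals that fit into 24 hours."""
    return (24 * 60) // interval_minutes


for minutes in (1, 5, 15, 60):
    print(f"{minutes:>2}-minute interval -> {commits_per_day(minutes)} commits per day")
```

A 5-minute interval results in 288 writes per day, compared with 1,440 at one minute, so a longer interval means fewer writes at the cost of less fresh data.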

Using the `COPY FROM` command, the job copies the data from the source, in this case the **orders** `STREAM` in the Kinesis source specified in the **my\_kinesis\_connection** connection we created above. The data is loaded into the **orders\_tbl** table in the **sales\_db** database in our **my\_clickhouse\_connection** connection.

A really useful way to prevent bad data reaching our downstream targets is to include data quality expectations in our job, to ensure the data reaching our target is of the quality that end-users anticipate. Using an `EXPECTATION`, we can check each row against a predicate, and either raise a warning for the row or drop it. You can use any of the supported functions and operators in Upsolver's [extensive library](/content/reference-1/functions-and-operators/data-types.md) to create expectations.

In this example, the first expectation, **exp\_custid\_not\_null**, checks that the **customer.customer\_id** column is not **NULL**. If the value is **NULL**, the row is dropped and won't be loaded into the target. The second expectation, **exp\_taxrate**, examines the value of the **taxrate** column. If the value is not equal to **0.12**, a warning is raised, but the row is not dropped. In the next step, you will learn how to view the results of rows that don't match expectations.
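The drop and warn behaviors can be modeled with a small Python sketch. This is a conceptual illustration, not Upsolver's engine: the `apply_expectations` function and the sample rows are hypothetical, and the sketch assumes every expectation is evaluated for every row, including rows that end up being dropped.

```python
def apply_expectations(rows, expectations):
    """Return (kept_rows, violation_counts) after checking every expectation."""
    kept = []
    violations = {name: 0 for name, _, _ in expectations}
    for row in rows:
        drop = False
        for name, predicate, action in expectations:
            if not predicate(row):
                violations[name] += 1
                if action == "drop":
                    drop = True  # row is excluded from the target
        if not drop:
            kept.append(row)  # rows with only "warn" violations are still written
    return kept, violations


# Each expectation: (name, predicate that must hold, action on violation).
expectations = [
    ("exp_custid_not_null", lambda r: r["customer"].get("customer_id") is not None, "drop"),
    ("exp_taxrate", lambda r: r.get("taxrate") == 0.12, "warn"),
]
rows = [
    {"customer": {"customer_id": 7}, "taxrate": 0.12},     # meets both expectations
    {"customer": {"customer_id": None}, "taxrate": 0.12},  # dropped
    {"customer": {"customer_id": 9}, "taxrate": 0.2},      # kept, counted as a warning
]
kept, violations = apply_expectations(rows, expectations)
```

Two rows are kept, and each expectation records one violation. Only the row with the missing **customer\_id** is left out.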

## Step 4

### Observe your data

Once the job is running and loading data into ClickHouse, you can observe the stream using Datasets, which provide a wealth of information on your pipelines and data health. From the main menu, click **Datasets**. In the **Connections** tree, expand the nodes to display the ClickHouse connection.

In the **Schema** tab, notice the **Overview** and **Written Rows Over Time** cards that give instant insight into your pipeline. Scroll through the list of columns and rows in the **Written Data Statistics** table, which displays the ingested columns, the **Type** and **Density**, along with other useful metadata. Click on a **column name** to view the individual column statistics. Notice the **Values by Frequency** card, which displays a list of unique values within the column, and a percentage showing how often each value is seen across the dataset.

Click on the **Data Violations** tab. Here you will find the expectations created in [Step 3](#step-3). In the **Expectations** table, notice the two expectations for the **load\_kinesis\_to\_clickhouse** job. If you have existing expectations, you can use the **Search expectations** box to filter on the job name to make them visible.

The **Action** column displays **warn** or **drop**, to indicate how Upsolver handles rows that do not meet the expectation. The **Violations Today** column displays a count of the number of times the expectation was violated since midnight (UTC time) today, alongside **Total Violations**, which is a count of all violations since the expectation was created.
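The difference between the two counters can be illustrated with a short Python sketch. The `violation_counts` function and the sample timestamps are hypothetical and only model the definitions given above: one count since midnight UTC, and one count over all recorded violations.

```python
from datetime import datetime, timezone


def violation_counts(timestamps, now):
    """Return (violations since midnight UTC today, total violations)."""
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    today = sum(1 for ts in timestamps if ts >= midnight)
    return today, len(timestamps)


now = datetime(2024, 1, 2, 9, 30, tzinfo=timezone.utc)
timestamps = [
    datetime(2024, 1, 1, 23, 59, tzinfo=timezone.utc),  # yesterday
    datetime(2024, 1, 2, 0, 0, tzinfo=timezone.utc),    # today, at midnight
    datetime(2024, 1, 2, 8, 15, tzinfo=timezone.utc),   # today
]
today, total = violation_counts(timestamps, now)
```

With these sample values, two violations count toward today and three toward the total.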

{% hint style="success" %}
**Learn More**

Learn about [Expectations](/content/articles-1/data/expectations.md) and then follow the how-to guide on [Managing Data Quality - Ingesting Data with Expectations](/content/how-to-guides-1/jobs/advanced-use-cases/managing-data-quality-ingesting-data-with-expectations.md) to discover how to create a report to monitor your expectations.
{% endhint %}

***

## Conclusion

In this guide, you learned how to create connections to your source and target data platforms. Then you saw how easy it is to use straightforward SQL code to create an ingestion job to copy your streaming data into a ClickHouse database. You learned how to include job options to customize the job, protect PII, and ensure sensitive data was either excluded or masked before it landed in the target.

You also explored how expectations can perform in-flight data quality checks, either by dropping disqualifying rows or by logging a warning in the system table. Finally, you learned how to use Datasets to observe your data, uncover spikes or drops in volume, and check for rows not meeting your data expectation requirements.

***

## Try it yourself

To ingest your streaming data from Amazon Kinesis to ClickHouse:

1. Create a connection to your [Amazon Kinesis](/content/reference-1/sql-commands/connections/create-connection/amazon-kinesis.md) source
2. Create a connection to your [ClickHouse](/content/reference-1/sql-commands/connections/create-connection/clickhouse.md) database
3. Write a job to [ingest data from Kinesis](/content/reference-1/sql-commands/jobs/create-job/ingestion/amazon-kinesis.md) to ClickHouse
   1. Create [expectations](/content/articles-1/data/expectations.md) to maintain data quality standards
4. Observe your data with [Datasets](/content/reference-1/monitoring/datasets.md)
