# Amazon Kinesis

## Syntax

{% code overflow="wrap" %}

```sql
CREATE [SYNC] JOB <job_name>
    [{ job_options }]
AS COPY FROM KINESIS <connection_identifier>
    [{ source_options }]
INTO { <table_identifier> 
         | SNOWFLAKE <connection_name>.<schema_name>.<table_name> }
 [ WITH EXPECTATION <exp_name> EXPECT <sql_predicate> ON VIOLATION { DROP | WARN } ];
```

{% endcode %}

#### Jump to

* [Job options](#job-options)
* [Source options](#source-options)

## Job options

{% code overflow="wrap" %}

```sql
[ COLUMN_TRANSFORMATIONS = (<column> = <expression>, ...) ]
[ COMMENT = '<comment>' ]
[ COMMIT_INTERVAL = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]
[ COMPUTE_CLUSTER = <cluster_identifier> ]
[ COMPRESSION = { AUTO 
                | GZIP 
                | SNAPPY 
                | LZO 
                | NONE 
                | SNAPPY_UNFRAMED 
                | KCL } ]
[ CONTENT_TYPE = { AUTO 
                 | CSV 
                 | JSON 
                 | PARQUET 
                 | TSV 
                 | AVRO 
                 | AVRO_SCHEMA_REGISTRY 
                 | FIXED_WIDTH 
                 | REGEX 
                 | SPLIT_LINES 
                 | ORC 
                 | XML } ]
[ DEDUPLICATE_WITH = ( {COLUMNS = (<col>, ...) | COLUMN = <col>}, WINDOW = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ) ]
[ END_AT = { NOW | <timestamp> } ]  
[ EXCLUDE_COLUMNS = ( <col>, ...) ]
[ READER_SHARDS = <integer> ]
[ RUN_PARALLELISM = <integer> ]
[ SKIP_ALL_VALIDATIONS = { TRUE | FALSE } ]  
[ SKIP_VALIDATIONS = ('MISSING_STREAM') ]
[ START_FROM = { NOW | BEGINNING | <timestamp> } ]
[ STORE_RAW_DATA = { TRUE | FALSE } ]
```

{% endcode %}

#### Jump to

Amazon Kinesis job options:

* [`COMMIT_INTERVAL`](#commit_interval)
* [`COMPRESSION`](#compression)
* [`CONTENT_TYPE`](#content_type-editable)
* [`DEDUPLICATE_WITH`](#deduplicate_with)
* [`READER_SHARDS`](#reader_shards-editable)
* [`RUN_PARALLELISM`](#run_parallelism-editable)
* [`SKIP_ALL_VALIDATIONS`](#skip_all_validations)
* [`SKIP_VALIDATIONS`](#skip_validations)
* [`START_FROM`](#start_from)
* [`STORE_RAW_DATA`](#store_raw_data)

General job options:

* [`COLUMN_TRANSFORMATIONS`](/content/reference-1/sql-commands/jobs/create-job/ingestion.md#column_transformations)
* [`COMMENT`](/content/reference-1/sql-commands/jobs/create-job/ingestion.md#comment-editable)
* [`COMPUTE_CLUSTER`](/content/reference-1/sql-commands/jobs/create-job/ingestion.md#compute_cluster-editable)
* [`END_AT`](/content/reference-1/sql-commands/jobs/create-job/ingestion.md#end_at-editable)
* [`EXCLUDE_COLUMNS`](/content/reference-1/sql-commands/jobs/create-job/ingestion.md#exclude_columns)

See also:

* `WITH EXPECTATION`

#### `COMMIT_INTERVAL`

Type: `<integer> { MINUTE[S] | HOUR[S] | DAY[S] }`

Default: [`WRITE_INTERVAL`](#write_interval)

(Optional) Defines how often the job commits to Snowflake in a direct ingestion job. If not set, the `WRITE_INTERVAL` value is used. The `COMMIT_INTERVAL` value must be greater than `WRITE_INTERVAL` and divisible by it.

#### `COMPRESSION`

Values: `{ AUTO | GZIP | SNAPPY | LZO | NONE | SNAPPY_UNFRAMED | KCL }`

Default: `AUTO`

(Optional) The compression of the source.
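
As a minimal sketch, the job below sets `COMPRESSION` explicitly instead of relying on `AUTO`. It assumes the producer writes GZIP-compressed JSON records. The job, stream, and table names are hypothetical.

```sql
-- Hypothetical names throughout; assumes the records in the stream
-- are GZIP-compressed JSON.
CREATE SYNC JOB stage_compressed_kinesis_data
    COMPRESSION = GZIP
    CONTENT_TYPE = JSON
AS COPY FROM KINESIS my_kinesis_connection
    STREAM = 'compressed-events'
INTO default_glue_catalog.upsolver_samples.compressed_events_staging;
```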

#### `CONTENT_TYPE` — editable

Values: `{ AUTO | CSV | JSON | PARQUET | TSV | AVRO | AVRO_SCHEMA_REGISTRY | FIXED_WIDTH | REGEX | SPLIT_LINES | ORC | XML }`

Default: `AUTO`

(Optional) The file format of the content being read.

Note that `AUTO` only works when reading Avro, JSON, or Parquet.

To configure additional options for certain content types, see the [Content Types](/content/reference-1/sql-commands/jobs/create-job/ingestion/content-types.md) options.

#### `DEDUPLICATE_WITH`

Values: `( {COLUMNS = (<col>, ...) | COLUMN = <col>}, WINDOW = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } )`

(Optional) Use `DEDUPLICATE_WITH` to prevent duplicate rows from arriving in your target. Supply one or more columns in the column list to act as a key; all events that share the key within the timeframe specified in the `WINDOW` value are deduplicated.

For example, if you have a third-party application that sends the same event multiple times a day, you can define one or more columns as the key and set the timeframe to `1 DAY`. Upsolver will exclude all duplicate events that arrive within the day, ensuring your target only receives unique events.

Note that if you have multiple jobs writing to a table in your lake, duplicate rows can be generated, even when you include this option.
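
The third-party scenario described above could be sketched as follows, using the single-column `COLUMN` form of the option. The job, stream, and table names, and the **event\_id** key column, are hypothetical and assume each event carries a unique identifier.

```sql
-- Hypothetical names; event_id is assumed to uniquely identify an event.
-- Events that repeat the same event_id within one day are dropped.
CREATE SYNC JOB stage_deduplicated_events
    CONTENT_TYPE = JSON
    DEDUPLICATE_WITH = (COLUMN = event_id, WINDOW = 1 DAY)
AS COPY FROM KINESIS my_kinesis_connection
    STREAM = 'third-party-events'
INTO default_glue_catalog.upsolver_samples.third_party_events_staging;
```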

#### `READER_SHARDS` — editable

Type: `integer`

Default: `1`

(Optional) Determines how many readers are used in parallel to read the stream.

This number does not need to equal the number of shards in your Kinesis stream.

As a guideline, add one reader for every 70 MB/s sent to your stream.
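
As a rough illustration, assume a stream that receives about 140 MB/s. One reading of the 70 MB/s guideline is the default of 1 plus one extra reader per 70 MB/s, which gives 3. The throughput figure and all names below are assumptions, so treat the value as a starting point to tune rather than a fixed rule.

```sql
-- Hypothetical names; assumes roughly 140 MB/s of incoming data.
-- Default of 1 reader + 2 extra readers (one per 70 MB/s) = 3.
CREATE SYNC JOB stage_high_volume_kinesis_data
    CONTENT_TYPE = JSON
    READER_SHARDS = 3
AS COPY FROM KINESIS my_kinesis_connection
    STREAM = 'high-volume-stream'
INTO default_glue_catalog.upsolver_samples.high_volume_staging;
```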

#### `RUN_PARALLELISM` — editable

Type: `integer`

Default: `1`

(Optional) The number of parser jobs to run in parallel per minute.

#### `SKIP_ALL_VALIDATIONS`

Type: `Boolean`

Default: `false`

(Optional) If data is expected to arrive in the source at a later point in time, set this value to `true`.

This option instructs Upsolver to ignore all validations to allow you to create a job that reads from a source that currently has no data.

#### `SKIP_VALIDATIONS`

Value: `MISSING_STREAM`

(Optional) Use this option if data is expected to arrive in the source at a later point in time.

This option tells Upsolver to ignore specific validations to allow you to create a job that reads from a source that currently has no data.
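
A minimal sketch of creating a job ahead of its source, using the `MISSING_STREAM` value from the syntax above. It assumes the stream will be created later, and the job, stream, and table names are hypothetical.

```sql
-- Hypothetical names; assumes the stream 'future-stream' has not been
-- created yet, so the missing-stream validation is skipped.
CREATE SYNC JOB stage_future_kinesis_data
    SKIP_VALIDATIONS = ('MISSING_STREAM')
    CONTENT_TYPE = JSON
AS COPY FROM KINESIS my_kinesis_connection
    STREAM = 'future-stream'
INTO default_glue_catalog.upsolver_samples.future_stream_staging;
```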

#### `START_FROM`

Values: `{ NOW | BEGINNING | <timestamp> }`

Default: `BEGINNING`

(Optional) Configures the time to start ingesting data. Data dated before the specified time is ignored.
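
To skip the existing backlog and ingest only data that arrives after the job is created, the default `BEGINNING` can be overridden with `NOW`. The sketch below uses hypothetical job, stream, and table names.

```sql
-- Hypothetical names; ingests only data arriving from job creation onward.
CREATE SYNC JOB stage_new_kinesis_data_only
    START_FROM = NOW
    CONTENT_TYPE = JSON
AS COPY FROM KINESIS my_kinesis_connection
    STREAM = 'sample-stream'
INTO default_glue_catalog.upsolver_samples.kinesis_new_data_staging;
```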

#### `STORE_RAW_DATA`

Type: `Boolean`

Default: `false`

(Optional) When `true`, an additional copy of the data is stored in its original format.

## Source options

```sql
STREAM = '<stream_name>'
```

#### `STREAM`

Type: `text`

The stream to read from.

## Examples

### Ingest data into the data lake

In the following example, a new sync job is created to ingest streaming data from Kinesis into a table in the data lake. The `START_FROM` option defines that all data will be ingested, and the `CONTENT_TYPE` option informs the job that the data is stored in `JSON` format.

After ingesting data into the data lake, you can create a [transformation](/content/reference-1/sql-commands/jobs/create-job/transformation.md) job to load the data into your target.

```sql
CREATE SYNC JOB stage_kinesis_data
    START_FROM = BEGINNING
    CONTENT_TYPE = JSON
AS COPY FROM KINESIS your_kinesis_connection
    STREAM = 'sample-stream'
INTO default_glue_catalog.upsolver_samples.kinesis_data_staging; 
```

### Ingest data directly into Snowflake

An ingestion job can load data directly from Kinesis into Snowflake, bypassing the need to write additional transformation jobs to load the data into the target table.

The example below creates a job to ingest data directly into Snowflake and uses job options and expectations to perform transformations and apply data quality conditions.

The `EXCLUDE_COLUMNS` option tells the job to ignore the **customer.email** column, as the email value has been hashed into a new column named **hashed\_email** using the `COLUMN_TRANSFORMATIONS` option. To protect personal information, **customer.email** is masked before it reaches the target table in Snowflake, and therefore the raw column data should be excluded.

Furthermore, duplicate events are managed using the `DEDUPLICATE_WITH` option. Any duplicate events that arrive within a four-hour interval are deduplicated on **orderid**. The target table expects all orders to arrive with a value for **customer.address.address1**, so a check is added to flag how many rows are loaded without this value. A further warning has been added to count the number of rows that do not have a **taxrate** matching the expected value of **0.12**.

```sql
CREATE SYNC JOB ingest_kinesis_to_snowflake
    COMMENT = 'Ingest orders to Snowflake'
    START_FROM = BEGINNING
    CONTENT_TYPE = JSON
    EXCLUDE_COLUMNS = ('customer.email') 
    COLUMN_TRANSFORMATIONS = (hashed_email = MD5(customer.email))       
    DEDUPLICATE_WITH = (COLUMNS = (orderid), WINDOW = 4 HOURS)
    COMMIT_INTERVAL = 5 MINUTES
AS COPY FROM KINESIS my_kinesis_connection 
    STREAM = 'orders'  
INTO SNOWFLAKE my_snowflake_connection.demo.orders_transformed
    WITH EXPECTATION exp_add1_not_null 
        EXPECT customer.address.address1 IS NOT NULL 
        ON VIOLATION WARN
    WITH EXPECTATION exp_taxrate 
        EXPECT taxrate = 0.12 
        ON VIOLATION WARN;
```

