
# Apache Kafka

## Syntax

{% code overflow="wrap" %}

```sql
CREATE [SYNC] JOB <job_name>
    [{ job_options }]
    AS COPY FROM KAFKA <connection_identifier>
         [{ source_options }]
    INTO { <table_identifier> 
         | SNOWFLAKE <connection_name>.<schema_name>.<table_name> }
    [ WITH EXPECTATION <exp_name> EXPECT <sql_predicate> ON VIOLATION { DROP | WARN } ];
```

{% endcode %}

#### Jump to

* [Job options](#job-options)
* [Source options](#source-options)

## Job options

{% code overflow="wrap" %}

```sql
[ COLUMN_TRANSFORMATIONS = (<column> = <expression>, ...) ]
[ COMMENT = '<comment>' ]
[ COMMIT_INTERVAL = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]
[ COMPRESSION = { AUTO 
                | GZIP 
                | SNAPPY 
                | LZO 
                | NONE 
                | SNAPPY_UNFRAMED 
                | KCL } ]
[ COMPUTE_CLUSTER = <cluster_identifier> ]
[ CONSUMER_PROPERTIES = '<properties>' ]
[ CONTENT_TYPE = { AUTO 
                 | CSV 
                 | JSON 
                 | PARQUET 
                 | TSV 
                 | AVRO 
                 | AVRO_SCHEMA_REGISTRY 
                 | FIXED_WIDTH 
                 | REGEX 
                 | SPLIT_LINES 
                 | ORC 
                 | XML } ]
[ DEDUPLICATE_WITH = ( {COLUMNS = (<col>, ...) | COLUMN = <col>}, WINDOW = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ) ]
[ END_AT = { NOW | <timestamp> } ]
[ EXCLUDE_COLUMNS = ( <col>, ...) ]
[ READER_SHARDS = <integer> ]
[ RUN_PARALLELISM = <integer> ]
[ SKIP_ALL_VALIDATIONS = { TRUE | FALSE } ]
[ SKIP_VALIDATIONS = ('MISSING_TOPIC') ]
[ START_FROM = { NOW | BEGINNING | <timestamp> } ]
[ STORE_RAW_DATA = { TRUE | FALSE } ]
```

{% endcode %}

#### Jump to

Apache Kafka job options:

* [`COMMIT_INTERVAL`](#commit_interval)
* [`COMPRESSION`](#compression)
* [`CONSUMER_PROPERTIES`](#consumer_properties-editable)
* [`CONTENT_TYPE`](#content_type-editable)
* [`DEDUPLICATE_WITH`](#deduplicate_with)
* [`READER_SHARDS`](#reader_shards-editable)
* [`RUN_PARALLELISM`](#run_parallelism-editable)
* [`SKIP_ALL_VALIDATIONS`](#skip_all_validations)
* [`SKIP_VALIDATIONS`](#skip_validations)
* [`START_FROM`](#start_from)
* [`STORE_RAW_DATA`](#store_raw_data)

General job options:

* [`COLUMN_TRANSFORMATIONS`](/content/reference-1/sql-commands/jobs/create-job/ingestion.md#column_transformations)
* [`COMMENT`](/content/reference-1/sql-commands/jobs/create-job/ingestion.md#comment-editable)
* [`COMPUTE_CLUSTER`](/content/reference-1/sql-commands/jobs/create-job/ingestion.md#compute_cluster-editable)
* [`END_AT`](/content/reference-1/sql-commands/jobs/create-job/ingestion.md#end_at-editable)
* [`EXCLUDE_COLUMNS`](/content/reference-1/sql-commands/jobs/create-job/ingestion.md#exclude_columns)

See also:

* `WITH EXPECTATION`

#### `COMMIT_INTERVAL`

Type: `<integer> { MINUTE[S] | HOUR[S] | DAY[S] }`

Default: [`WRITE_INTERVAL`](#write_interval)

(Optional) Defines how often the job commits to Snowflake in a direct ingestion job. If not set, the `WRITE_INTERVAL` value is used. The `COMMIT_INTERVAL` value must be greater than, and divisible by, the `WRITE_INTERVAL` value.
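For illustration, the following sketch commits to Snowflake every 10 minutes. It assumes the job's effective `WRITE_INTERVAL` divides 10 minutes evenly (for example, 1 or 5 minutes). The job name, the `my_snowflake_connection` connection, and the `demo.orders_raw` table are hypothetical.

```sql
-- Hypothetical names: load_orders_to_snowflake, my_snowflake_connection, demo.orders_raw.
-- COMMIT_INTERVAL applies to direct ingestion into Snowflake and must be
-- greater than, and a multiple of, the job's WRITE_INTERVAL.
CREATE SYNC JOB load_orders_to_snowflake
    CONTENT_TYPE = JSON
    COMMIT_INTERVAL = 10 MINUTES
AS COPY FROM KAFKA upsolver_kafka_samples
    TOPIC = 'orders'
INTO SNOWFLAKE my_snowflake_connection.demo.orders_raw;
```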

#### `COMPRESSION`

Values: `{ AUTO | GZIP | SNAPPY | LZO | NONE | SNAPPY_UNFRAMED | KCL }`

Default: `AUTO`

(Optional) The compression format of the source.

#### `CONSUMER_PROPERTIES` — editable

Type: `text_area`

(Optional) Additional properties to use when configuring the consumer. These properties override any settings defined in the Apache Kafka connection.
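A minimal sketch of overriding consumer settings for a single job. It assumes the properties string uses newline-separated `key=value` pairs with standard Kafka consumer property names (`max.poll.records` and `fetch.max.bytes` are Kafka consumer configs). Whether a particular property is honored by Upsolver is an assumption to verify. The job name is hypothetical.

```sql
-- Assumption: CONSUMER_PROPERTIES takes newline-separated key=value pairs.
-- Hypothetical job name; the properties below are standard Kafka consumer configs.
CREATE SYNC JOB read_orders_custom_consumer
    CONTENT_TYPE = JSON
    CONSUMER_PROPERTIES = 'max.poll.records=500
fetch.max.bytes=52428800'
AS COPY FROM KAFKA upsolver_kafka_samples
    TOPIC = 'orders'
INTO default_glue_catalog.upsolver_samples.orders_raw_data;
```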

#### `CONTENT_TYPE` — editable

Values: `{ AUTO | CSV | JSON | PARQUET | TSV | AVRO | AVRO_SCHEMA_REGISTRY | FIXED_WIDTH | REGEX | SPLIT_LINES | ORC | XML }`

Default: `AUTO`

(Optional) The file format of the content being read.

Note that `AUTO` only works when reading Avro, JSON, or Parquet, and that Upsolver does not currently support Protobuf.

To configure additional options for particular content types, see the [Content Types](/content/reference-1/sql-commands/jobs/create-job/ingestion/content-types.md) options.
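Because `AUTO` only detects Avro, JSON, and Parquet, a job that reads CSV-formatted messages sets `CONTENT_TYPE` explicitly. The sketch below assumes a hypothetical `orders_csv` topic and `orders_csv_raw` table, and leaves any CSV-specific content type options at their defaults.

```sql
-- Hypothetical topic and table names.
-- AUTO does not detect CSV, so the content type is set explicitly.
CREATE SYNC JOB ingest_orders_csv
    CONTENT_TYPE = CSV
AS COPY FROM KAFKA upsolver_kafka_samples
    TOPIC = 'orders_csv'
INTO default_glue_catalog.upsolver_samples.orders_csv_raw;
```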

#### `DEDUPLICATE_WITH`

Values: `( {COLUMNS = (<col>, ...) | COLUMN = <col>}, WINDOW = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } )`

(Optional) Use `DEDUPLICATE_WITH` to prevent duplicate rows from arriving in your target. Supply one or more columns to act as the deduplication key. Events with the same key that arrive within the timeframe specified by the `WINDOW` value are deduplicated.

For example, if a third-party application sends the same event multiple times a day, you can define one or more columns as the key and set the timeframe to `1 DAY`. Upsolver then excludes all duplicate events that arrive within the day, ensuring your target receives only unique events.

Note that if multiple jobs write to the same table in your lake, duplicate rows can still be generated, even when you include this option.
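The third-party scenario above might look like the following sketch, which uses a single key column and a one-day window. The topic, the `event_id` column, and the target table are hypothetical.

```sql
-- Hypothetical topic, key column, and table names.
-- Events sharing the same event_id that arrive within one day are written only once.
CREATE SYNC JOB ingest_deduplicated_events
    CONTENT_TYPE = JSON
    DEDUPLICATE_WITH = (COLUMN = event_id, WINDOW = 1 DAY)
AS COPY FROM KAFKA upsolver_kafka_samples
    TOPIC = 'app_events'
INTO default_glue_catalog.upsolver_samples.app_events_raw;
```

For a composite key, use the `COLUMNS` form instead, for example `DEDUPLICATE_WITH = (COLUMNS = (customer_id, event_type), WINDOW = 1 DAY)`, where both column names are again hypothetical.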

#### `READER_SHARDS` — editable

Type: `integer`

Default: `1`

(Optional) Determines how many readers are used in parallel to read the stream.

This number does not need to equal the number of partitions in your Apache Kafka topic.

As a guideline, increase this value by `1` for every 70 MB/s sent to your topic.
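As a worked example of the 70 MB/s guideline, a topic receiving about 200 MB/s gives 200 / 70 ≈ 2.9, which rounds up to roughly 3 readers. The sketch below assumes such a hypothetical `clickstream` topic and `clickstream_raw` table. Treat the value as a starting point rather than a tuned setting.

```sql
-- Hypothetical topic receiving roughly 200 MB/s: 200 / 70 ≈ 2.9, so about 3 readers.
CREATE SYNC JOB ingest_clickstream
    CONTENT_TYPE = JSON
    READER_SHARDS = 3
AS COPY FROM KAFKA upsolver_kafka_samples
    TOPIC = 'clickstream'
INTO default_glue_catalog.upsolver_samples.clickstream_raw;
```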

#### `RUN_PARALLELISM` — editable

Type: `integer`

Default: `1`

(Optional) The number of parser jobs to run in parallel per minute.

#### `SKIP_ALL_VALIDATIONS`

Type: `Boolean`

Default: `false`

(Optional) If data is expected to arrive in the source at a later point in time, set this value to `true`.

This option instructs Upsolver to ignore all validations to allow you to create a job that reads from a source that currently has no data.

#### `SKIP_VALIDATIONS`

Value: `MISSING_TOPIC`

(Optional) Use this option if data is expected to arrive in the source at a later point in time.

This option tells Upsolver to ignore specific validations to allow you to create a job that reads from a source that currently has no data.
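For example, the following sketch creates a job for a topic that does not exist yet by skipping only the `MISSING_TOPIC` validation. This is narrower than `SKIP_ALL_VALIDATIONS = TRUE`. The `returns` topic and `returns_raw` table are hypothetical.

```sql
-- Hypothetical topic (not yet created) and table names.
-- Only the missing-topic check is skipped; other validations still run.
CREATE SYNC JOB ingest_future_topic
    CONTENT_TYPE = JSON
    SKIP_VALIDATIONS = ('MISSING_TOPIC')
AS COPY FROM KAFKA upsolver_kafka_samples
    TOPIC = 'returns'
INTO default_glue_catalog.upsolver_samples.returns_raw;
```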

#### `START_FROM`

Values: `{ NOW | BEGINNING | <timestamp> }`

Default: `BEGINNING`

(Optional) Configures the time from which to start ingesting data. Events before the specified time are ignored.
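As an illustration, setting `START_FROM = NOW` skips the messages already in the topic and ingests only events that arrive after the job starts. The job name and the `orders_new_data` table are hypothetical.

```sql
-- Hypothetical job and table names.
-- Existing messages in the topic are ignored; only new events are ingested.
CREATE SYNC JOB ingest_new_orders_only
    CONTENT_TYPE = JSON
    START_FROM = NOW
AS COPY FROM KAFKA upsolver_kafka_samples
    TOPIC = 'orders'
INTO default_glue_catalog.upsolver_samples.orders_new_data;
```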

#### `STORE_RAW_DATA`

Type: `Boolean`

Default: `false`

(Optional) When `true`, an additional copy of the data is stored in its original format.

## Source options

```sql
TOPIC = '<topic_name>'
```

#### `TOPIC`

Type: `text`

The topic to read from.

## Examples

### Ingest data into the data lake

The following example creates a `SYNC` job that copies data from Kafka to a staging table in the data lake. Setting `START_FROM` to `BEGINNING` instructs the job to copy all data from the start of the topic, and `CONTENT_TYPE` specifies that the data is in `JSON` format.

```sql
CREATE SYNC JOB "extract data from kafka"
    COMMENT = 'Load raw orders data from Kafka topic to a staging table'
    START_FROM = BEGINNING
    CONTENT_TYPE = JSON
AS COPY FROM KAFKA upsolver_kafka_samples 
    TOPIC = 'orders' 
INTO default_glue_catalog.upsolver_samples.orders_raw_data;
```

### Ingest data directly into Snowflake

An ingestion job can load data directly into your target Snowflake database, performing transformations and ensuring data quality in the process. Direct ingestion removes the need for intermediate staging tables while the available job options provide data observability.

The example below creates a job to ingest the data stream directly from Kafka into Snowflake. The `EXCLUDE_COLUMNS` option instructs the job to ignore the **customer.firstname**, **customer.lastname**, and **customer.email** columns, so they are not ingested into Snowflake. However, the `COLUMN_TRANSFORMATIONS` option masks the **customer.email** column using the [`MD5`](/content/reference-1/functions-and-operators/functions/string/md5.md) function and writes the result to the new column, **hashed\_email**. This lets the target system know that an email address exists without personally identifying the customer.

To keep rows in the target table unique, the `DEDUPLICATE_WITH` option checks for duplicate rows, based on **orderid**, that arrive within two hours of each other. Any duplicates are discarded from the stream and don't pollute the destination. In addition, the expectation **exp\_orderid** checks for rows that don't have an **orderid**. If **orderid** is `NULL`, the affected row is dropped from the ingestion stream.

The target table expects the shipping country to be **USA** for all orders. The **exp\_shipping** expectation monitors this by detecting rows whose shipping country is not **USA** and raising a warning. Because the violation action is `WARN`, these rows are still loaded into the target. Dropped rows, along with rows that trigger a warning, are counted and recorded in the system tables.

```sql
CREATE SYNC JOB ingest_kafka_to_snowflake
   COMMENT = 'Load orders into Snowflake'
   CONTENT_TYPE = JSON
   COMPUTE_CLUSTER = "Default Compute (Free)"
   EXCLUDE_COLUMNS = ('customer.firstname', 'customer.lastname', 'customer.email') 
   DEDUPLICATE_WITH = (COLUMNS = (orderid), WINDOW = 2 hours)
   COLUMN_TRANSFORMATIONS = (hashed_email = MD5(customer.email)) 
   COMMIT_INTERVAL = 5 MINUTES
AS COPY FROM KAFKA upsolver_kafka_samples 
   TOPIC = 'orders' 
INTO SNOWFLAKE my_snowflake_connection.demo.orders_transformed
  WITH EXPECTATION exp_orderid 
     EXPECT orderid IS NOT NULL 
     ON VIOLATION DROP
  WITH EXPECTATION exp_shipping 
     EXPECT customer.address.country = 'USA' 
     ON VIOLATION WARN;
```


