> For the complete documentation index, see [llms.txt](https://upsolver.gitbook.io/content/llms.txt). Markdown versions of documentation pages are available by appending `.md` to page URLs; this page is available as [Markdown](https://upsolver.gitbook.io/content/quickstarts-1/jobs/ingestion/stream-and-file-sources/amazon-kinesis.md).

# Amazon Kinesis

{% hint style="success" %}
**Prerequisites**

Ensure that you have an [Amazon Kinesis](/content/reference-1/sql-commands/connections/create-connection/amazon-kinesis.md) connection with the correct permissions to read from your stream.

Additionally, if you are ingesting to the data lake, you need a metastore connection that can be used to create a staging table as well as a corresponding storage connection that can be used to store your table's underlying files.
{% endhint %}

## Create a job that reads from Kinesis

You can create a job to ingest your data from Kinesis into a staging table in the data lake or ingest directly into your target.&#x20;

**Jump to**

* [Ingest to the data lake](#ingest-to-the-data-lake)
* [Ingest directly to the target](#ingest-directly-to-the-target)
* [Job options](#job-options)

### Ingest to the data lake

After completing the prerequisites, you can create your staging tables. The example below creates a table without defining columns or data types, as these will be inferred automatically by Upsolver, though you can define columns if required:

```sql
CREATE TABLE default_glue_catalog.upsolver_samples.orders_raw_data()
    PARTITIONED BY $event_date;
```

Upsolver recommends partitioning by the system column `$event_date` or another date column within the data in order to optimize your query performance.

Next, create an ingestion job as follows:

```sql
CREATE SYNC JOB stage_kinesis_data
    START_FROM = BEGINNING
    CONTENT_TYPE = JSON
AS COPY FROM KINESIS your_kinesis_connection 
    STREAM = 'sample-stream'
    INTO default_glue_catalog.kinesis.kinesis_data_stg; 
```

{% hint style="warning" %}
Note that multiple ingestion jobs can write to the same table, resulting in a final table that contains a `UNION ALL` of all data copied into that table. This means that any duplicate rows that are written are not removed and the columns list may expand if new columns are detected.

This may not be your intended behavior, so ensure you are writing to the correct table before running your job.
{% endhint %}

The example above only uses a small subset of all job options available when reading from Kinesis. Depending on your use case, there may be other options you want to configure. For instance, you may want to specify the compression of your source data rather than have it auto-detected.

### Ingest directly to the target

Directly ingesting your data enables you to copy your data straight into the target system, bypassing the need for a staging table. The syntax and job options are identical to ingesting into a staging table, however, the target connection differs:

```sql
CREATE SYNC JOB ingest_kinesis_to_snowflake
    COMMENT = 'Ingest orders directly to Snowflake'
    CONTENT_TYPE = JSON
    START_FROM = BEGINNING
AS COPY FROM KINESIS your_kinesis_connection STREAM = 'sample-stream'
    INTO SNOWFLAKE my_snowflake_connection.demo.orders_transformed;
```

### Job options

Transformations can be applied to your ingestion job to correct issues, exclude columns, or mask data before it lands in the target. Furthermore, you can use expectations to define data quality rules on your data stream and take appropriate action.&#x20;

## Alter a job that reads from Kinesis

Some job options are considered mutable, enabling you to run a SQL command to alter an existing ingestion job rather than create a new job. The job options apply equally to jobs that ingest into the data lake or directly to the target, and the syntax to alter a job is identical.

For example, take the job we created earlier:

```sql
CREATE SYNC JOB stage_kinesis_data
    START_FROM = BEGINNING
    CONTENT_TYPE = JSON
AS COPY FROM KINESIS your_kinesis_connection
    STREAM = 'sample-stream'
    INTO default_glue_catalog.kinesis.kinesis_data_stg; 
```

If you want to keep the job as is, but only change the cluster that is running the job, execute the following command:

```sql
ALTER JOB my_kinesis_ingestion_job 
    SET COMPUTE_CLUSTER = my_new_cluster;
```

Note that some options such as `COMPRESSION` cannot be altered once the connection has been created.

## Drop a job that reads from Kinesis

If you no longer need a job, you can easily drop it using the following SQL command. This applies to jobs that ingest into the data lake and directly into the target:

```sql
DROP JOB my_kinesis_ingestion_job;
```

***

{% hint style="success" %}
**Learn More**

To learn about the available job options, see the [Ingestion](/content/reference-1/sql-commands/jobs/create-job/ingestion.md) jobs page, which describes each option in detail and includes examples.&#x20;

To check which job options are mutable, see [Amazon Kinesis](/content/reference-1/sql-commands/connections/create-connection/amazon-kinesis.md).
{% endhint %}


---

# Agent Instructions
This documentation is published with GitBook. GitBook is the documentation platform designed so that both humans and AI agents can read, navigate, and reason over technical content effectively. Learn more at gitbook.com.

## Querying This Documentation
If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter, and the optional `goal` query parameter:

```
GET https://upsolver.gitbook.io/content/quickstarts-1/jobs/ingestion/stream-and-file-sources/amazon-kinesis.md?ask=<question>&goal=<endgoal>
```

`ask` is the immediate question: it should be specific, self-contained, and written in natural language.
`goal` is optional and describes the broader end goal you are ultimately trying to accomplish on behalf of the user. GitBook uses it to tailor the answer towards what is most useful for that goal.

The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
