> For the complete documentation index, see [llms.txt](https://upsolver.gitbook.io/content/llms.txt). Markdown versions of documentation pages are available by appending `.md` to page URLs; this page is available as [Markdown](https://upsolver.gitbook.io/content/reference-1/sql-commands/jobs/create-job/ingestion/amazon-s3.md).

# Amazon S3

## Syntax

{% code overflow="wrap" %}

```sql
CREATE [SYNC] JOB <job_name>
    [{ job_options }]
AS COPY FROM S3 <connection_identifier>
    [{ source_options }]
INTO { <table_identifier> 
         | SNOWFLAKE <connection_name>.<schema_name>.<table_name> }
[ WITH EXPECTATION <exp_name> EXPECT <sql_predicate> ON VIOLATION { DROP | WARN } ];
```

{% endcode %}

#### Jump to

* [Job options](#job-options)
* [Source options](#source-options)

## Job options

The following properties can be configured when ingesting data from Amazon S3.

To quickly and efficiently find data in Amazon S3, Upsolver needs to list the files in the specified bucket and prefixes. Upsolver utilizes the folder structure and naming, i.e. partitions, to optimize reading data before it is processed.&#x20;

When reading events, Upsolver organizes them on a timeline that it uses to synchronize between jobs to ensure data is joined and aggregated correctly. Therefore, storing raw data using date-formatted partitions such as `/bucket/data/2022/11/01/`, enables Upsolver to optimize and speed up ingestion.&#x20;

{% hint style="info" %}
To better understand how Upsolver reads and processes data using date partitions, see [Working with date patterns](#date_pattern).
{% endhint %}

{% code overflow="wrap" %}

```sql
[ COLUMN_TRANSFORMATIONS = (<column> = <expression>, ...) ]
[ COMMENT = '<comment>' ]
[ COMMIT_INTERVAL = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]
[ COMPUTE_CLUSTER = <cluster_identifier> ]
[ CONTENT_TYPE = { AUTO 
                 | CSV 
                 | JSON 
                 | PARQUET 
                 | TSV 
                 | AVRO 
                 | AVRO_SCHEMA_REGISTRY 
                 | FIXED_WIDTH 
                 | REGEX 
                 | SPLIT_LINES 
                 | ORC 
                 | XML } ]
[ COMPRESSION = { AUTO 
                | GZIP 
                | SNAPPY 
                | LZO 
                | NONE 
                | SNAPPY_UNFRAMED 
                | KCL } ]
[ DATE_PATTERN = '<date_pattern>' ]                
[ DEDUPLICATE_WITH = ( {COLUMNS = (<col>, ...)  | COLUMN = <col>}, WINDOW = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]                
[ DELETE_FILES_AFTER_LOAD = { TRUE | FALSE } ]
[ END_AT = { NOW | <timestamp> } ]  
[ EXCLUDE_COLUMNS = ( <col>, ...) ]       
[ FILE_PATTERN = '<pattern>' ]
[ FILE_SUFFIX = '<suffix>' ]
[ INITIAL_LOAD_PATTERN = '<pattern>' ]
[ INITIAL_LOAD_PREFIX = '<prefix>' ]
[ MAX_DELAY = <integer> { MINUTE[S] | HOUR[S] | DAY[S] } ]
[ RUN_PARALLELISM = <integer> ]
[ SKIP_ALL_VALIDATIONS = { TRUE | FALSE } ]
[ SKIP_VALIDATIONS = ('EMPTY_PATH' | 'SKIP_SQS_RESOURCES_DELETION') ]
[ SQS_KMS_KEY = '<key_name>' ]
[ SQS_QUEUE_NAME = '<queue_name>' ]
[ START_FROM = { NOW | BEGINNING | <timestamp> } ]
[ USE_SQS_NOTIFICATIONS = { TRUE | FALSE } ]
```

{% endcode %}

#### Jump to

Amazon S3 job options:

* [`COMMIT_INTERVAL`](#commit_interval)
* [`COMPRESSION`](#compression)
* [`CONTENT_TYPE`](#content_type-editable)
* [`DATE_PATTERN`](#date_pattern)
* [`DEDUPLICATE_WITH`](#deduplicate_with)
* [`DELETE_FILES_AFTER_LOAD`](#delete_files_after_load)
* [`FILE_PATTERN`](#date_pattern)
* [`FILE_SUFFIX`](#file_suffix)
* [`INITIAL_LOAD_PATTERN`](#initial_load_pattern)
* [`INITIAL_LOAD_PREFIX`](#initial_load_prefix)
* [`RUN_PARALLELISM`](#run_parallelism-editable)
* [`SKIP_ALL_VALIDATIONS`](#skip_all_validations)
* [`SKIP_VALIDATIONS`](#skip_validations)
* [`SQS_KMS_KEY`](#sqs_kms_key)
* [`SQS_QUEUE_NAME`](#sqs_queue_name)
* [`START_FROM`](#start_from)
* [`USE_SQS_NOTIFICATIONS`](#use_sqs_notifications)

General job options:

* [`COLUMN_TRANSFORMATIONS`](/content/reference-1/sql-commands/jobs/create-job/ingestion.md#column_transformations)&#x20;
* [`COMMENT`](/content/reference-1/sql-commands/jobs/create-job/ingestion.md#comment-editable)&#x20;
* [`COMPUTE_CLUSTER`](/content/reference-1/sql-commands/jobs/create-job/ingestion.md#compute_cluster-editable)&#x20;
* [`END_AT`](/content/reference-1/sql-commands/jobs/create-job/ingestion.md#end_at-editable)&#x20;
* [`EXCLUDE_COLUMNS`](/content/reference-1/sql-commands/jobs/create-job/ingestion.md#exclude_columns)

See also:

* [`WITH EXPECTATION`](broken://spaces/j7NDvdvbkHS8wtX8aREE/pages/Y2QSVaw0n1NMmbDpVePG)

#### `COMMIT_INTERVAL`

Type: `<integer> { MINUTE[S] | HOUR[S] | DAY[S] }`

Default: [`WRITE_INTERVAL`](#write_interval)

(Optional) Defines how often the job will commit to Snowflake in a direct ingestion job. If empty, the `WRITE_INTERVAL` value will be used. The `COMMIT_INTERVAL` value must be bigger and divisible by `WRITE_INTERVAL`.

#### `COMPRESSION`

Values: `{ AUTO | GZIP | SNAPPY | LZO | NONE | SNAPPY_UNFRAMED | KCL }`

Default: `AUTO`

(Optional) The compression of the source.

#### `CONTENT_TYPE` — editable

Values: `{ AUTO | CSV | JSON | PARQUET | TSV | AVRO | AVRO_SCHEMA_REGISTRY | FIXED_WIDTH | REGEX | SPLIT_LINES | ORC | XML }`

Default: `AUTO`

(Optional) The file format of the content being read.

Note that `AUTO` only works when reading Avro, JSON, or Parquet.

To configure additional options for certain content types, see the [Content Types](/content/reference-1/sql-commands/jobs/create-job/ingestion/content-types.md) options.

#### `DATE_PATTERN`

Type: `text`

(Optional) The date pattern of the partitions on the Amazon S3 bucket to read from. Upsolver supports reading from buckets partitioned up to the minute.

Example: `'yyyy/MM/dd/HH/mm'`

For more options, see [Java SimpleDateFormat](https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html).

When you set a `DATE_PATTERN`, Upsolver uses the date in the folder path to understand when new files are added. The date in the path is used to process data in order of arrival, as well as set the `$source_time` and `$event_time` [system columns](/content/articles-1/data/system-columns.md) used to keep jobs [synchronized](/content/articles-1/get-started/understanding-sync-and-non-sync-jobs.md). If files are added to a folder named with a future date, these files will not be ingested until that date becomes the present.

If you don’t set a `DATE_PATTERN`, Upsolver will list and ingest files in the ingest job’s `BUCKET` and `PREFIX` location as soon as they are discovered. Historical data will also be processed as soon as it is added and discovered by Upsolver .&#x20;

To discover new files, when a `DATE_PATTERN` is not set, Upsolver lists the top-level prefix and performs a diff to detect newly added files. Subsequently, it lists the paths adjacent to these new files with the assumption that if a file was added here, others would be as well. This process is performed at regular intervals to ensure files are not missed.&#x20;

For buckets with few files and predictable changes, this works well. However, for buckets with many changes across millions of files and hundreds of prefixes, the scanning and diffing process may result in ingestion and processing delay. To optimize this process, consider setting the job’s [`DELETE_FILES_AFTER_LOAD`](#delete_files_after_load) property to `TRUE`. This moves ingested files to another staging location, leaving the source folder empty, and making it easier and faster for Upsolver to discover new files. Be aware that configuring Upsolver to move ingested files could impact other systems if they depend on the same raw files.

To troubleshoot jobs that ingest data, you can query the [task executions system table](/content/reference-1/monitoring/system-catalog/task-executions-table.md) and inspect whether 0 bytes of data have been read in the “ingest data” stage, or Upsolver is throwing parse errors in the “parse data” stage. In the case that 0 bytes have been read, it means that your job is configured correctly, but there is no new data. In the case where you see parse errors, you can narrow it down to either a misconfiguration of the job or bad data.

{% hint style="info" %}
For more information please read the article, Working with Date Patterns.
{% endhint %}

#### `DEDUPLICATE_WITH`&#x20;

Values: `( {COLUMNS = (, ...) | COLUMN = }, WINDOW = { MINUTE[S] | HOUR[S] | DAY[S] } )`&#x20;

(Optional) You can use `DEDUPLICATE_WITH` to prevent duplicate rows from arriving in your target. One or more columns can be supplied in the column list to act as a key so that all events within the timeframe specified in the `WINDOW` value are deduplicated.&#x20;

For example, if you have a third-party application that sends the same event multiple times a day, you can define one or more columns as the key and set the timeframe to be `1 DAY`. Upsolver will exclude all duplicate events that arrive within the day, ensuring your target only receives unique events.&#x20;

Note that if you have multiple jobs writing to a table in your lake, duplicate rows can be generated, even when you include this option.

#### `DELETE_FILES_AFTER_LOAD`

Type: `Boolean`

Default: `false`

(Optional) When `true`, files are deleted from the storage source once they have been ingested into the target location within your metastore.

This allows Upsolver to discover new files immediately, regardless of how many files are in the source, or what file names and patterns are used.

#### `FILE_PATTERN`

Type: `text`

Default: `''`

(Optional) Only files that match the provided regex pattern are loaded.

Use this option to filter out irrelevant data. For example, you could filter by a suffix to only keep `.parquet` files in a folder that may have some additional files that should not be ingested.

#### `FILE_SUFFIX`

Type: `text`

(Optional) This option is only applicable when [`USE_SQS_NOTIFICATIONS`](#use_sqs_notifications) is set to `true`. It controls which file suffixes S3 will publish notifications for.

#### `INITIAL_LOAD_PATTERN`

Type: `text`

(Optional) Any file matching this regex pattern is immediately loaded when the job is run.

This loads data separately from the date pattern and is primarily used in CDC use cases, where you load some initial files named `LOAD00001`, `LOAD00002`, etc. After that, all the data has a date pattern in the file name.

#### `INITIAL_LOAD_PREFIX`

Type: `text`

(Optional) Any file matching this prefix is immediately loaded when the job is run.

#### `MAX_DELAY`

Value: `<integer> { MINUTE[S] | HOUR[S] | DAY[S] }`

Default: `1 DAY`

(Optional) Configures how far back to check for new files. The larger this is, the more list operations are used.

#### `RUN_PARALLELISM` — editable

Type: `integer`

Default: `1`

(Optional) The number of parser jobs to run in parallel per minute.

#### `SKIP_ALL_VALIDATIONS`

Type: `Boolean`

Default: `false`

(Optional) If data is expected to arrive in the source at a later point in time, set this value to `true`.&#x20;

This option instructs Upsolver to ignore all validations to allow you to create a job that reads from a source that currently has no data.

#### `SKIP_VALIDATIONS`

Value: `EMPTY_PATH | SKIP_SQS_RESOURCES_DELETION`

When `SKIP_SQS_RESOURCES_DELETION` is specified, the job skips the check or validation related to the deletion of the SQS queue associated with the job when it is dropped. This means the job can be dropped even if the system encounters issues while trying to delete the SQS queue. If this parameter is not set (or set differently), and deleting the SQS queue fails while dropping the job, it may cause the drop process itself to fail.

When the job is recreated, it will create a new SQS queue. However, this can lead to issues when processing existing S3 files. Therefore, `SKIP_SQS_RESOURCES_DELETION` allows the job to continue with its drop process, even if there are problems with deleting the associated SQS queue. It doesn't prevent the deletion itself but ensures that it won't block the drop job process.

Value: `EMPTY_PATH | SKIP_SQS_RESOURCES_DELETION`

(Optional) Use this option if data is expected to arrive in the source at a later point in time.&#x20;

This option tells Upsolver to ignore specific validations to allow you to create a job that reads from a source that currently has no data.

When `SKIP_SQS_RESOURCES_DELETION` is specified, the job skips the check or validation related to the deletion of the SQS queue associated with the job when it is dropped. This means the job can be dropped even if the system encounters issues while trying to delete the SQS queue. If this parameter is not set (or set differently), and deleting the SQS queue fails while dropping the job, it may cause the drop process itself to fail.

When the job is recreated, it will create a new SQS queue. However, this can lead to issues when processing existing S3 files. Therefore, `SKIP_SQS_RESOURCES_DELETION` allows the job to continue with its drop process, even if there are problems with deleting the associated SQS queue. It doesn't prevent the deletion itself but ensures that it won't block the drop job process.

#### `SQS_KMS_KEY`

Type: `text`

(Optional) Specifies the KMS key to use for reading from the SQS queue. This is only required if the SQS queue is set up with encryption.

#### `SQS_QUEUE_NAME`

Type: `text`

(Optional) If set, Upsolver will read from this queue instead of creating a new queue and setting up notification settings. Use this option when you already have a notification queue or need to use an existing queue since AWS can only publish notifications to a single target. A common pattern is to publish to SNS and then forward from SNS to SQS.

#### `START_FROM`

Values: `{ NOW | BEGINNING | <timestamp> }`

Default: `BEGINNING`

(Optional) Configures the time to start ingesting data. Files before the specified time are ignored. Timestamps should be in the UTC time format.

When a [`DATE_PATTERN`](#date_pattern) is not specified, configuring this option is not allowed. By default, all available data is ingested.

If the [`DATE_PATTERN`](#date_pattern) is not lexicographically ordered, then this option cannot be set to `BEGINNING`.

#### `USE_SQS_NOTIFICATIONS`

Type: `Boolean`

Default: `false`

(Optional) When enabled, Upsolver will use SQS notifications sent by the S3 bucket to discover new files. This is particularly useful when the data in the directory being read doesn't have a predictable pattern that can be used to discover files efficiently. If the SQS\_QUEUE\_NAME isn't set explicitly Upsolver will create an SQS queue and set up bucket notifications to send messages to the queue.

Note: Setting up the queue and notifications requires the appropriate permissions in AWS. In addition, S3 bucket notifications are limited to a single target per prefix + suffix combination. If you already have notifications set up for the required files you will need to provide the queue to use manually or remove the existing notification settings.

## Source options

#### `LOCATION`

Type: `text`

The location to read files from, as a full Amazon S3 URI.&#x20;

## Examples

### Ingest data into the data lake

The example below creates a sync job to ingest data from Amazon S3 into a staging table that has been created in the data lake. The `CONTENT_TYPE` option specifies that the data to be ingested is in JSON format.&#x20;

```sql
CREATE SYNC JOB load_orders_raw_data_from_s3
   CONTENT_TYPE = JSON
AS COPY FROM S3 upsolver_s3_samples 
   LOCATION = 's3://upsolver-samples/orders/' 
INTO default_glue_catalog.upsolver_samples.orders_raw_data;
```

After the data has been ingested into the staging table, you can create a [transformation](/content/reference-1/sql-commands/jobs/create-job/transformation.md) job to load the data into your target.

### Ingest data directly into Snowflake

Data can be ingested directly to Snowflake, removing the need to create staging tables in the lake and write additional transformation jobs to load the data. Ingestion job options provide numerous functions to enable transformations and data quality observability, ensuring that your data lands in the target in the required state, regardless of the source.

In the following example, a sync job is created to ingest data from Amazon S3 directly to Snowflake, using transformations and expectations to ensure the data meets the required standards when it lands in the target table.    &#x20;

The `DEDUPLICATE_WITH` option tells the job to ensure that duplicate events arriving within an hour of each other are not duplicated on the target. The job uses the **orderid** column to uniquely identify duplicates. The UPPER string function is applied to the **ordertype** column using the `COLUMN_TRANSFORMATIONS` option so that all values in this column land in the target in upper case, and not in mixed case. &#x20;

Two expectations have been applied to this job: the first, **exp\_orderid**, tells the job to drop any rows from the stream that have a `NULL` value for the **orderid**. Rows that violate this expectation do not reach the target but are counted in the job monitoring tables. The second expectation, **exp\_shippinginfo**, checks that the shipping method is either **express** or **regular**. If this condition is violated, the rows are loaded into the target, and the job monitoring tables count each row that raises a warning so you can observe your data quality and take necessary action.&#x20;

```sql
CREATE SYNC JOB ingest_s3_to_snowflake
   COMMENT = 'Load orders into Snowflake'
   CONTENT_TYPE = JSON
   DEDUPLICATE_WITH = (COLUMNS = (orderid), WINDOW = 1 hour)
   COLUMN_TRANSFORMATIONS = (ordertype = UPPER(ordertype)) 
   COMMIT_INTERVAL = 5 MINUTES
AS COPY FROM S3 upsolver_s3_samples 
   LOCATION = 's3://upsolver-samples/orders/' 
INTO SNOWFLAKE my_snowflake_connection.demo.orders_transformed
   WITH EXPECTATION exp_orderid 
      EXPECT orderid IS NOT NULL 
      ON VIOLATION DROP
   WITH EXPECTATION exp_shipping 
      EXPECT shippinginfo.method IN ('express', 'regular') 
      ON VIOLATION WARN;
```

### Ingest data over SQS

In the following example, data is loaded from Amazon S3 to the data lake over SQS:

```sql
CREATE SYNC JOB load_s3_over_sqs 
   USE_SQS_NOTIFICATIONS = TRUE 
   SKIP_VALIDATIONS = ('SKIP_SQS_RESOURCES_DELETION') 
   CONTENT_TYPE = JSON 
AS COPY FROM S3 my_s3_connection 
   LOCATION = 's3://sqsnotification/sales-event-change/' 
INTO default_glue__catalog.upsolver_samples.sales_raw_data; 
```


---

# Agent Instructions
This documentation is published with GitBook. GitBook is the documentation platform designed so that both humans and AI agents can read, navigate, and reason over technical content effectively. Learn more at gitbook.com.

## Querying This Documentation
If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter, and the optional `goal` query parameter:

```
GET https://upsolver.gitbook.io/content/reference-1/sql-commands/jobs/create-job/ingestion/amazon-s3.md?ask=<question>&goal=<endgoal>
```

`ask` is the immediate question: it should be specific, self-contained, and written in natural language.
`goal` is optional and describes the broader end goal you are ultimately trying to accomplish on behalf of the user. GitBook uses it to tailor the answer towards what is most useful for that goal.

The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
