> For the complete documentation index, see [llms.txt](https://upsolver.gitbook.io/content/llms.txt). Markdown versions of documentation pages are available by appending `.md` to page URLs; this page is available as [Markdown](https://upsolver.gitbook.io/content/articles-1/get-started/core-concepts/upsolver-timeline.md).

# Upsolver Timeline

When working with your data in Upsolver and in order to ensure you achieve your intended results, it is important to understand how the data is handled, particularly in relation to time.

[Data ingestion](#ispcjx46bo76): How Upsolver parses data over time and how the event time is derived from your data

[Data transformation](#gi943upwsbzr): How Upsolver processes your data transformations over time and how to ensure you get the intended results when aggregating your data

### Data ingestion <a href="#ispcjx46bo76" id="ispcjx46bo76"></a>

After you create an ingestion job that writes to your staging table in Upsolver, for every minute between the times specified to `START_FROM` and `END_AT`, that job first runs an ingestion task to either discover the data (for cloud storage sources) or read the raw bytes (for streaming system sources). It then executes a task to parse the data that had arrived within that minute.

{% hint style="info" %}
Note that the parsing task can be parallelized while the ingestion task can not.
{% endhint %}

<figure><img src="/files/tWg8IuY7TyvA6d8eS7FY" alt=""><figcaption></figcaption></figure>

Each block within the diagram above represents data from a single minute. For example, if `START_FROM` is set to `timestamp '2022-08-04 11:00:00.000',` then block 1 would be data from `2022-08-04 11:00` to `2022-08-04 11:01`.

The data from each block is handled by multiple tasks that depend on each other before being written to your staging table. These tasks are the various stages denoted in the [`stage_name`](broken://spaces/WKMq8oT1OPM3KjP8vlg2/pages/Buv5VsNjm00vP5T8WEGJ#stage-names) column of the [`task_executions`](/content/reference-1/monitoring/system-catalog/task-executions-table.md) table.

If you check the tasks related to your job within the `task_executions` table, you can also see which minute of data a specific task is processing based on the `task_start_time` and `task_end` time. Following our earlier example, a task that worked on block 1 would have a `task_start_time` of `2022-08-04 11:00:00.000` and a `task_end_time` of `2022-08-04 11:01:00.000`.

The `COPY FROM` job continues to ingest the data from each block until it reaches the `END_AT` time. If there is no `END_AT` time configured for the job, then data ingestion continues indefinitely as long as new events are arriving.

### Commit time and event time <a href="#id-2oy8hgqvochv" id="id-2oy8hgqvochv"></a>

As your data is being written to the staging table, the time at which the data becomes visible within the table is stored as the `$commit_time`.

There are also two system columns `$event_date` and `$event_timestamp` that are added to the table based on the original event time derived from the source.

When copying from a storage-type source (e.g. Amazon S3), if the `DATE_PATTERN` job option is configured, then the date pattern in the file is used. Otherwise, the event time is determined based on the file's modified time.

{% hint style="info" %}
Note that the events being read by any given task are based on the event time falling within that task's particular `task_start_time` and `task_end_time` window.
{% endhint %}

In the case of a streaming-type source (e.g. Kafka or Kinesis), the processing timestamp from the source is used as long as it is after the previous timestamps in the stream and before the current time.

If the event's timestamp is after the current time, it is not ingested until that time arrives, when it gets processed by a task with a `task_start_time` and `task_end_time` window containing that event's time. If the timestamp happens to be before a previous timestamp in the stream, then either that previous timestamp or the `task_start_time` is used, depending on whichever is later.

### Data transformation <a href="#gi943upwsbzr" id="gi943upwsbzr"></a>

Transformation jobs operate similarly to data ingestion jobs, with some added flexibility to the job's run time window.

By default, the job option `RUN_INTERVAL` for transformation jobs is set to 1 minute, meaning that just like ingestion jobs, tasks are run every minute to process and write the data from between that minute.

However, if your use case does not require data to be written to the target location so frequently, you can set `RUN_INTERVAL` to a longer time period (e.g. `RUN_INTERVAL` could be `5 MINUTES`, `1 HOUR`, `7 DAYS`, etc.).

{% hint style="info" %}
Note that for any `RUN_INTERVAL` that's less than a day, the given value should evenly divide 1 day (24 hours). For example, you can set `RUN_INTERVAL` to 2 hours (the job runs 12 times per day), but trying to set `RUN_INTERVAL` to 5 hours would fail since 24 hours is not evenly divisible by 5.
{% endhint %}

<figure><img src="/files/XnZV5AuoTOXS38fODoTG" alt=""><figcaption></figcaption></figure>

Going back to this diagram, if you set `RUN_INTERVAL = 5 MINUTES,` then each block here now represents data from a 5 minute period.

Say you created the job with `START_FROM = NOW` and the current time is `2022-08-05 11:03`; in order to align with the `RUN_INTERVAL` of 5 minutes, the task for block 1 processes data from `2022-08-05 11:03` up until `2022-08-05 11:05`.

Similarly, if you set `END_AT = NOW` and the current time is `2022-08-05 11:03`, then the task for block n processes data from `2022-08-05 10:55` up until `2022-08-05 11:00`.

If there is no `END_AT` time configured for the job, however, then your data continues to be processed indefinitely as long as there are new events being written to the staging table.

As you can see, the `RUN_INTERVAL` is essentially what determines the `run_start_time()` and `run_end_time()` seen within the required filter `WHERE $commit_time BETWEEN run_start_time() AND run_end_time()`.

{% hint style="info" %}
Note that `run_start_time()` and `run_end_time()` are equivalent to what you see for `task_start_time` and `task_end_time` respectively within the [`task_executions`](/content/reference-1/monitoring/system-catalog/task-executions-table.md) table.
{% endhint %}

Then, based on the `WHERE` filter, each task only processes the data that was committed to the source table between the given time range. Understanding how the `RUN_INTERVAL` and this `WHERE` filter interacts are key to ensuring that you get the expected results when creating aggregate jobs.

### Aggregating data <a href="#id-1zfva24dpx6r" id="id-1zfva24dpx6r"></a>

When creating an aggregation, there are two options to define the time interval over which the data is aggregated:

1. Adjust the `WHERE` filter to the desired time range by subtracting the proper time interval (e.g. `WHERE $commit_time BETWEEN run_start_time() - PARSE_DURATION('1d') AND run_end_time()`)
2. Adjust the `RUN_INTERVAL` to the appropriate time interval (e.g. `RUN_INTERVAL = 1 DAY`)

To better understand the difference between these two methods, let's walk through an example using both.

In this scenario, let's say that we would like to calculate the average order value (AOV) per country for the past 30 days.

#### Case 1 — Sliding window <a href="#id-1w8emtqb61q" id="id-1w8emtqb61q"></a>

With the first option, we adjust the period that is aggregated over by subtracting a time interval in the `WHERE` filter:

```sql
CREATE JOB agg1
    RUN_INTERVAL = 1 MINUTE -- this is also the default
AS INSERT INTO default_glue_catalog.orders.monthly_aov 
  MAP_COLUMNS_BY_NAME
    SELECT country,
           avg(nettotal) as avg_order_value
    FROM default_glue_catalog.upsolver_samples.orders_raw_data
    WHERE $commit_time BETWEEN run_start_time() - PARSE_DURATION('30d') 
           AND run_end_time()
    GROUP BY country;
```

Keeping `RUN_INTERVAL` set to **1 minute** means that this job executes a new task every minute, and the subtracted `PARSE_DURATION('30d')` means that the last 30 days of data are not filtered out from this task.

<figure><img src="/files/qVPffA8Kwrga7lHlDj98" alt=""><figcaption></figcaption></figure>

If we look at what this means in a diagram, each numbered block above represents the `RUN_INTERVAL`, or the time range of data that would be typically processed by a given task if we had not subtracted a time interval.

However, with the `PARSE_DURATION('30d')` included, that means each task also processes data from the time range of one of the colored blocks. For example, subtracting 30 days means that the task for block 1 would be working with data from the minute represented by that block as well as data from block a.

Notice that block a itself represents 30 days and ends before block 1 starts; this is because we are subtracting the time interval from `run_start_time()` instead of `run_end_time()`. This also means that in reality, this job is actually executing a new task every minute that aggregates the data from the last 30 days and 1 minute.

{% hint style="info" %}
Note that this doesn't mean that Upsolver rescans the entire window of data each time; the aggregation for every new block is incrementally simply rolled up with the previously aggregated values from the time window.
{% endhint %}

#### Case 2 — Tumbling window <a href="#ufm8tv2gxn3f" id="ufm8tv2gxn3f"></a>

Let's say that we don't need the aggregated results to be written so frequently to our target location—just once every 30 days is enough. In that case, we can do this instead:

```sql
CREATE JOB agg2
    RUN_INTERVAL = 30 DAYS
AS INSERT INTO default_glue_catalog.orders.monthly_aov 
  MAP_COLUMNS_BY_NAME
    SELECT country,
           avg(nettotal) as avg_order_value
    FROM default_glue_catalog.upsolver_samples.orders_raw_data
    WHERE $commit_time BETWEEN run_start_time() AND run_end_time()
    GROUP BY country;
```

By setting `RUN_INTERVAL = 30 DAYS`, the `run_start_time()` and `run_end_time()` for each task are automatically adjusted to be 30 days apart. Changing the `RUN_INTERVAL` also means that a new task is only executed by this job every 30 days.

<figure><img src="/files/D1tuLGeYpj5yzbDaCSRJ" alt=""><figcaption></figcaption></figure>

Within our diagram, this means that each block now represents 30 days of data. For example, if we set `START_FROM = timestamp '2022-01-01 00:00:00'`, then the task processing block 1 would start from `2022-01-01` and end at `2022-01-30`. This also means that the next task for block 2 wouldn't start until `2022-01-31`.

Thus, the `INSERT` job above would write the aggregated average order value for the last 30 days to the target table once every 30 days.

#### Case 3 <a href="#cdudwloop3k0" id="cdudwloop3k0"></a>

What if we wanted new results every 30 days but we also wanted to see the aggregated values for the last 90 days?

To do this, we can combine elements of both methods shown above by both adjusting the `RUN_INTERVAL` and subtracting a time interval within the `WHERE` filter.

```sql
CREATE JOB agg3
    RUN_INTERVAL = 30 DAYS
AS INSERT INTO default_glue_catalog.orders.monthly_aov 
  MAP_COLUMNS_BY_NAME
    SELECT country,
           avg(nettotal) as avg_order_value
    FROM default_glue_catalog.upsolver_samples.orders_raw_data
    WHERE $commit_time BETWEEN run_start_time() - PARSE_DURATION('60d') 
           AND run_end_time()
    GROUP BY country;
```

By definition, `RUN_INTERVAL` of 30 days means that the job writes new results for this aggregation every 30 days. Since the `RUN_INTERVAL` also determines the time difference between `run_start_time()` and `run_end_time()`, in order to get the desired data for the last 90 days, we subtract an additional 60 days from `run_start_time()`.

<figure><img src="/files/h6G99ZsqqZ6xYpXHtKWu" alt=""><figcaption></figcaption></figure>

We can see this within the diagram where each of the numbered blocks represents a 30-day time period corresponding to the `RUN_INTERVAL` and each of the colored blocks represents a 60-day time period corresponding to the subtracted time interval.

When the task for block `1` executes, it aggregates all the data from block `a` in addition to the data from block `1` – a total of a 90-day period. Similarly, the task for block `2` will aggregate over the data from block `b` and block `2`.


---

# Agent Instructions
This documentation is published with GitBook. GitBook is the documentation platform designed so that both humans and AI agents can read, navigate, and reason over technical content effectively. Learn more at gitbook.com.

## Querying This Documentation
If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter, and the optional `goal` query parameter:

```
GET https://upsolver.gitbook.io/content/articles-1/get-started/core-concepts/upsolver-timeline.md?ask=<question>&goal=<endgoal>
```

`ask` is the immediate question: it should be specific, self-contained, and written in natural language.
`goal` is optional and describes the broader end goal you are ultimately trying to accomplish on behalf of the user. GitBook uses it to tailor the answer towards what is most useful for that goal.

The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
