[DOCS] Update timeline and table types pages (#9291)
- Add local configs
- Keep up to date on different query types
bhasudha authored Aug 3, 2023
1 parent 17666d4 commit 23f2666
Showing 4 changed files with 87 additions and 14 deletions.
50 changes: 43 additions & 7 deletions website/docs/table_types.md
@@ -2,17 +2,19 @@
title: Table & Query Types
summary: "In this page, we describe the different tables types in Hudi."
toc: true
toc_min_heading_level: 2
toc_max_heading_level: 4
last_modified_at:
---

## Table and Query Types
Hudi table types define how data is indexed & laid out on the DFS and how the above primitives and timeline activities are implemented on top of such organization (i.e., how data is written).
In turn, `query types` define how the underlying data is exposed to the queries (i.e., how data is read).

| Table Type | Supported Query types |
|-------------- |------------------|
| Copy On Write | Snapshot Queries + Incremental Queries |
| Merge On Read | Snapshot Queries + Incremental Queries + Read Optimized Queries |
| Table Type | Supported Query types |
|-------------- |-------------------------------------------------------------------------------------------------------------------------------------------------|
| Copy On Write | <ul><li>Snapshot Queries</li><li>Incremental Queries</li><li>Incremental Queries (CDC)</li><li>Time Travel</li></ul> |
| Merge On Read | <ul><li>Snapshot Queries</li><li>Incremental Queries</li><li>Read Optimized Queries</li><li>Time Travel</li></ul> |

### Table Types
Hudi supports the following table types.
@@ -36,17 +38,25 @@ Hudi supports the following query types

- **Snapshot Queries** : Queries see the latest snapshot of the table as of a given commit or compaction action. In the case of a merge on read table, it exposes near-real time data (few mins) by merging
the base and delta files of the latest file slice on the fly. For a copy on write table, it provides a drop-in replacement for existing parquet tables, while providing upsert/delete and other write side features.
- **Incremental Queries** : Queries only see new data written to the table, since a given commit/compaction. This effectively provides change streams to enable incremental data pipelines.
- **Read Optimized Queries** : Queries see the latest snapshot of table as of a given commit/compaction action. Exposes only the base/columnar files in latest file slices and guarantees the
- **Incremental Queries** : Queries only see new data written to the table, since a given commit/compaction.
This effectively provides change streams to enable incremental data pipelines. By default, this produces the latest
snapshot of the changes since a given point in the timeline.
- **Incremental Queries (CDC)** : These are a subtype of incremental queries, where queries see all changed data since
a given commit/compaction as opposed to the latest state of the changed data. This enables full CDC-style query use cases,
allowing users to see the before and after images of the changes along with the operations that caused the change.
- **Read Optimized Queries** : Queries see the latest snapshot of the table as of a given commit/compaction action. Exposes only the base/columnar files in the latest file slices and guarantees the
same columnar query performance compared to a non-hudi columnar table.
- **Time Travel Queries** : Queries the snapshot of a table as of a given timestamp in the timeline.

Following table summarizes the trade-offs between the different query types.

The following table summarizes the trade-offs between the Snapshot and Read Optimized query types.

| Trade-off     | Snapshot                                                          | Read Optimized                               |
|---------------|-------------------------------------------------------------------|----------------------------------------------|
| Data Latency  | Lower                                                             | Higher                                       |
| Query Latency | Higher (merge base / columnar file + row based delta / log files) | Lower (raw base / columnar file performance) |

For configs related to query types, refer to the [query configs](#query-configs) section below.

## Copy On Write Table

@@ -108,3 +118,29 @@ The intention of merge on read table is to enable near real-time processing dire
data out to specialized systems, which may not be able to handle the data volume. There are also a few secondary side benefits to
this table such as reduced write amplification by avoiding the synchronous merge of data, i.e., the amount of data written per byte of data in a batch.

## Query configs
The following are the configs relevant to the different query types.

### Spark configs

| Config Name | Default | Description |
|----------------------------------------------------------------------------------------|---------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| hoodie.datasource.query.type | snapshot (Optional) | Whether data needs to be read in `incremental` mode (new data since an instantTime), `read_optimized` mode (obtain latest view, based on base files), or `snapshot` mode (obtain latest view, by merging base and (if any) log files)<br /><br />`Config Param: QUERY_TYPE` |
| hoodie.datasource.read.begin.instanttime | N/A **(Required)** | Required when `hoodie.datasource.query.type` is set to `incremental`. Represents the instant time to start incrementally pulling data from. The instant time here need not necessarily correspond to an instant on the timeline. New data written with an instant_time &gt; BEGIN_INSTANTTIME is fetched. For example, '20170901080000' will get all new data written after Sep 1, 2017 08:00 AM. Note that if `hoodie.datasource.read.handle.hollow.commit` is set to `USE_STATE_TRANSITION_TIME`, the instant's `stateTransitionTime` is used to perform the comparison.<br /><br />`Config Param: BEGIN_INSTANTTIME` |
| hoodie.datasource.read.end.instanttime | N/A **(Required)** | Used when `hoodie.datasource.query.type` is set to `incremental`. Represents the instant time to limit incrementally fetched data to. When not specified, the latest commit time from the timeline is assumed by default. When specified, new data written with an instant_time &lt;= END_INSTANTTIME is fetched. Point-in-time queries make more sense with both begin and end instant times specified. Note that if `hoodie.datasource.read.handle.hollow.commit` is set to `USE_STATE_TRANSITION_TIME`, the instant's `stateTransitionTime` is used to perform the comparison.<br /><br />`Config Param: END_INSTANTTIME` |
| hoodie.datasource.query.incremental.format | latest_state (Optional) | This config is used along with the 'incremental' query type. When set to 'latest_state', it returns the latest records' values. When set to 'cdc', it returns the CDC data.<br /><br />`Config Param: INCREMENTAL_FORMAT`<br />`Since Version: 0.13.0` |
| as.of.instant | N/A **(Required)** | The query instant for time travel. Required only in the context of time travel queries. If this option is not specified, the latest snapshot is queried.<br /><br />`Config Param: TIME_TRAVEL_AS_OF_INSTANT` |


Refer [here](https://hudi.apache.org/docs/next/configurations#Read-Options) for more details.
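
As an illustration, here is a minimal PySpark sketch of how the options above map to the different query types. Only the option keys come from the table; the base path, column values, and instant times are placeholders, not values from this page.

```python
# A hedged sketch, assuming a Hudi table at a placeholder path.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("hudi-query-types").getOrCreate()
base_path = "s3://my-bucket/hudi/trips"  # hypothetical table path

# Snapshot query (default): latest view, merging base and log files for MOR tables.
snapshot_df = spark.read.format("hudi").load(base_path)

# Read optimized query: only the base/columnar files of the latest file slices.
ro_df = (spark.read.format("hudi")
         .option("hoodie.datasource.query.type", "read_optimized")
         .load(base_path))

# Incremental query: latest state of records changed since a given instant time.
incr_df = (spark.read.format("hudi")
           .option("hoodie.datasource.query.type", "incremental")
           .option("hoodie.datasource.read.begin.instanttime", "20230801000000")
           .load(base_path))

# Incremental query (CDC): before/after images of every change since the instant.
cdc_df = (spark.read.format("hudi")
          .option("hoodie.datasource.query.type", "incremental")
          .option("hoodie.datasource.query.incremental.format", "cdc")
          .option("hoodie.datasource.read.begin.instanttime", "20230801000000")
          .load(base_path))

# Time travel query: snapshot of the table as of a given instant.
tt_df = (spark.read.format("hudi")
         .option("as.of.instant", "20230801000000")
         .load(base_path))
```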

### Flink Configs

| Config Name | Default | Description |
|------------------------------------------------------------------------------------------|---------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| hoodie.datasource.query.type | snapshot (Optional) | Decides how data files need to be read: 1) snapshot mode (obtain latest view, based on row &amp; columnar data); 2) incremental mode (new data since an instantTime); if `cdc.enabled` is set, incremental queries on CDC data are possible; 3) read optimized mode (obtain latest view, based on columnar data). Default: snapshot<br /><br /> `Config Param: QUERY_TYPE` |
| read.start-commit | N/A **(Required)** | Required in the case of incremental queries. Start commit instant for reading; the commit time format should be 'yyyyMMddHHmmss'. By default, streaming reads start from the latest instant.<br /><br /> `Config Param: READ_START_COMMIT` |
| read.end-commit | N/A **(Required)** | Used in the context of incremental queries. End commit instant for reading; the commit time format should be 'yyyyMMddHHmmss'.<br /><br /> `Config Param: READ_END_COMMIT` |

Refer [here](https://hudi.apache.org/docs/next/configurations#Flink-Options) for more details.
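
For example, a Flink incremental read can pass these options in the SQL `WITH` clause. The sketch below uses PyFlink; the schema, path, and the `connector`/`table.type` options are illustrative assumptions not listed on this page, while the query-type and read commit options come from the table above.

```python
# A hedged PyFlink sketch of an incremental read bounded between two commits.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Declare a Hudi-backed table and bound the incremental read between two commits.
t_env.execute_sql("""
    CREATE TABLE hudi_trips (
        uuid STRING,
        rider STRING,
        fare DOUBLE,
        ts BIGINT
    ) WITH (
        'connector' = 'hudi',
        'path' = 's3://my-bucket/hudi/trips',
        'table.type' = 'MERGE_ON_READ',
        'hoodie.datasource.query.type' = 'incremental',
        'read.start-commit' = '20230801000000',
        'read.end-commit' = '20230802000000'
    )
""")

# Print the incrementally fetched rows.
t_env.execute_sql("SELECT * FROM hudi_trips").print()
```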

45 changes: 41 additions & 4 deletions website/docs/timeline.md
@@ -1,18 +1,22 @@
---
title: Timeline
toc: true
toc_min_heading_level: 2
toc_max_heading_level: 4
---

## Timeline
At its core, Hudi maintains a `timeline` of all actions performed on the table at different `instants` of time that helps provide instantaneous views of the table,
At its core, Hudi maintains a `timeline`, which is a log of all actions performed on the table at different `instants` of time, and which helps provide instantaneous views of the table
while also efficiently supporting retrieval of data in the order of arrival. A Hudi instant consists of the following components:

* `Instant action` : Type of action performed on the table
* `Instant time` : Instant time is typically a timestamp (e.g., 20190117010349), which monotonically increases in the order of the action's begin time.
* `state` : current state of the instant

Hudi guarantees that the actions performed on the timeline are atomic & timeline consistent based on the instant time.
Atomicity is achieved by relying on atomic puts to the underlying storage to move the write operations through various states in the timeline.
On the underlying DFS (in the case of S3/Cloud Storage, via an atomic PUT operation), this can be observed as files of the pattern `<instant>.<action>.<state>` in Hudi's timeline.
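
As a small illustration of that naming scheme, the sketch below (not a Hudi API, just a plain directory listing against a hypothetical local table) splits timeline files into their instant, action, and state parts.

```python
# Illustrative only: list a table's `.hoodie` directory and split each file
# name on the `<instant>.<action>.<state>` pattern described above.
import os

timeline_dir = "/data/hudi/trips/.hoodie"  # hypothetical local table path

for name in sorted(os.listdir(timeline_dir)):
    parts = name.split(".")
    if not parts[0].isdigit():
        continue  # skip hoodie.properties, folders, and other auxiliary files
    if len(parts) == 3:
        instant, action, state = parts   # e.g. 20230801093000123.commit.requested
    elif len(parts) == 2:
        instant, action = parts          # completed instants typically drop the state suffix
        state = "completed"
    else:
        continue
    print(f"instant={instant} action={action} state={state}")
```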

### Actions
Key actions performed include

* `COMMITS` - A commit denotes an **atomic write** of a batch of records into a table.
@@ -22,6 +26,7 @@ Key actions performed include
* `ROLLBACK` - Indicates that a commit/delta commit was unsuccessful & rolled back, removing any partial files produced during such a write
* `SAVEPOINT` - Marks certain file groups as "saved", such that cleaner will not delete them. It helps restore the table to a point on the timeline, in case of disaster/data recovery scenarios.

### States
Any given instant can be in one of the following states:

@@ -39,4 +44,36 @@ organization reflects the actual time or `event time`, the data was intended for

When there is late arriving data (data intended for 9:00 arriving >1 hr late at 10:20), we can see the upsert producing new data into even older time buckets/folders.
With the help of the timeline, an incremental query attempting to get all new data that was committed successfully since 10:00 hours is able to very efficiently consume
only the changed files without, say, scanning all the time buckets > 07:00.

### Active and Archived timeline
Hudi divides the entire timeline into an active timeline and an archived timeline. As the name suggests, the active timeline is consulted all
the time to serve metadata on valid data files. To ensure that reads on the timeline do not incur unnecessary latencies
as the timeline grows, the active timeline needs to be bounded in the metadata (timeline instants) it can serve. To ensure this,
after certain thresholds archival kicks in to move older timeline events to the archived timeline. In general, the archived
timeline is never contacted for regular operations of the table and is merely used for book-keeping and debugging purposes.
Any instants seen under the `.hoodie` directory belong to the active timeline, and archived instants go into the `.hoodie/archived` folder.
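
As a quick illustration of that layout, the sketch below simply compares how many entries sit on the active versus the archived timeline of a hypothetical local table.

```python
# Illustrative only; the table path is a placeholder and the archived folder
# may not exist until archival has actually run.
import os

table_path = "/data/hudi/trips"
active_dir = os.path.join(table_path, ".hoodie")
archived_dir = os.path.join(active_dir, "archived")

active = [f for f in os.listdir(active_dir) if os.path.isfile(os.path.join(active_dir, f))]
archived = os.listdir(archived_dir) if os.path.isdir(archived_dir) else []

print(f"active timeline entries:   {len(active)}")
print(f"archived timeline entries: {len(archived)}")
```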


### Archival Configs
Basic configurations that control archival.

#### Spark write client configs

| Config Name | Default | Description |
|--------------------------------------------------------------------------------------------| ------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| hoodie.keep.max.commits | 30 (Optional) | Archiving service moves older entries from timeline into an archived log after each write, to keep the metadata overhead constant, even as the table size grows. This config controls the maximum number of instants to retain in the active timeline. |
| hoodie.keep.min.commits | 20 (Optional) | Similar to hoodie.keep.max.commits, but controls the minimum number of instants to retain in the active timeline. |

For more advanced configs refer [here](https://hudi.apache.org/docs/next/configurations#Archival-Configs-advanced-configs).
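
For illustration, a minimal PySpark write sketch passing these bounds as write options is shown below. The table name, path, and sample data are placeholders, and the record key / precombine field options are standard Hudi write configs not listed on this page.

```python
# A hedged sketch: archival keeps roughly between hoodie.keep.min.commits and
# hoodie.keep.max.commits instants on the active timeline as writes accumulate.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("hudi-archival").getOrCreate()

df = spark.createDataFrame(
    [("uuid-1", "rider-A", 27.70, 1690000000)],
    ["uuid", "rider", "fare", "ts"],
)

(df.write.format("hudi")
    .option("hoodie.table.name", "trips")
    .option("hoodie.datasource.write.recordkey.field", "uuid")
    .option("hoodie.datasource.write.precombine.field", "ts")
    .option("hoodie.keep.min.commits", "20")   # lower bound of retained instants
    .option("hoodie.keep.max.commits", "30")   # archival triggers beyond this
    .mode("append")
    .save("s3://my-bucket/hudi/trips"))
```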

#### Flink Options
Flink jobs using SQL can be configured through the options in the WITH clause. The actual datasource-level configs are listed below.

| Config Name | Default | Description |
| ---------------------------------------------------------------------------------------------- | --------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| archive.max_commits | 50 (Optional) | Max number of commits to keep before archiving older commits into a sequential log, default 50<br /><br /> `Config Param: ARCHIVE_MAX_COMMITS` |
| archive.min_commits | 40 (Optional) | Min number of commits to keep before archiving older commits into a sequential log, default 40<br /><br /> `Config Param: ARCHIVE_MIN_COMMITS` |

Refer [here](https://hudi.apache.org/docs/next/configurations#Flink-Options) for more details.
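
Correspondingly, a hedged PyFlink sketch passing the archival bounds in the `WITH` clause might look like the following; the connector, path, schema, and table type are illustrative assumptions, while `archive.min_commits` and `archive.max_commits` come from the table above.

```python
# Declares a Hudi sink table whose older commits are archived once the
# number of retained commits exceeds archive.max_commits.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

t_env.execute_sql("""
    CREATE TABLE hudi_trips_sink (
        uuid STRING,
        rider STRING,
        fare DOUBLE,
        ts BIGINT
    ) WITH (
        'connector' = 'hudi',
        'path' = 's3://my-bucket/hudi/trips',
        'table.type' = 'COPY_ON_WRITE',
        'archive.min_commits' = '40',
        'archive.max_commits' = '50'
    )
""")
```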

4 changes: 2 additions & 2 deletions website/src/css/custom.css
@@ -351,7 +351,7 @@ h1.blogPostTitle_src-theme-BlogPostItem-styles-module{
}

.docs-custom-styles tr td:nth-child(1) {
width: 20%;
width: 25%;
word-break: break-all;
}

@@ -361,7 +361,7 @@
}

.docs-custom-styles tr td:nth-child(3) {
width: 65%;
width: 60%;
}

.docs-custom-styles th {
2 changes: 1 addition & 1 deletion website/src/theme/DocPage/index.js
@@ -128,7 +128,7 @@ function DocPageContent({
);
}

const arrayOfPages = (matchPath) => [`${matchPath}/configurations`, `${matchPath}/basic_configurations`];
const arrayOfPages = (matchPath) => [`${matchPath}/configurations`, `${matchPath}/basic_configurations`, `${matchPath}/timeline`, `${matchPath}/table_types`];
const showCustomStylesForDocs = (matchPath, pathname) => arrayOfPages(matchPath).includes(pathname);
function DocPage(props) {
const {
