Refer to the __hoodie-client/src/test/java/HoodieClientExample.java__ class for an example of how compaction
is scheduled and executed.
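
The snippet below is a minimal sketch of that schedule-then-execute flow, not a copy of the example class: it assumes a write-client object exposing `startCommit`, `upsert`, `scheduleCompaction`, and `compact` style methods (exact names and signatures vary by Hudi version, so treat every call here as an assumption and defer to __HoodieClientExample.java__).

```java
import java.util.Optional;

// Sketch only (assumed API): ingest a batch, then schedule and run compaction inline.
// See HoodieClientExample.java for the real, version-specific calls.
public class InlineCompactionSketch {

  // Hypothetical interface standing in for the Hudi write client.
  interface HoodieWriteClientLike {
    String startCommit();
    void upsert(Object records, String commitTime);
    Optional<String> scheduleCompaction();        // returns the requested compaction instant, if any
    void compact(String compactionInstantTime);
  }

  public static void run(HoodieWriteClientLike client, Object recordBatch) {
    String commitTime = client.startCommit();     // open a new (delta) commit
    client.upsert(recordBatch, commitTime);       // append updates as row-based log blocks

    // Inline compaction: create a "requested" compaction instant, then execute it in sequence.
    Optional<String> compactionInstant = client.scheduleCompaction();
    compactionInstant.ifPresent(client::compact); // merges log blocks into a new base file
  }
}
```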

## Deployment Models

These are the typical Hudi writer and compaction deployment models (a hedged sketch of the async compactor loop follows the list):
* `Inline Compaction` : At each round, a single Spark application ingests a new batch to the dataset. It then
optionally decides to schedule a compaction run and executes it in sequence.
* `Single Dedicated Async Compactor` : The Spark application which brings new changes to the dataset (the writer)
periodically schedules compaction but does not run it inline. A separate Spark application periodically
probes for pending compactions and executes them.
* `Multi Async Compactors` : This mode is similar to the `Single Dedicated Async Compactor` mode. The main difference
is that there can be more than one Spark application picking up different compactions and executing them in parallel.
To ensure compactors do not step on each other, they use a coordination service like ZooKeeper to pick up unique
pending compaction instants and run them.
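
In the two async modes, the dedicated compactor reduces to a probe-and-execute loop along the following lines. This is a sketch under assumptions: `fetchPendingCompactionInstants` and the locking step are hypothetical placeholders for timeline probing and ZooKeeper-based coordination.

```java
import java.util.List;
import java.util.function.Consumer;

// Sketch of a dedicated async compactor (modes 2 and 3); the probing helper is hypothetical.
public class AsyncCompactorLoopSketch {

  public static void run(Consumer<String> executeCompaction, long pollIntervalMs)
      throws InterruptedException {
    while (true) {
      // Hypothetical probe: compaction instants scheduled ("requested") by the writer
      // but not yet executed.
      List<String> pending = fetchPendingCompactionInstants();
      for (String instant : pending) {
        // In multi-compactor deployments, acquire a lock (e.g. via ZooKeeper) here so
        // that no two compactors pick the same pending instant.
        executeCompaction.accept(instant);  // e.g. the write client's compact(...) call
      }
      Thread.sleep(pollIntervalMs);         // probe periodically
    }
  }

  static List<String> fetchPendingCompactionInstants() {
    return java.util.Collections.emptyList(); // placeholder for timeline probing
  }
}
```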

The compaction process requires one executor per file-slice in the compaction plan. So, the best resource allocation
strategy (both in terms of speed and resource usage) for clusters supporting dynamic allocation is to look up the compaction
plan to be run, figure out the number of file-slices being compacted, and request that many executors.
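
For example, with dynamic allocation the driver could cap executors at the number of compaction operations (one per file-slice) in the plan. Reading the plan itself is left as a hypothetical lookup here; only the Spark configuration keys are real.

```java
import org.apache.spark.SparkConf;

// Hypothetical sketch: one executor per file-slice in the pending compaction plan.
public class CompactionResourceSizingSketch {
  public static SparkConf sizedConf(int fileSlicesInPlan) {
    // fileSlicesInPlan would come from reading the pending compaction plan's operations
    // (hypothetical lookup); each operation compacts exactly one file-slice.
    return new SparkConf()
        .set("spark.dynamicAllocation.enabled", "true")
        .set("spark.dynamicAllocation.maxExecutors", String.valueOf(fileSlicesInPlan));
  }
}
```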

## Async Compaction Design Deep-Dive (Optional)

For the purpose of this section, it is important to distinguish between two types of commits pertaining to a file-group:

A commit which generates a merged and read-optimized file-slice is called a `snapshot commit` (SC) with respect to that file-group.
A commit which merely appends the new/updated records assigned to the file-group into a new log block is called a `delta commit` (DC)
with respect to that file-group.

### Algorithm

The algorithm is described with an illustration. Let us assume a scenario where commits SC1, DC2, and DC3 have
already completed on a dataset. Commit DC4 is currently ongoing, with the writer (ingestion) process using it to upsert data.
Let us also imagine a set of file-groups (FG1 … FGn) in the dataset whose latest version (`File-Slice`)
contains the base file created by commit SC1 (a snapshot commit in columnar format) and a log file containing row-based
log blocks from 2 delta commits (DC2 and DC3).

{% include image.html file="async_compac_1.png" alt="async_compac_1.png" max-width="1000" %}

* The writer (ingestion) job that is going to commit DC4 starts. The record updates in this batch are grouped by file-group
and appended in row format to the corresponding log files as a delta commit. Let us imagine a subset of file-groups has
this new log block (delta commit) DC4 added.
* Before the writer job completes, it runs the compaction strategy to decide which file-groups to compact
and creates a new compaction-request commit SC5. This commit file is marked as "requested" with metadata denoting
which fileIds to compact (based on the selection policy); an illustrative shape of this plan follows the figure below.
The writer completes without running the compaction (it will be run asynchronously).

{% include image.html file="async_compac_2.png" alt="async_compac_2.png" max-width="1000" %}

* The writer job runs again, ingesting the next batch. It starts with commit DC6. It reads the earliest inflight compaction
request marker commit in timeline order and collects the (fileId, compaction commit id "CcId") pairs from its metadata.
Ingestion DC6 ensures a new file-slice with base-commit "CcId" gets allocated for each such file-group.
The writer simply appends records in row format to the first log-file of that slice (as a delta commit), assuming the
base-file ("Phantom-Base-File") will eventually be created by the compactor (a sketch of this base-instant rule follows the figure below).

{% include image.html file="async_compac_3.png" alt="async_compac_3.png" max-width="1000" %}

* The compactor runs at some time and commits at "Tc" (concurrently with, or before/after, ingestion DC6). It reads the commit timeline
and finds the first unprocessed compaction request marker commit. The compactor reads that commit's metadata to find the
file-slices to be compacted. It compacts each file-slice and creates the missing base-file ("Phantom-Base-File")
with "CcId" as the commit timestamp. The compactor then marks the compaction commit timestamp as completed.
It is important to realize that at the dataset level, there could be different file-groups requesting compaction at
different commit timestamps.

{% include image.html file="async_compac_4.png" alt="async_compac_4.png" max-width="1000" %}

* A near real-time reader interested in getting the latest snapshot will have 2 cases. Let us assume that the
incremental ingestion (writer at DC6) happened before the compaction (at some time "Tc").
The description below is with regard to compaction from the file-group perspective (a sketch of the reader rule follows this list).
  * `Reader querying at a time between ingestion completion for DC6 and compaction finish "Tc"`:
Hudi's implementation will be changed to become aware of file-groups currently waiting for compaction and
merge the log-files corresponding to DC2-DC6 with the base-file corresponding to SC1. In essence, Hudi will create
a pseudo file-slice by combining the 2 file-slices starting at base-commits SC1 and SC5 into one.
For file-groups not waiting for compaction, the reader behavior is essentially the same: read the latest file-slice
and merge on the fly.
  * `Reader querying at a time after compaction finished (> "Tc")`: In this case, the reader will not find any pending
compactions in the timeline and will simply retain the current behavior of reading the latest file-slice and
merging on-the-fly.

* Read-Optimized View readers will query against the latest columnar base-file for each file-group.
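
The two snapshot-reader cases above reduce to one per-file-group rule, sketched below with hypothetical types and helpers: if a compaction is still pending for the file-group, merge across the two file-slices; otherwise read the latest file-slice as usual.

```java
import java.util.List;

// Sketch of the snapshot-reader rule; FileGroup, Record and merge(...) are hypothetical.
public class SnapshotReaderRuleSketch {

  // Hypothetical stand-ins for Hudi's internal abstractions.
  interface FileGroup {
    boolean hasPendingCompaction();
    Object baseFileAt(String instant);
    List<Object> logBlocksSince(String instant);
    Object latestBaseFile();
    List<Object> latestLogBlocks();
  }
  interface Record {}

  static List<Record> read(FileGroup fg) {
    if (fg.hasPendingCompaction()) {
      // Pending compaction (query time < Tc): build a pseudo file-slice by merging the
      // last compacted base file (SC1) with all log blocks since it (DC2 through DC6),
      // i.e. combine the slices based at SC1 and SC5 into one.
      return merge(fg.baseFileAt("SC1"), fg.logBlocksSince("SC1"));
    }
    // No pending compaction (query time > Tc): current behavior, latest file-slice only.
    return merge(fg.latestBaseFile(), fg.latestLogBlocks());
  }

  static List<Record> merge(Object baseFile, List<Object> logBlocks) {
    return java.util.Collections.emptyList(); // placeholder for the on-the-fly merge
  }
}
```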

The above algorithm explains async compaction with respect to a single compaction run on a single file-group. It is important
to note that multiple compaction plans can be run concurrently, as they essentially operate on different
file-groups.