Compaction is a def~instant-action that takes as input a set of def~file-slices, merges all the def~log-files in each file slice against its def~base-file, and produces new compacted file slices, written as a def~commit on the def~timeline. Compaction is only applicable to the def~merge-on-read (MOR) table type; which file slices are chosen for compaction is determined by a def~compaction-policy (default: choose the file slices with the largest uncompacted log files) that is evaluated after each def~write-operation.

At a high level, there are two styles of compaction:

  • Synchronous compaction: Here, compaction is performed by the writer process itself, synchronously after each write, i.e. the next write operation cannot begin until compaction finishes. This is the simplest style operationally, since no separate compaction process needs to be scheduled, but it offers lower data freshness guarantees. However, this style is still very useful in cases where, say, it's possible to compact the most recent def~table-partitions on every write operation, while delaying compaction on late-arriving/older partitions (see the sketch after this list).
  • Asynchronous compaction: In this style, the compaction process runs concurrently and asynchronously with the def~write-operation on the def~table. This has the obvious benefit that compaction does not block the next batch of writes, yielding near-real-time data freshness. Tools like Hudi DeltaStreamer support a convenient continuous mode, where compaction and write operations happen in this fashion within a single Spark runtime cluster.
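
As a minimal sketch of enabling the synchronous style (assuming the standard HoodieCompactionConfig builder; exact option names and package paths can vary across Hudi releases):

import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

// Sketch: enable inline (synchronous) compaction, triggered after every 5 delta commits.
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath(basePath) // basePath: the table's base path, assumed defined elsewhere
    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
        .withInlineCompaction(true)
        .withMaxNumDeltaCommitsBeforeCompaction(5)
        .build())
    .build();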

Design 

todo

How to Enable a Compaction Strategy

Compaction strategies are configured through the HoodieWriteConfig: the strategy is set on the HoodieCompactionConfig, which is then attached to the write config. A new strategy can be chosen any time a new HoodieWriteConfig is created.


HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withCompactionConfig(HoodieCompactionConfig.newBuilder() // strategy and its knobs live on the compaction config
        .withCompactionStrategy(new DayBasedCompactionStrategy())
        .withTargetPartitionsPerDayBasedCompaction(1)
        .build())
    .build();
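
With withTargetPartitionsPerDayBasedCompaction(1) as above, each compaction run picks FileSlices from only the single latest partition; the strategies and their options are described below.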


Types of Compaction Strategies

For the examples below, assume today's date is 2020/03/11 and that the partitions with FileSlices awaiting compaction are 2020/03/13, 2020/03/12, 2020/03/11, 2020/03/10, 2020/03/09, 2020/03/08 and 2020/03/07 (note that the first two partitions are dated later than today).


Each strategy below is listed with its description, its configuration, and an example of which FileSlices will be chosen for compaction.
DayBasedCompactionStrategy
Description: This strategy orders compactions in reverse order of partition creation, so it compacts data in the latest partitions first.
Configuration: HoodieCompactionConfig.withTargetPartitionsPerDayBasedCompaction(x) (default: x = 10)
Example: With x = 3, FileSlices in the following partitions will be selected for compaction: 2020/03/13, 2020/03/12, 2020/03/11
=> from the 3 latest partitions

BoundedPartitionAwareCompactionStrategy
Description: This strategy picks up all partitions from the last N days (relative to today), even if partitions with later dates have been created for the table.
Configuration: HoodieCompactionConfig.withTargetPartitionsPerDayBasedCompaction(x) (default: x = 10)
Example: With x = 3, FileSlices in the following partitions will be selected for compaction: 2020/03/13, 2020/03/12, 2020/03/11, 2020/03/10, 2020/03/09
=> from all partitions up to three days ago, plus the later-dated ones

BoundedIOCompactionStrategy
Description: This strategy looks at the total IO to be done for the compaction (read + write) and limits the list of compactions to stay under a configured IO limit.
Configuration: HoodieCompactionConfig.withTargetIOPerCompactionInMB(x) (default: 500GB)
For a single compaction operation:
  data read = size of the base file (parquet) + sum of the sizes of the log files
  data written = size of the new base file (parquet), because the log files only hold updates to records already in the base file
  total IO = data read + data written
Example: With x = 1000, FileSlices are iterated over and selected until the cumulative total IO reaches 1000MB
=> the total data read and written is limited to approximately 1000MB
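
The estimate above can be expressed as a small helper (a hedged sketch; the names are illustrative, not Hudi's internals):

// Illustrative sketch of the per-slice IO estimate described above.
static long estimateCompactionIOMB(long baseFileSizeMB, long totalLogFilesSizeMB) {
    long dataReadMB = baseFileSizeMB + totalLogFilesSizeMB; // read the base file plus all log files
    long dataWrittenMB = baseFileSizeMB;                    // rewrite the compacted base file
    return dataReadMB + dataWrittenMB;
}
// e.g. estimateCompactionIOMB(100, 50) == 250, consuming 250MB of a 1000MB budget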

LogFileSizeBasedCompactionStrategy (default strategy)
Description: This strategy orders compactions by total log file size and limits them to a configured IO bound.
Configuration: HoodieCompactionConfig.withTargetIOPerCompactionInMB(x) (default: 500GB)
Example: The selection criteria for FileSlices is the same as in BoundedIOCompactionStrategy above, but the initial list of FileSlices is reverse-sorted by the total size of the log files in each slice.
=> slices with larger log files are compacted first; overall IO is bounded to x MB

UnBoundedCompactionStrategy
Description: This strategy does not reorder or filter out any compactions. It is a pass-through that compacts every base file that has log files, i.e. it applies no intelligence to compaction scheduling.
Configuration: none
Example: All FileSlices will be selected for compaction.

UnBoundedPartitionAwareCompactionStrategy
Description: This strategy filters out all the partitions that are eligible to be compacted by a BoundedPartitionAwareCompactionStrategy and returns the remainder. This ensures that a long-running UnBoundedPartitionAwareCompactionStrategy does not step over the partitions handled by a shorter-running BoundedPartitionAwareCompactionStrategy. Essentially, it is the inverse of the partitions chosen by BoundedPartitionAwareCompactionStrategy.
Configuration: HoodieCompactionConfig.withTargetPartitionsPerDayBasedCompaction(x) (default: x = 10)
Example: With x = 3, FileSlices in the following partitions will be selected for compaction: 2020/03/08, 2020/03/07
=> chooses all FileSlices that would be rejected by BoundedPartitionAwareCompactionStrategy
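
For instance, a hedged sketch of configuring the default LogFileSizeBasedCompactionStrategy with an explicit IO bound (builder methods as referenced above; the strategy class's package path varies across Hudi versions):

import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

// Sketch: bound each compaction run to ~1000MB of total IO with the default strategy.
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
        .withCompactionStrategy(new LogFileSizeBasedCompactionStrategy())
        .withTargetIOPerCompactionInMB(1000)
        .build())
    .build();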
