Proposers

Approvers

Status

Current State

  • UNDER DISCUSSION (tick)
  • IN PROGRESS
  • ABANDONED
  • COMPLETED
  • INACTIVE


...

JIRA: HUDI-897, HUDI-957

Released: <Hudi Version>

Abstract

The business scenarios for a data lake mainly include analysis of databases, logs, and files. One of the key trade-offs in managing a data lake is choosing between write throughput and query performance. For better write throughput, it is usually preferable to write all new incoming data into one data file, which improves ingestion speed substantially. However, this can create many small files, and data locality is not optimal: data is co-located with the other records of its incoming batch rather than with the data it is queried together with. Small file sizes and lack of data locality can degrade query performance. In addition, for many file systems including HDFS, performance degrades when there are many small files.

...

Databricks Delta Lake also targets these three scenarios. [1]

Background

At present, Hudi supports fairly well the scenario where database CDC data is incrementally written into Hudi, and work is also in progress on bulk-loading files into Hudi.

...

Going back to the log scenario: the user's data is continuously written and there are no update or delete operations (a very common use case). Hudi is not friendly enough to such pure-write scenarios: it does not provide a pure write operation, and instead converts the insert operation into an upsert. The upsert first locates the file each record belongs to, and if a small file exists it merges the incoming records into that small file, which results in low write throughput and a serious write amplification problem. Consider the following application scenario: each batch the user pulls generates a file of about 1 MB, and hoodie.parquet.small.file.limit defaults to 100 MB, so every batch write is merged with the previous small file. Roughly 100 such 1 MB batches are rewritten into the same file group before it reaches the 100 MB limit, so on the order of 1 + 2 + ... + 100 ≈ 5 GB is written to persist 100 MB of data. This leads to inefficient insert throughput and write amplification.

Implementation Option#1: Using APPEND mode

To solve the above problems, Hudi needs to support pure writes of data (that is, skipping record location and small-file merging at write time) by providing a new APPEND mode; small data files are then merged into large files afterwards.
The bulkInsert API currently provided by Hudi can already skip record location and small-file merging and generate files directly, but its main purpose is the initial full load of data into Hudi rather than appending, so an append API needs to be added that only writes data quickly into data files.
Assume that the user's partition directory structure after multiple batches of appends is as follows:

...

You can see that the files generated by each commit are very small, close to only 1 MB. In total, 5 FileGroups will be generated (3 for partition1 and 2 for partition2). The append API will greatly improve data write throughput and reduce the write amplification problem.
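
A minimal sketch of how the proposed append API could be used from the write client is shown below, assuming a HoodieWriteClient named client and an RDD of incoming records. The append(...) method is the new API proposed in this RFC and does not exist in Hudi today, and the import paths may differ between Hudi versions.

import org.apache.hudi.HoodieWriteClient;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.spark.api.java.JavaRDD;

// Sketch only: append(...) mirrors insert(...)/bulkInsert(...) but skips index lookup
// and small-file handling, writing each incoming batch straight into new base files.
class AppendExample<T extends HoodieRecordPayload> {
  JavaRDD<WriteStatus> appendBatch(HoodieWriteClient<T> client, JavaRDD<HoodieRecord<T>> records) {
    String instantTime = client.startCommit();   // existing API: begins a new commit on the timeline
    return client.append(records, instantTime);  // proposed append API (does not exist yet)
  }
}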

Compaction

Hudi's compaction currently has two mechanisms: synchronous compaction (inline compaction) and asynchronous compaction (async compaction). Both apply only to the Merge On Read table type and operate within a FileGroup, i.e. they compact the log files and the base data file of the same FileGroup. In the scenario above, the current compaction does not help, because what is needed is compaction at the FileGroup level, that is, merging the data files of different FileGroups. Following the current overall compaction flow, FileGroup-level compaction is mainly divided into two steps: generating a plan and asynchronously executing the plan. Let's first look at the definition of the CompactionPlan.

Definition of CompactionPlan

To distinguish it from the current HoodieCompactionPlan, it is proposed to introduce a HoodieBaseFileCompactionPlan, defined as follows.

{
   "namespace": "org.apache.hudi.avro.model",
   "type": "record",
   "name": "HoodieBaseFileCompactionPlan",
   "fields": [
     {
         "name": "operations",
         "type": ["null", {
            "type": "array",
            "items": {
               "name": "HoodieBaseFileCompactionOperation",
               "type": "record",
               "fields": [
                  {
                     "name": "baseFilePaths",
                     "type": ["null", {
                        "type": "array",
                        "items": "string"
                     }],
                     "default": null
                  },
                  {
                     "name": "partitionPath",
                     "type": ["null", "string"],
                     "default": null
                  },
                  {
                     "name": "metrics",
                     "type": ["null", {
                        "type": "map",
                        "values": "double"
                     }],
                     "default": null
                  }
               ]
            }
        }],
       "default": null
    },
    {
       "name": "extraMetadata",
       "type": ["null", {
          "type": "map",
          "values": "string"
       }],
       "default": null
    },
    {
       "name": "version",
       "type": ["int", "null"],
       "default": 1
    }
  ]
}

Generate BaseFileCompactionPlan

• For DeltaStreamer

In continuous mode, asynchronous compaction is started after each write of data: HoodieWriteClient#scheduleCompaction is called to generate a HoodieCompactionPlan and serialize it into a metadata file, and the scheduledCompactionInstant is then put into the asynchronous compaction queue, from which a thread pool continuously takes scheduledCompactionInstants for asynchronous execution. Similarly, to generate a BaseFileCompactionPlan, HoodieWriteClient#scheduleBaseFileCompaction can be called; the overall logic is similar to the above.
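
A rough sketch of the scheduling step, assuming a HoodieWriteClient named client and a simple queue standing in for the async compaction service's pending-instant queue. scheduleBaseFileCompaction(...) is the method proposed above, mirroring the existing scheduleCompaction(...), and does not exist in Hudi yet.

import java.util.Queue;
import org.apache.hudi.HoodieWriteClient;
import org.apache.hudi.common.util.Option;

// Sketch only: like scheduleCompaction, the proposed scheduleBaseFileCompaction would
// serialize the generated BaseFileCompactionPlan into a requested-compaction metadata
// file and return the scheduled instant time.
class BaseFileCompactionScheduling {
  static void scheduleAfterWrite(HoodieWriteClient<?> client, Queue<String> pendingInstants) {
    Option<String> scheduled = client.scheduleBaseFileCompaction(Option.empty()); // proposed API
    if (scheduled.isPresent()) {
      // Hand the scheduled instant to the async compaction thread pool, just as
      // scheduledCompactionInstant is handled in the existing DeltaStreamer flow.
      pendingInstants.add(scheduled.get());
    }
  }
}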

...

The entire ScheduleCompaction process is roughly as follows

Execute BaseFileCompactionPlan

By analogy with the current HoodieWriteClient compaction API that executes a HoodieCompactionPlan, a HoodieWriteClient#compactionBaseFile method can be introduced to execute a BaseFileCompactionPlan. The overall execution logic is as follows.
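
A minimal sketch of the execution side, assuming the instant time returned by the scheduling step above. compactionBaseFile(...) is the method proposed in this RFC; how the resulting compaction commit is finalized on the timeline is only indicated in a comment.

import org.apache.hudi.HoodieWriteClient;
import org.apache.hudi.WriteStatus;
import org.apache.spark.api.java.JavaRDD;

// Sketch only: the proposed compactionBaseFile(...) would read the BaseFileCompactionPlan
// for the given instant, rewrite the listed small base files of each partition into
// larger base files, and return the resulting write statuses.
class BaseFileCompactionExecution {
  static JavaRDD<WriteStatus> execute(HoodieWriteClient<?> client, String compactionInstantTime) {
    JavaRDD<WriteStatus> statuses = client.compactionBaseFile(compactionInstantTime); // proposed API
    // The compaction commit is then completed on the timeline, mirroring how the
    // existing flow finalizes a HoodieCompactionPlan.
    return statuses;
  }
}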

...

The entire Compaction process is roughly as follows.

Implementation Option#2: Clustering with snapshot isolation

The Hoodie write client insert/upsert/bulk_insert operations will continue to function as before. Users can configure the small file soft limit to 0 to force new data into a new set of file groups. In addition, a ‘clustering’ action is provided to collapse file groups based on the desired size limit. Clustering can run asynchronously or synchronously and will provide snapshot isolation between readers and writers. The exact steps taken for clustering are listed below for each table type.
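
For illustration, here is a minimal writer-side configuration sketch using the existing hoodie.parquet.small.file.limit key; the ‘clustering’ action itself is the new feature proposed here and is not shown.

import java.util.Properties;

// Sketch only: setting the small file soft limit to 0 makes every write create new
// file groups instead of merging into existing small files; the proposed ‘clustering’
// action is then responsible for collapsing them to the desired target size.
class ClusteringWriterConfig {
  static Properties writerProps() {
    Properties props = new Properties();
    props.setProperty("hoodie.parquet.small.file.limit", "0");
    return props;
  }
}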

COW Table timeline

In the example flow chart above, we show a partition's state over time (t5 to t9). The sequence of steps taken for writing is listed below.

...

Note that there is a possible race condition at step 5 if multiple writers are allowed. Another writer could have started upserts just before the ‘clustering.requested’ file is written. In the initial version, for simplicity, we assume there is only a single writer. The writer can either schedule clustering or run ingestion. The actual clustering operation can run asynchronously. When hoodie has multi-writer support, we can consider making scheduling asynchronous too. 


MOR Table timeline

This is very similar to the COW table. One significant point to highlight is that inserts can go into parquet files or into log files. Clustering would only create parquet files.  Compaction and clustering cannot run at the same time on the same file groups. Compaction also needs changes to ignore file groups that are already clustered.


Rollout/Adoption Plan

  • No impact on existing users, since this only adds new functionality.

Test Plan

  • Unit tests
  • Integration tests
  • Test on a cluster with a larger dataset.

...