


Proposers

Approvers

Status

Current state: UNDER DISCUSSION

(Possible states: UNDER DISCUSSION, IN PROGRESS, ABANDONED, COMPLETED, INACTIVE)

Discussion thread:

JIRA: HUDI-897

Released: <Hudi Version>

Abstract

The main business scenarios for a data lake include the analysis of databases, logs, and files.

Databricks Delta Lake also targets these three scenarios. [1]

Background

At present, Hudi supports the scenario of incrementally writing database CDC data into Hudi well, and work is also underway on bulk-loading files into Hudi.

However, there is no good native support for the log scenario (high-throughput writes, no updates or deletes, and a focus on small files). Data can currently be written with inserts and without deduplication, but small files are still merged on the write side.

  • In copy-on-write mode, with "hoodie.parquet.small.file.limit" at its default of 100MB, every small batch spends time merging with existing small files, which reduces write throughput.
  • This scenario is not a good fit for merge-on-read either.
  • What the scenario actually needs is to write parquet files directly for each batch, and then compact the small files afterwards (similar to Delta Lake).

Returning to the log scenario: the user's data is continuously appended, with no update or delete operations. Hudi does not support such pure-write workloads well: it provides no pure write operation and converts every insert into an upsert. An upsert first locates the file group that each record belongs to, and if that file group contains a small file, the incoming batch is merged with it, which results in low write throughput and serious write amplification. Consider the following scenario: each batch the user pulls produces a file of about 1MB, while hoodie.parquet.small.file.limit defaults to 100MB, so every batch is merged with the previously written small file. This proposal aims to solve the resulting problems of low insert throughput and write amplification.
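
To make the write amplification concrete, assume each batch produces 1MB of new data and every batch is rewritten together with the previous small file until the 100MB limit is reached:

data ingested after N batches  = N x 1 MB
bytes written to storage       ≈ 1 + 2 + ... + N = N(N+1)/2 MB
e.g. N = 100                   -> 100 MB of data, but about 5,050 MB written (~50x write amplification)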


Design

In order to solve the above problems, Hudi needs to support pure data writes, i.e. skip record location lookup and small-file merging at write time, by providing a new APPEND mode; the small data files are then merged into larger files afterwards.
The bulkInsert API that Hudi provides today can already skip locating records and merging small files and generate files directly, but it is mainly intended for loading the full dataset into Hudi for the first time and is not an append method. Therefore an append API needs to be added, which only writes data quickly to new data files.
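
A minimal sketch of how the proposed append API could be used, assuming a HoodieWriteClient-style interface. The append(...) method does not exist today; its name and signature are illustrative only, and the import paths may differ across Hudi versions.

import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.spark.api.java.JavaRDD;

public class AppendExample {
  // Write one log batch straight to new parquet base files, skipping index lookup
  // (record location) and small-file merging. append(...) is the API proposed by this RFC.
  public static void appendBatch(HoodieWriteClient client, JavaRDD<HoodieRecord> records) {
    String instantTime = client.startCommit();                            // existing API
    JavaRDD<WriteStatus> statuses = client.append(records, instantTime);  // proposed API (illustrative)
    client.commit(instantTime, statuses);                                 // existing API
  }
}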
Assume that the user's partition directory structure after multiple append batches looks like this:

partition1
| ---- file0-000.parquet 1MB
| ---- file1-001.parquet 1MB
| ---- file2-002.parquet 1MB
partition2
| ---- file3-000.parquet 1MB
| ---- file4-001.parquet 1MB
| ---- file5-002.parquet 1MB


As you can see, each commit generates very small files, close to only 1MB, and in total 6 FileGroups are created (3 in partition1 and 3 in partition2). The append API will greatly improve write throughput and reduce write amplification.

Compaction

Hudi's compaction currently has two mechanisms: synchronous compaction (inline compaction) and asynchronous compaction (async compaction). Both apply only to the Merge On Read mode and work within a single FileGroup, i.e. they compact the log files and base file belonging to the same FileGroup. In the scenario above, the current compaction does not help, because what is needed here is compaction at the FileGroup level, i.e. merging base files from different FileGroups. Following the structure of the current compaction flow, FileGroup-level compaction is divided into two steps: generating a plan and executing the plan asynchronously. Let's first look at the definition of the compaction plan.
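
For reference, the existing MOR compaction modes are selected through write configuration; the sketch below uses two existing Hudi config keys purely to contrast them with the FileGroup-level compaction proposed here (the class wrapper is illustrative).

import java.util.Properties;

public class ExistingCompactionConfig {
  // Existing MOR compaction settings, shown for contrast with the proposed base-file compaction.
  public static Properties compactionProps() {
    Properties props = new Properties();
    // true  -> inline (synchronous) compaction runs as part of the write
    // false -> no inline compaction; it has to be scheduled and executed asynchronously,
    //          e.g. by DeltaStreamer in continuous mode
    props.setProperty("hoodie.compact.inline", "false");
    // number of delta commits to accumulate before a compaction is scheduled
    props.setProperty("hoodie.compact.inline.max.delta.commits", "5");
    return props;
  }
}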

Definition of CompactionPlan

To distinguish it from the current HoodieCompactionPlan, it is proposed to introduce HoodieBaseFileCompactionPlan, defined as follows.

{
   "namespace": "org.apache.hudi.avro.model",
   "type": "record",
   "name": "HoodieBaseFileCompactionPlan",
   "fields": [
     {
         "name": "operations",
         "type": ["null", {
            "type": "array",
            "items": {
               "name": "HoodieBaseFileCompactionOperation",
               "type": "record",
               "fields": [
                  {
                     "name": "baseFilePaths",
                     "type": ["null", {
                        "type": "array",
                        "items": "string"
                     }],
                     "default": null
                  },
                  {
                     "name": "partitionPath",
                     "type": ["null", "string"],
                     "default": null
                  },
                  {
                     "name": "metrics",
                     "type": ["null", {
                        "type": "map",
                        "values": "double"
                     }],
                     "default": null
                  }
               ]
            }
        }],
       "default": null
    },
    {
       "name": "extraMetadata",
       "type": ["null", {
          "type": "map",
          "values": "string"
       }],
       "default": null
    },
    {
       "name": "version",
       "type": ["int", "null"],
       "default": 1
    }
  ]
}

Generate BaseFileCompactionPlan

• For DeltaStreamer

In continuous mode, asynchronous compaction is started after every round of writing: HoodieWriteClient#scheduleCompaction is called to generate a HoodieCompactionPlan and serialize it into a metadata file, the scheduled compaction instant is put into the asynchronous compaction queue, and a thread pool continuously takes scheduled instants from the queue for asynchronous execution. Similarly, to generate a BaseFileCompactionPlan, HoodieWriteClient#scheduleBaseFileCompaction can be called; the overall logic is similar to the above.

• For Spark Streaming

Hudi does not yet support async compaction when writing through Spark Streaming; only inline compaction can be used (tracked by HUDI-575). That is, after each Spark Streaming write, HoodieWriteClient#scheduleBaseFileCompaction is first called to generate a BaseFileCompactionPlan, and a thread pool is opened for asynchronous compaction, following the DeltaStreamer approach.
The logic for generating a BaseFileCompactionPlan is roughly as follows (see the sketch after this list):

• Find all small base files in all partitions;
• Generate a HoodieBaseFileCompactionOperation for each partition;
• Save them into a BaseFileCompactionPlan.
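
A rough sketch of this plan-generation step, using the Avro records proposed above. The getSmallBaseFiles helper is a hypothetical placeholder for the corresponding file-system-view lookup, and the builder calls assume the usual Avro-generated classes.

import java.util.ArrayList;
import java.util.List;
import org.apache.hudi.avro.model.HoodieBaseFileCompactionOperation;
import org.apache.hudi.avro.model.HoodieBaseFileCompactionPlan;

public class BaseFileCompactionPlanner {
  // Build a HoodieBaseFileCompactionPlan from the small base files of each partition.
  public static HoodieBaseFileCompactionPlan schedule(List<String> partitionPaths, long smallFileLimitBytes) {
    List<HoodieBaseFileCompactionOperation> operations = new ArrayList<>();
    for (String partition : partitionPaths) {
      // hypothetical helper: base files in this partition smaller than the limit
      List<String> smallFiles = getSmallBaseFiles(partition, smallFileLimitBytes);
      if (smallFiles.size() > 1) {
        operations.add(HoodieBaseFileCompactionOperation.newBuilder()
            .setPartitionPath(partition)
            .setBaseFilePaths(smallFiles)
            .build());
      }
    }
    return HoodieBaseFileCompactionPlan.newBuilder()
        .setOperations(operations)
        .setVersion(1)
        .build();
  }

  private static List<String> getSmallBaseFiles(String partition, long limitBytes) {
    // placeholder: list the partition via the file-system view and filter base files by size
    return new ArrayList<>();
  }
}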

The entire ScheduleCompaction process is roughly as follows

[Image: overall ScheduleCompaction flow]

Execute BaseFileCompactionPlan

Compared with the current HoodieWriteClient#compact, which executes a HoodieCompactionPlan, a HoodieWriteClient#compactionBaseFile method can be introduced to execute a BaseFileCompactionPlan. The overall execution logic is as follows (see the sketch after this list):

• Execute the HoodieBaseFileCompactionOperations concurrently.
• Merge all the small files within a HoodieBaseFileCompactionOperation (tentatively all of them; the merge algorithm can be refined further, because file records must be read first and then merged and rewritten, which increases memory footprint and IO bandwidth).
• Generate the new file.
• Delete the previous small files.
• Commit the compaction.
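
A conceptual sketch of executing a single HoodieBaseFileCompactionOperation with plain Spark. The real implementation would go through Hudi's own file writers and timeline handling, and would delete the old files only after the compaction instant is committed; paths and the target directory here are illustrative.

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class BaseFileCompactionExecutor {
  // Merge the small base files of one partition into a single larger file, then remove the originals.
  public static void executeOperation(SparkSession spark, FileSystem fs,
                                      List<String> smallFilePaths, String targetDir) throws IOException {
    // 1) read all small base files of the partition
    Dataset<Row> merged = spark.read().parquet(smallFilePaths.toArray(new String[0]));
    // 2) rewrite them as one larger base file (a new file group)
    merged.coalesce(1).write().mode(SaveMode.Overwrite).parquet(targetDir);
    // 3) delete the previous small files (in the real flow, only after the commit succeeds)
    for (String f : smallFilePaths) {
      fs.delete(new Path(f), false);
    }
  }
}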

The entire Compaction process is roughly as follows.

[Image: overall base-file compaction flow]

Implementation

1. On the write side, simply write each batch to parquet files based on the snapshot mechanism. The merge is enabled by default, and the user can disable the automatic merge for higher write throughput (see the configuration sketch after this list).

2. Hudi supports asynchronously merging small parquet files, similar to Databricks Delta Lake's OPTIMIZE command. [2]
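
As a rough approximation of this write path with today's Spark datasource options: use the insert operation and set hoodie.parquet.small.file.limit to 0 so the writer does not bin-pack new records into existing small files, leaving file sizing to the proposed asynchronous compaction. The table name and field names below are illustrative.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class PureInsertWrite {
  // Write a log batch with plain inserts and no small-file merging on the write path.
  public static void write(Dataset<Row> batch, String basePath) {
    batch.write().format("hudi")
        .option("hoodie.table.name", "logs")                            // illustrative table name
        .option("hoodie.datasource.write.recordkey.field", "uuid")      // illustrative field names
        .option("hoodie.datasource.write.partitionpath.field", "partition")
        .option("hoodie.datasource.write.precombine.field", "ts")
        .option("hoodie.datasource.write.operation", "insert")          // insert, not upsert
        .option("hoodie.parquet.small.file.limit", "0")                 // disable small-file merging
        .mode(SaveMode.Append)
        .save(basePath);
  }
}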


[1] https://databricks.com/product/delta-lake-on-databricks

[2] https://docs.databricks.com/delta/optimizations/file-mgmt.html


Rollout/Adoption Plan

  • No impact on existing users, since this only adds new functionality.

Test Plan

  • Unit tests
  • Integration tests
  • Test on the cluster for a larger dataset.