You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 4 Next »

Proposers

Approvers

Status

Current state


Current State

UNDER DISCUSSION

(tick)

IN PROGRESS


ABANDONED


COMPLETED


INACTIVE


Discussion thread:

JIRAHUDI-897

Released: <Hudi Version>

Abstract

The business scenarios of the data lake mainly include analysis of databases, logs, and files. One of the key trade-offs in managing a data lake is to choose between write throughput and query performance. For better write throughput, it is usually better to write all new incoming data into one data file. This will improve ingestion speed substantially. But, this can create many small files. Also, in this approach, data locality is not optimal. Data is co-located with other records in the incoming batch and not with data that is queried often. Small file sizes and lack of data locality can degrade query performance. In addition, for many file systems including HDFS, performance degrades when there are many small files. 

In this proposal, we look at options for improving write throughput without compromising on query performance.

Databricks delta lake also aim at these three  scenario. [1]

Background

At present, hudi can better support the scenario where the database cdc is incrementally written to hudi, and it is also doing bulkload files to hudi. 

However, there is no good native support for log scenarios (requiring high-throughput writes, no updates, deletions, and focusing on small file scenarios);now can write through inserts without deduplication, but they will still merge on the write side.

  • In copy on write mode when "hoodie.parquet.small.file.limit" is 100MB, but  every batch small  will cost some time for merge,it will reduce write throughput.  
  • This scene is not suitable for  merge on read. 
  • the actual scenario only needs to write parquet in batches when writing, and then provide reverse compaction (similar to delta lake )

​​

Go back to the log scenarios, the user's data is continuously written, and there is no update or delete operation (such as a really interesting use scenario); Hudi is not friendly enough to support such pure write scenarios, and it does not provide pure write operations, and will The insert operation will be converted into an upsert operation. The upsert operation will first locate the file where the record is located, and if there is a small file, it will merge with the small file first, which results in low write throughput and serious write amplification problems. Consider the following application scenario: the user pulls records for each batch to generate a file with a size of 1M, hoodie.parquet.small.file.limit defaults to 100MB, then each batch write will be merged with the previous small file, which leads to In order to solve the problem of inefficient insertion throughput and write amplification.

Implementation Option#1: Using APPEND mode

​​In order to solve the above problems, Hudi needs to support the pure writing of data (that is, the positioning of skip records and the merge of small files at the time of writing), that is, to provide a new APPEND mode, and then the small Data files are merged into large files.
Now the bulkInsertAPI provided by Hudi can skip the process of locating records and merging small files and generate files directly, but its main function is to load the full amount of data into Hudi for the first time, not an append method, so the appendAPI needs to be added. Append only Write data quickly to the data file.
Assume that the user's partition directory structure after multiple batches of append is as follows

partition1
| ---- file0-000.parquet 1MB
| ---- file1-001.parquet 1MB
| ---- file2-002.parquet 1MB
partition2
| ---- file3-000.parquet 1MB
| ---- file4-001.parquet 1MB
| ---- file5-002.parquet 1MB


You can see that the file generated by each submission is very small, only close to 1MB. In total, 6 FileGroups (3 for partition1 and 2 for partition2) will be generated. appendAPI will greatly improve the throughput of data writing and reduce the problem of write amplification.

Compaction

Hudi ’s Compaction now has two mechanisms: synchronous Compaction (Inline Compaction) and asynchronous Compaction (Async Compaction), both of which are only for the Merge On Read mode, and are the Compaction in the FileGroup, which is the log file and data file of the Compaction The data is the same FileGroup. In the above scenario, the current compaction does not work, because the compaction in the above scenario is at the FileGroup level, that is, merging data files of different FileGroups. Referring to the current Compaction overall process, the Compaction at the FileGroup level is mainly divided into two steps: generating a Plan and asynchronously executing the Plan. Let's first look at the definition of CompactionPlan.

Definition of CompactionPlan

To distinguish the current HoodieCompactionPlan, it is proposed to introduce HoodieBaseFileCompactionPlan, which is defined as follows.

{
   "namespace": "org.apache.hudi.avro.model",
   "type": "record",
   "name": "HoodieBaseFileCompactionPlan",
   "fields": [
     {
         "name": "operations",
         "type": ["null", {
            "type": "array",
            "items": {
               "name": "HoodieBaseFileCompactionOperation",
               "type": "record",
               "fields": [
                  {
                     "name": "baseFilePaths",
                     "type": ["null", {
                        "type": "array",
                        "items": "string"
                     }],
                     "default": null
                  },
                  {
                     "name": "partitionPath",
                     "type": ["null", "string"],
                     "default": null
                  },
                  {
                     "name": "metrics",
                     "type": ["null", {
                        "type": "map",
                        "values": "double"
                     }],
                     "default": null
                  }
               ]
            }
        }],
       "default": null
    },
    {
       "name": "extraMetadata",
       "type": ["null", {
          "type": "map",
          "values": "string"
       }],
       "default": null
    },
    {
       "name": "version",
       "type": ["int", "null"],
       "default": 1
    }
  ]
}

Generate BaseFileCompactionPlan

• For DeltaStreamer

In Continous mode, if you start asynchronous compaction after writing data every time, then call HoodieWriteClient # scheduleCompaction to generate HoodieCompactionPlan and serialize it into a metadata file, then put scheduledCompactionInstant into the asynchronous compaction queue The thread pool will continuously take out scheduledCompactionInstant from the queue for asynchronous scheduling. Similarly, for the generation of BaseFileCompactionPlan, you can call HoodieWriteClient # scheduleBaseFileCompaction to generate BaseFileCompactionPlan, the overall logic is similar to the above.

• For Spark Streaming

Hudi does not yet support Async Compaction when writing through Spark Streaming, and can only use InlineCompaction to track through HUDI-575( Unable to render Jira issues macro, execution error. ). That is, after Spark Streaming is written, HoodieWriteClient # scheduleBaseFileCompaction is first scheduled to generate BaseFileCompactionPlan, and a thread pool is opened for asynchronous compression by referring to DeltaStreamer.
The logic of generating BaseFileCompactionPlan is roughly as follows

• Find all small files in all partitions;
• A HoodieBaseFileCompactionOperation is generated for different partitions;
• Save to BaseFileCompactionPlan.

The entire ScheduleCompaction process is roughly as follows

Execute BaseFileCompactionPlan

Compared to the current HoodieWriteClient # compaction to start HoodieCompactionPlan, the HoodieWriteClient # compactionBaseFile method can be introduced to execute BaseFileCompactionPlan. The entire execution logic is as follows

• Concurrent execution of HoodieBaseFileCompactionOperation
• Merge all the small files in HoodieBaseFileCompactionOperation (tentatively merge all files), and then further refine the merge algorithm (because the file records need to be read first and then merged and written, so the memory footprint and IO bandwidth will increase).
• Generate new files.
• Delete the previous small file.
• Commit compaction.

The entire Compaction process is roughly as follows.

Implementation Option#2: Clustering with snapshot isolation

Hoodie write client insert/upsert/bulk_insert will continue to function as before. Users can configure the small file soft limit to 0 to force new data to go into a new set of file groups. In addition, ‘clustering’ action is provided to collapse file groups based on the desired size limit. This clustering can run asynchronously or synchronously and will provide snapshot isolation between readers and writers. The exact steps taken for clustering are listed below for each table type.

COW Table timeline

In the example flow chart above, we show a partition state over time (t5 to t9).  The sequence of steps taken for writing are listed below.

  1. At t5, a partition in table has 5 file groups f0, f1, f2, f3, f4. For simplicity, assume that each file group is 100MB. So the total data in the partition is 500MB.
  2. A clustering operation is requested at t6. Similar to compaction, we create a “t6.clustering.requested” file in metadata with ‘ClusteringPlan’ that includes all the file groups touched by clustering action across all partitions. 
    1. Example contents:
    2. { partitionPath: {“datestr”}, oldfileGroups: [ {fileId: “f0”, time: “t0”}, { fileId: “f1”, time: “t1”}, ... ], newFileGroups: [“c1”, “c2”]  }
  3. Lets say maximum file size after clustering is configured to be 250MB. Clustering would re-distribute all the data in partition into two file groups: c1, c2. These file groups are ‘phantom’ and invisible to queries until clustering is complete at t8.
  4. Also, note that records in a file group can be split into multiple file groups. In this example, some records from the f4 file group go to both new file groups c1, c2.
  5. While the clustering is in progress (t6 through t8), any upserts that touch these file groups are rejected
  6. After writing new data files c1-t6.parquet and c2-t6.parquet, if a global index is configured, we add entries in the record level index for all the keys with the new location. The new index entries will not be visible to other writes because there is no commit associated yet.
  7. Finally, we create a commit metadata file ‘t6.commit’ that includes file groups modified by this commit (f0,f1,f2,f3,f4).
  8.  Note that file groups (f0 to f4) are not deleted from disk immediately. Cleaner would clean these files before archiving t6.commit. We also update all views to ignore all file groups mentioned in all the commit metadata files. So readers will not see duplicates.


Note that there is a possible race condition at step 5 if multiple writers are allowed. Another writer could have started upserts just before the ‘clustering.requested’ file is written. In the initial version, for simplicity, we assume there is only a single writer. The writer can either schedule clustering or run ingestion. The actual clustering operation can run asynchronously. When hoodie has multi-writer support, we can consider making scheduling asynchronous too. 


MOR Table timeline

This is very similar to the COW table. One significant point to highlight is that inserts can go into parquet files or into log files. Clustering would only create parquet filesCompaction and clustering cannot run at the same time on the same file groups. Compaction also needs changes to ignore file groups that are already clustered.


Rollout/Adoption Plan

  • No impact on the existing users because add new function

Test Plan

  • Unit tests
  • Integration tests
  • Test on the cluster for a larger dataset. 


  • No labels