
Proposers

Approvers

Status

Current state: IN PROGRESS

Discussion thread:

JIRA: HUDI-897

Released: <Hudi Version>

Abstract

The business scenarios of a data lake mainly include analysis of databases, logs, and files. One of the key trade-offs in managing a data lake is choosing between write throughput and query performance. For better write throughput, it is usually best to write all new incoming data from each batch into its own new data file. This improves ingestion speed substantially, but it can create many small files. Data locality is also not optimal in this approach: data is co-located with the other records in its incoming batch rather than with the data it is queried with most often. Small file sizes and lack of data locality can degrade query performance. In addition, for many file systems including HDFS, performance degrades when there are many small files.

In this proposal, we look at options for improving write throughput without compromising on query performance.

Databricks Delta Lake also targets these three scenarios. [1]

Background

At present, Hudi supports the scenario of incrementally writing database CDC data into Hudi reasonably well, and it also supports bulk-loading files into Hudi.

However, there is no good native support for log scenarios (high-throughput writes, no updates or deletes, and many small incoming batches). Such data can currently be written through inserts without deduplication, but small files are still merged on the write side.

  • In copy-on-write mode, with "hoodie.parquet.small.file.limit" at its default of 100MB, every small batch spends time merging with existing small files, which reduces write throughput.
  • Merge-on-read is not a good fit for this scenario either.
  • What the scenario actually needs is to simply write Parquet files batch by batch at ingest time, and then rely on a later compaction to rewrite them (similar to Delta Lake).

Returning to the log scenario: the user's data is continuously written, and there are no update or delete operations. Hudi does not support such pure-write scenarios well; it does not provide a pure write operation, and insert operations are converted into upsert operations. An upsert first locates the file group containing each record, and if a small file exists it merges the incoming data with that small file, which results in low write throughput and a serious write amplification problem. Consider the following application scenario: each batch the user pulls generates a file of about 1MB, while hoodie.parquet.small.file.limit defaults to 100MB, so every batch write is merged with the previous small file. This is the insert-throughput and write-amplification problem this proposal aims to solve.
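For a rough sense of the cost in this example (assuming the 1MB batches and 100MB small-file limit above): batch k arrives with 1MB of new data, the upsert merges it with the roughly (k-1)MB small file accumulated so far, and rewrites about k MB. Ingesting the first 100 batches (100MB of unique data) therefore writes about 1 + 2 + ... + 100 ≈ 5,050MB ≈ 5GB to storage, i.e. roughly 50x write amplification, before the file finally crosses the small-file limit.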

Implementation Option#1: Using APPEND mode

In order to solve the above problems, Hudi needs to support pure appending of data, that is, skipping record location lookup and small-file merging at write time, by providing a new APPEND mode; the resulting small data files are then merged into large files afterwards.
The bulkInsert API that Hudi provides today can already skip locating records and merging small files and generate files directly, but it is mainly intended for the initial full load of data into Hudi rather than repeated appends, so an append API needs to be added that only writes incoming data quickly to new data files.
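As a rough illustration, the proposed append path could mirror the shape of the existing insert/bulkInsert write APIs. The sketch below is purely hypothetical: the method name append, the record and write-status types, and the List-based signature are assumptions for illustration, not the final interface.

import java.util.List;

// Hypothetical sketch only: mirrors the shape of the existing insert/bulkInsert
// write APIs; names and signatures are assumptions, not the final design.
public interface AppendSupport<RecordT> {

  // Write the incoming records directly to new base files for the given
  // instant, skipping record location lookup (index) and small-file merging.
  // Each call produces new, possibly small, file groups; a later
  // FileGroup-level compaction merges them into larger files.
  List<WriteStatusLike> append(List<RecordT> records, String instantTime);

  // Minimal stand-in for the per-file result returned by the write APIs.
  interface WriteStatusLike {
    String getFileId();
    String getPartitionPath();
    long getTotalWriteBytes();
  }
}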
Assume that the user's partition directory structure after multiple batches of append is as follows

partition1
| ---- file0-000.parquet 1MB
| ---- file1-001.parquet 1MB
| ---- file2-002.parquet 1MB
partition2
| ---- file3-000.parquet 1MB
| ---- file4-001.parquet 1MB
| ---- file5-002.parquet 1MB


You can see that the file generated by each submission is very small, only about 1MB. In total, 6 FileGroups are generated (3 for partition1 and 3 for partition2). The append API greatly improves the throughput of data writing and reduces the write amplification problem.

Compaction

Hudi's Compaction currently has two mechanisms, synchronous Compaction (Inline Compaction) and asynchronous Compaction (Async Compaction), both of which apply only to the Merge On Read mode and operate within a single FileGroup: the log files and the base file being compacted belong to the same FileGroup. In the scenario above, the current compaction does not help, because the compaction needed here is at the FileGroup level, i.e. merging the data files of different FileGroups. Following the structure of the current Compaction flow, FileGroup-level Compaction is mainly divided into two steps: generating a Plan and executing the Plan asynchronously. Let's first look at the definition of the CompactionPlan.

Definition of CompactionPlan

To distinguish it from the current HoodieCompactionPlan, it is proposed to introduce a HoodieBaseFileCompactionPlan, defined as follows.

{
   "namespace": "org.apache.hudi.avro.model",
   "type": "record",
   "name": "HoodieBaseFileCompactionPlan",
   "fields": [
     {
         "name": "operations",
         "type": ["null", {
            "type": "array",
            "items": {
               "name": "HoodieBaseFileCompactionOperation",
               "type": "record",
               "fields": [
                  {
                     "name": "baseFilePaths",
                     "type": ["null", {
                        "type": "array",
                        "items": "string"
                     }],
                     "default": null
                  },
                  {
                     "name": "partitionPath",
                     "type": ["null", "string"],
                     "default": null
                  },
                  {
                     "name": "metrics",
                     "type": ["null", {
                        "type": "map",
                        "values": "double"
                     }],
                     "default": null
                  }
               ]
            }
        }],
       "default": null
    },
    {
       "name": "extraMetadata",
       "type": ["null", {
          "type": "map",
          "values": "string"
       }],
       "default": null
    },
    {
       "name": "version",
       "type": ["int", "null"],
       "default": 1
    }
  ]
}

Generate BaseFileCompactionPlan

• For DeltaStreamer

In Continuous mode, asynchronous compaction is started after each write: HoodieWriteClient#scheduleCompaction is called to generate a HoodieCompactionPlan and serialize it into a metadata file, then the scheduled compaction instant is put into the asynchronous compaction queue, and a thread pool continuously takes scheduled instants from the queue and executes them asynchronously. Similarly, to generate a BaseFileCompactionPlan, HoodieWriteClient#scheduleBaseFileCompaction can be called; the overall logic is the same as above.
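For illustration, the asynchronous scheduling loop could look roughly like the sketch below. The scheduleBaseFileCompaction and executeBaseFileCompaction calls are hypothetical stand-ins for the APIs proposed in this RFC; the queue-plus-thread-pool pattern mirrors how the existing async compaction is driven in DeltaStreamer.

import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class AsyncBaseFileCompactionService {

  private final BlockingQueue<String> pendingInstants = new LinkedBlockingQueue<>();
  private final ExecutorService pool = Executors.newFixedThreadPool(1);

  // Called after each write commit: schedule a plan and, if one was created,
  // enqueue its instant time for asynchronous execution.
  public void onCommitCompleted(CompactionClient client) {
    Optional<String> instant = client.scheduleBaseFileCompaction();
    instant.ifPresent(pendingInstants::add);
  }

  // Background loop: drain scheduled instants and execute them one by one.
  public void start(CompactionClient client) {
    pool.submit(() -> {
      while (!Thread.currentThread().isInterrupted()) {
        String instant = pendingInstants.take(); // blocks until work is queued
        client.executeBaseFileCompaction(instant);
      }
      return null;
    });
  }

  // Hypothetical client facade for the APIs proposed in this RFC.
  public interface CompactionClient {
    // Serializes a BaseFileCompactionPlan to the timeline and returns the
    // requested instant time, if any small files were found.
    Optional<String> scheduleBaseFileCompaction();

    void executeBaseFileCompaction(String instantTime);
  }
}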

• For Spark Streaming

Hudi does not yet support Async Compaction when writing through Spark Streaming and can only use Inline Compaction; this is tracked by HUDI-575. That is, after each Spark Streaming write, HoodieWriteClient#scheduleBaseFileCompaction is first called to generate a BaseFileCompactionPlan, and then, as in DeltaStreamer, a thread pool is opened for asynchronous compaction.
The logic for generating a BaseFileCompactionPlan is roughly as follows (a sketch is given after the list):

• Find all small files in all partitions;
• Generate one HoodieBaseFileCompactionOperation per partition;
• Save the operations into the BaseFileCompactionPlan.
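A rough sketch of that plan-generation logic is below. The BaseFile, Operation, and Plan classes are simple stand-ins for the Avro types defined above, and the small-file threshold is assumed to come from hoodie.parquet.small.file.limit.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class BaseFileCompactionPlanner {

  private final long smallFileLimitBytes; // e.g. hoodie.parquet.small.file.limit

  public BaseFileCompactionPlanner(long smallFileLimitBytes) {
    this.smallFileLimitBytes = smallFileLimitBytes;
  }

  public Plan schedule(Map<String, List<BaseFile>> baseFilesByPartition) {
    List<Operation> operations = new ArrayList<>();
    for (Map.Entry<String, List<BaseFile>> entry : baseFilesByPartition.entrySet()) {
      // 1. Find all small base files in this partition.
      List<String> smallFilePaths = new ArrayList<>();
      for (BaseFile file : entry.getValue()) {
        if (file.sizeBytes < smallFileLimitBytes) {
          smallFilePaths.add(file.path);
        }
      }
      // 2. Generate one operation per partition that has more than one small file.
      if (smallFilePaths.size() > 1) {
        operations.add(new Operation(entry.getKey(), smallFilePaths));
      }
    }
    // 3. Save the operations into the plan, which is then serialized to the
    //    timeline as the requested compaction instant.
    return new Plan(operations);
  }

  public static class BaseFile {
    final String path;
    final long sizeBytes;
    public BaseFile(String path, long sizeBytes) {
      this.path = path;
      this.sizeBytes = sizeBytes;
    }
  }

  public static class Operation {
    final String partitionPath;
    final List<String> baseFilePaths;
    public Operation(String partitionPath, List<String> baseFilePaths) {
      this.partitionPath = partitionPath;
      this.baseFilePaths = baseFilePaths;
    }
  }

  public static class Plan {
    final List<Operation> operations;
    public Plan(List<Operation> operations) {
      this.operations = operations;
    }
  }
}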

The entire ScheduleCompaction process is roughly as follows

Execute BaseFileCompactionPlan

Analogous to the current HoodieWriteClient#compaction that executes a HoodieCompactionPlan, a HoodieWriteClient#compactionBaseFile method can be introduced to execute the BaseFileCompactionPlan. The entire execution logic is as follows (a sketch is given after the list):

• Execute the HoodieBaseFileCompactionOperations concurrently.
• For each operation, merge all of its small files (tentatively all files; the merge algorithm can be refined later, since file records must be read first and then merged and rewritten, which increases the memory footprint and IO bandwidth).
• Generate the new files.
• Delete the previous small files.
• Commit the compaction.
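A simplified sketch of this execution flow is below. The Operation, Storage, Record, and Timeline abstractions are hypothetical placeholders for the real Hudi read/write path, used only to keep the sketch self-contained.

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class BaseFileCompactionExecutor {

  public void execute(List<Operation> operations, Storage storage, Timeline timeline,
                      String compactionInstant) {
    // Run the HoodieBaseFileCompactionOperations concurrently, one per partition.
    List<CompletableFuture<Void>> futures = operations.stream()
        .map(op -> CompletableFuture.runAsync(() -> runOne(op, storage)))
        .collect(Collectors.toList());
    futures.forEach(CompletableFuture::join);
    // Commit the compaction on the timeline only after all operations succeed.
    timeline.commitCompaction(compactionInstant);
  }

  private void runOne(Operation op, Storage storage) {
    // Read all records from the small files (this is the memory/IO heavy step).
    Iterable<Record> records = storage.readAll(op.baseFilePaths());
    // Write them out as one new, larger base file in the same partition.
    storage.writeNewBaseFile(op.partitionPath(), records);
    // Delete the replaced small files.
    storage.delete(op.baseFilePaths());
  }

  // Hypothetical abstractions used only to keep the sketch self-contained.
  public interface Operation {
    String partitionPath();
    List<String> baseFilePaths();
  }

  public interface Record {}

  public interface Storage {
    Iterable<Record> readAll(List<String> paths);
    void writeNewBaseFile(String partitionPath, Iterable<Record> records);
    void delete(List<String> paths);
  }

  public interface Timeline {
    void commitCompaction(String instantTime);
  }
}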

The entire Compaction process is roughly as follows.

Implementation Option#2: Clustering with snapshot isolation

Hoodie write client insert/upsert/bulk_insert will continue to function as before. Users can configure the small file soft limit to 0 to force new data to go into a new set of file groups (a configuration sketch is shown below). In addition, a ‘clustering’ action is provided to collapse file groups based on the desired size limit. This clustering can run asynchronously or synchronously and will provide snapshot isolation between readers and writers. The exact steps taken for clustering are listed below for each table type.
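As a minimal sketch of the write-side configuration, assuming a Spark datasource write: the table name and path are placeholders, and other required write configs (record key field, precombine field, partition path, etc.) are omitted for brevity.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class AppendStyleWrite {

  // Write one micro-batch as a plain insert, with small-file handling disabled
  // so every batch lands in fresh file groups that clustering later collapses.
  public static void write(Dataset<Row> batch) {
    batch.write()
        .format("hudi") // older Hudi versions use format("org.apache.hudi")
        .option("hoodie.table.name", "example_table")
        .option("hoodie.datasource.write.operation", "insert")
        // 0 disables small-file handling, so inserts never merge with existing
        // small base files and always go into new file groups.
        .option("hoodie.parquet.small.file.limit", "0")
        .mode(SaveMode.Append)
        .save("/tmp/hudi/example_table");
  }
}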

COW Table timeline

In the example flow chart above, we show a partition's state over time (t5 to t9). The sequence of steps taken for writing is listed below.

  1. At t5, a partition in table has 5 file groups f0, f1, f2, f3, f4. For simplicity, assume that each file group is 100MB. So the total data in the partition is 500MB.
  2. A clustering operation is requested at t6. Similar to compaction, we create a “t6.clustering.requested” file in metadata with ‘ClusteringPlan’ that includes all the file groups touched by clustering action across all partitions. 
    1. Example contents:
    2. { partitionPath: {“datestr”}, oldfileGroups: [ {fileId: “f0”, time: “t0”}, { fileId: “f1”, time: “t1”}, ... ], newFileGroups: [“c1”, “c2”]  }
  3. Let's say the maximum file size after clustering is configured to be 250MB. Clustering would re-distribute all the data in the partition into two file groups: c1, c2. These file groups are ‘phantom’ and invisible to queries until clustering is complete at t8.
  4. Also, note that records in a file group can be split into multiple file groups. In this example, some records from the f4 file group go to both new file groups c1, c2.
  5. While the clustering is in progress (t6 through t8), any upserts that touch these file groups are rejected.
  6. After writing new data files c1-t6.parquet and c2-t6.parquet, if a global index is configured, we add entries in the record level index for all the keys with the new location. The new index entries will not be visible to other writes because there is no commit associated yet.
  7. Finally, we create a commit metadata file ‘t6.commit’ that includes file groups modified by this commit (f0,f1,f2,f3,f4).
  8.  Note that file groups (f0 to f4) are not deleted from disk immediately. Cleaner would clean these files before archiving t6.commit. We also update all views to ignore all file groups mentioned in all the commit metadata files. So readers will not see duplicates.


Note that there is a possible race condition at step 5 if multiple writers are allowed. Another writer could have started upserts just before the ‘clustering.requested’ file is written. In the initial version, for simplicity, we assume there is only a single writer. The writer can either schedule clustering or run ingestion. The actual clustering operation can run asynchronously. When hoodie has multi-writer support, we can consider making scheduling asynchronous too. 


MOR Table timeline

This is very similar to the COW table. For MOR table, inserts can go into either parquet files or into log files. This approach will continue to support both modes. The output of clustering is always parquet format.  Also, compaction and clustering cannot run at the same time on the same file groups. Compaction also needs changes to ignore file groups that are already clustered.


Performance numbers

Time for reading metadata

A test was done to measure the time taken to write and read 'replace' metadata using the code here. Here are the results:


Partitions | Total FileGroups replaced (divide by column 1 to get file groups per partition) | Serialization cost (millis) | Deserialization cost (millis) | Memory utilization (HoodieReplaceMetadata object size + serialized byte[] size in memory)
1 | 300 | 55 | 41 | 60KB
1 | 3,000 | 55 | 42 | 570KB
1 | 30,000 | 93 | 120 | 5.7MB
1 | 300,000 | 103 | 130 | 57MB
10 | 300 | 53 | 32 | 60KB
10 | 3,000 | 68 | 52 | 574KB
10 | 30,000 | 87 | 104 | 5.7MB
10 | 300,000 | 97 | 114 | 57MB

We plan to store this metadata in Avro files, similar to clean metadata. After consolidated metadata is launched, we can come up with a plan to migrate this to leverage the consolidated metadata (this will likely reduce the memory required for cases where a partition has a large number of replaced file groups).


Clustering steps


Overall, there are 2 parts to clustering

  1. Scheduling clustering: Create clustering plan
  2. Execute clustering: Process the plan. Create new files and replace old files.

Scheduling clustering

The following steps are used to schedule clustering:

  1. Identify files that are eligible for clustering
    1. Filter specific partitions (based on config to prioritize latest vs older partitions)
    2. Any files that have size > targetFileSize are not eligible for clustering
    3. Any files that have pending compaction/clustering scheduled are not eligible for clustering
    4. Any filegroups that have log files are not eligible for clustering  (We could remove this restriction at a later stage.)
  2. Group files that are eligible for clustering based on specific criteria. Each group is expected to have data size in multiples of ‘targetFileSize’.  Grouping is done as part of ‘strategy’.
    1. If sort columns are specified,
      1. Among the files that are eligible for clustering, it is better to group together files that have overlapping data for the specified columns.
      2. Example: say the target of clustering is to produce 1GB files. Partition initially has 8 * 512MB files. (After clustering, we expect data to be present in 4 * 1GB files.)
        1. Assume among 8 files, say only 2 files have overlapping data for the ‘sort column’, then these 2 files will be part of one group. Output of the group after clustering is one 1GB file. 
        2. Assume among 8 files, say 4 files have overlapping data for the ‘sort column’, then these 4 files will be part of one group. Output of the group after clustering is two 1GB files.
      3. We could put a cap on group size to improve parallelism and avoid shuffling large amounts of data 
    2. If sort columns are not specified, we could consider grouping files based on other criteria: (All of these can be exposed as different strategies).
      1. Group files based on record key ranges. This is useful because key range is stored in a parquet footer and can be used for certain queries/updates.
      2. Group files based on commit time.
      3. Random grouping of files.
  3. Filter groups based on specific criteria (akin to orderAndFilter in CompactionStrategy)
  4. Finally, the clustering plan is saved to the timeline. The structure of the plan metadata is shown below:


{
   "namespace": "org.apache.hudi.avro.model",
   "type": "record",
   "name": "HoodieClusteringPlan",
   "fields": [
     {
        "name": "clusteringGroups",
        "type": ["null", {
           "type": "array",
           "items": {
              "name": "HoodieClusteringGroup",
              "type": "record",
              "fields": [
                 {
                    "name": "fileIds",
                    "type": ["null", {
                       "type": "array",
                       "items": "string"
                    }],
                    "default": null
                 },
                 {
                    "name": "partitionPath",
                    "type": ["null", "string"],
                    "default": null
                 },
                 {
                    "name": "metrics",
                    "type": ["null", {
                       "type": "map",
                       "values": "double"
                    }],
                    "default": null
                 }
              ]
           }
        }],
        "default": null
     },
     {
        "name": "targetFileSize",
        "type": ["long", "null"],
        "default": 1073741824
     },
     {
        "name": "sortColumns",
        "type": ["null", {
           "type": "array",
           "items": "string"
        }],
        "default": null
     },
     {
        "name": "extraMetadata",
        "type": ["null", {
           "type": "map",
           "values": "string"
        }],
        "default": null
     },
     {
        "name": "version",
        "type": ["int", "null"],
        "default": 1
     }
   ]
}


In the ‘metrics’ element, we could store ‘min’ and ‘max’ for each column in the file for helping with debugging and operations.
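Tying the scheduling steps above together, the sketch below shows one way the grouping step could be implemented for a single partition. The FileSlice class and the size-based packing are simplified stand-ins; a real strategy would use the configured sort columns and file statistics rather than a single minSortValue field.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ClusteringScheduler {

  // Pack eligible files of one partition into clustering groups whose total
  // size is roughly a multiple of targetFileSize, capped at maxGroupBytes.
  public List<List<FileSlice>> buildGroups(List<FileSlice> partitionFiles,
                                           long targetFileSize, long maxGroupBytes) {
    List<FileSlice> eligible = new ArrayList<>();
    for (FileSlice file : partitionFiles) {
      // Skip files that are already large enough, have a pending
      // compaction/clustering, or have log files attached.
      if (file.sizeBytes < targetFileSize && !file.hasPendingAction && !file.hasLogFiles) {
        eligible.add(file);
      }
    }
    // When sort columns are configured, files with overlapping value ranges
    // should land in the same group; sorting by the minimum sort-column value
    // is a crude approximation used here for illustration.
    eligible.sort(Comparator.comparing((FileSlice f) -> f.minSortValue));

    List<List<FileSlice>> groups = new ArrayList<>();
    List<FileSlice> current = new ArrayList<>();
    long currentBytes = 0;
    for (FileSlice file : eligible) {
      // Cap group size to improve parallelism and bound the data shuffled
      // per clustering task.
      if (!current.isEmpty() && currentBytes + file.sizeBytes > maxGroupBytes) {
        groups.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(file);
      currentBytes += file.sizeBytes;
    }
    if (!current.isEmpty()) {
      groups.add(current);
    }
    return groups;
  }

  // Simplified stand-in for a file slice and the statistics needed above.
  public static class FileSlice {
    String fileId;
    long sizeBytes;
    boolean hasPendingAction;
    boolean hasLogFiles;
    String minSortValue; // minimum value of the configured sort column
  }
}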

Running clustering

  1. Create inflight clustering file
  2. Read the clustering plan, look at the number of ‘clusteringGroups’. This gives parallelism.
  3. For each group
    1.  If sort order is not specified, we could just combine the records and write to new buckets using existing logic similar to bulk_insert/insert.
    2. If sort order is specified, we need to add new logic: essentially do a merge sort across the files within the group and write records to the target file groups honoring targetFileSize (see the sketch after this list).
  4. Create replacecommit. Contents are in HoodieReplaceCommitMetadata 
    1. operationType is set to ‘clustering’.
    2.  We can extend the metadata and add additional metrics including range of values for each column in each file etc.
    3. TODO: see if any additional metadata is needed?
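A simplified sketch of step 3b (the sorted case) is below: a k-way merge across the sorted record streams of one clustering group, rolling over to a new target file group whenever targetFileSize is reached. The iterator and writer interfaces are hypothetical abstractions, not existing Hudi APIs.

import java.util.List;
import java.util.PriorityQueue;

public class ClusteringGroupExecutor {

  // A sorted stream of records from one input file of the clustering group.
  public interface SortedFileIterator {
    boolean hasNext();
    Record next(); // records are returned in sort-column order
  }

  public interface Record extends Comparable<Record> {
    long sizeBytes();
  }

  public interface TargetFileWriter {
    void write(Record record);
    void rollOverToNewFileGroup(); // close the current base file, open the next
  }

  public void mergeGroup(List<SortedFileIterator> inputs, TargetFileWriter writer,
                         long targetFileSize) {
    // Classic k-way merge: keep the head record of each input on a min-heap so
    // the output stays globally ordered on the sort columns.
    PriorityQueue<Head> heap = new PriorityQueue<>();
    for (SortedFileIterator input : inputs) {
      if (input.hasNext()) {
        heap.add(new Head(input.next(), input));
      }
    }
    long bytesInCurrentFile = 0;
    while (!heap.isEmpty()) {
      Head head = heap.poll();
      if (bytesInCurrentFile + head.record.sizeBytes() > targetFileSize) {
        writer.rollOverToNewFileGroup(); // honor targetFileSize
        bytesInCurrentFile = 0;
      }
      writer.write(head.record);
      bytesInCurrentFile += head.record.sizeBytes();
      if (head.source.hasNext()) {
        heap.add(new Head(head.source.next(), head.source));
      }
    }
  }

  private static final class Head implements Comparable<Head> {
    final Record record;
    final SortedFileIterator source;

    Head(Record record, SortedFileIterator source) {
      this.record = record;
      this.source = source;
    }

    @Override
    public int compareTo(Head other) {
      return record.compareTo(other.record);
    }
  }
}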

Other concerns:

  • Can we group files while running clustering (as opposed to grouping during scheduling)?
    • To limit IO, scheduling filters certain file groups out of clustering. If the filtered-out file groups have data that overlaps with the selected files, the effectiveness of clustering will be limited. So grouping and filtering during scheduling has some benefits.
  • Is the ClusteringPlan extensible enough for future use cases?
    • With the above approach, executing a clustering plan is basically dependent on two parameters: ‘targetFileSize’ and ‘sortColumns’. Based on these parameters, we create different partitioners/write data differently in new locations. Because this avro schema is extensible, we could add new fields and support any other usecases that might come up.



Rollout/Adoption Plan

  • No impact on existing users, because this change only adds new functionality.

Test Plan

  • Unit tests
  • Integration tests
  • Test on the cluster for a larger dataset. 

