
Proposers

Approvers

Status

Current state: IN PROGRESS

Discussion thread:

JIRA: HUDI-897

Released: <Hudi Version>


Abstract

The business scenarios of a data lake mainly include analysis of databases, logs, and files. One of the key trade-offs in managing a data lake is the choice between write throughput and query performance. For better write throughput, it is usually better to write all new incoming data into one data file, which improves ingestion speed substantially. However, this can create many small files. Data locality is also not optimal with this approach: data is co-located with other records in the incoming batch and not with data that is queried often. Small file sizes and lack of data locality can degrade query performance. In addition, many file systems, including HDFS, perform worse when there are many small files.

In this proposal, we look at options for improving write throughput without compromising on query performance.

Databricks Delta Lake also targets these three scenarios. [1]

Background

At present, Hudi supports well the scenario where database CDC data is written incrementally to Hudi, and it also supports bulk-loading files into Hudi.

However, there is no good native support for log scenarios (which require high-throughput writes, have no updates or deletions, and mainly face the small file problem). Today such data can be written through inserts without deduplication, but the inserts are still merged on the write side.

  • In copy-on-write mode, with "hoodie.parquet.small.file.limit" at 100MB, every small batch spends time merging into existing small files, which reduces write throughput.
  • This scenario is not suitable for merge-on-read.
  • The actual scenario only needs to write parquet files in batches, and then provide reverse compaction (similar to Delta Lake).

Going back to the log scenario: the user's data is written continuously, and there are no update or delete operations (a usage scenario of real interest). Hudi does not support such pure-write scenarios well. It does not provide a pure write operation, and an insert operation is converted into an upsert operation. The upsert first locates the file where the record belongs and, if there is a small file, merges with that small file first, which results in low write throughput and serious write amplification. Consider the following scenario: each batch the user pulls generates a file of about 1MB, and hoodie.parquet.small.file.limit defaults to 100MB, so each batch write is merged with the previous small file. This leads to inefficient insert throughput and write amplification, which is the problem this proposal sets out to solve.
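To make the write amplification concrete, here is a back-of-the-envelope sketch in Python. It is an illustration under stated assumptions, not a measurement: every batch is exactly 1MB, a copy-on-write merge rewrites the whole existing small file plus the new batch, and the file stays below the 100MB small file limit so it is picked for merging every time. The function names are hypothetical.

```python
def mb_written_with_merge(num_batches: int, batch_mb: int = 1) -> int:
    """Total MB written when every batch is merged into the same small file."""
    total = 0
    file_size = 0
    for _ in range(num_batches):
        file_size += batch_mb   # merged file = old contents + new batch
        total += file_size      # assumption: the merge rewrites the whole file
    return total


def mb_written_append_only(num_batches: int, batch_mb: int = 1) -> int:
    """Total MB written when every batch goes to its own new file."""
    return num_batches * batch_mb


batches = 99  # the merged file stays below the 100MB limit the whole time
merged = mb_written_with_merge(batches)
appended = mb_written_append_only(batches)
print(merged, appended, merged / appended)  # 4950 99 50.0
```

Under these assumptions, ingesting 99MB of new data costs 4950MB of writes when merging on every batch, a 50x amplification compared with writing each batch to a new file.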

Implementation Option#1: Using APPEND mode

To solve the above problems, Hudi needs to support pure writes of data (that is, skipping both record location and small-file merging at write time) by providing a new APPEND mode; the small data files are then merged into large files afterwards.
The bulkInsert API that Hudi provides today can skip locating records and merging small files and generate files directly, but its main purpose is to load the full data set into Hudi for the first time; it is not an append method. An append API therefore needs to be added that only writes data quickly to new data files.
Assume that the user's partition directory structure after multiple batches of append is as follows:

partition1
| ---- file0-000.parquet 1MB
| ---- file1-001.parquet 1MB
| ---- file2-002.parquet 1MB
partition2
| ---- file3-000.parquet 1MB
| ---- file4-001.parquet 1MB
| ---- file5-002.parquet 1MB


You can see that the file generated by each commit is very small, only close to 1MB. In total, 6 FileGroups (3 for partition1 and 3 for partition2) are generated. The append API will greatly improve write throughput and reduce write amplification.

Compaction

Hudi's compaction currently has two mechanisms: synchronous compaction (Inline Compaction) and asynchronous compaction (Async Compaction). Both apply only to Merge On Read mode, and both compact within a FileGroup: the log files and the data file being compacted belong to the same FileGroup. In the scenario above, the current compaction does not work, because the compaction needed here is at the FileGroup level, that is, it merges data files of different FileGroups. Following the current overall compaction process, compaction at the FileGroup level is divided into two main steps: generating a plan and asynchronously executing the plan. Let's first look at the definition of the CompactionPlan.

Definition of CompactionPlan

To distinguish it from the existing HoodieCompactionPlan, we propose introducing HoodieBaseFileCompactionPlan, defined as follows.

{
   "namespace": "org.apache.hudi.avro.model",
   "type": "record",
   "name": "HoodieBaseFileCompactionPlan",
   "fields": [
     {
         "name": "operations",
         "type": ["null", {
            "type": "array",
            "items": {
               "name": "HoodieBaseFileCompactionOperation",
               "type": "record",
               "fields": [
                  {
                     "name": "baseFilePaths",
                     "type": ["null", {
                        "type": "array",
                        "items": "string"
                     }],
                     "default": null
                  },
                  {
                     "name": "partitionPath",
                     "type": ["null", "string"],
                     "default": null
                  },
                  {
                     "name": "metrics",
                     "type": ["null", {
                        "type": "map",
                        "values": "double"
                     }],
                     "default": null
                  }
               ]
            }
        }],
       "default": null
    },
    {
       "name": "extraMetadata",
       "type": ["null", {
          "type": "map",
          "values": "string"
       }],
       "default": null
    },
    {
       "name": "version",
       "type": ["int", "null"],
       "default": 1
    }
  ]
}

Generate BaseFileCompactionPlan

• For DeltaStreamer

In Continuous mode, if asynchronous compaction is enabled, then after each write HoodieWriteClient#scheduleCompaction is called to generate a HoodieCompactionPlan and serialize it into a metadata file. The scheduledCompactionInstant is then put into the asynchronous compaction queue, and a thread pool continuously takes scheduledCompactionInstants from the queue and executes them asynchronously. Similarly, to generate a BaseFileCompactionPlan, HoodieWriteClient#scheduleBaseFileCompaction can be called; the overall logic is similar to the above.

• For Spark Streaming

Hudi does not yet support Async Compaction when writing through Spark Streaming and can only use Inline Compaction; this is tracked in HUDI-575. That is, after a Spark Streaming write, HoodieWriteClient#scheduleBaseFileCompaction is first called to generate a BaseFileCompactionPlan, and a thread pool is started for asynchronous compaction, following the DeltaStreamer approach.
The logic for generating a BaseFileCompactionPlan is roughly as follows:

• Find all small files in all partitions;
• Generate one HoodieBaseFileCompactionOperation for each partition;
• Save the operations to the BaseFileCompactionPlan.
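The three steps above can be sketched in Python as follows. This is a minimal illustration, not Hudi code: the BaseFile type and the generate_base_file_compaction_plan function are hypothetical, and the plan is a plain dict whose keys mirror the field names of the Avro definition above. The 120MB file in the example is invented to show a file that is not small.

```python
from collections import defaultdict
from typing import Dict, List, NamedTuple

MB = 1024 * 1024
SMALL_FILE_LIMIT = 100 * MB  # mirrors the 100MB small file limit discussed above


class BaseFile(NamedTuple):
    partition_path: str
    path: str
    size_bytes: int


def generate_base_file_compaction_plan(files: List[BaseFile],
                                       small_file_limit: int = SMALL_FILE_LIMIT) -> dict:
    # Step 1: find all small files in all partitions.
    small_by_partition: Dict[str, List[str]] = defaultdict(list)
    for f in files:
        if f.size_bytes < small_file_limit:
            small_by_partition[f.partition_path].append(f.path)

    # Step 2: generate one operation per partition.
    operations = [
        {"baseFilePaths": sorted(paths), "partitionPath": partition, "metrics": None}
        for partition, paths in sorted(small_by_partition.items())
    ]

    # Step 3: save the operations to the plan.
    return {"operations": operations, "extraMetadata": None, "version": 1}


files = [
    BaseFile("partition1", "partition1/file0-000.parquet", 1 * MB),
    BaseFile("partition1", "partition1/file1-001.parquet", 1 * MB),
    BaseFile("partition1", "partition1/file2-002.parquet", 1 * MB),
    BaseFile("partition2", "partition2/file3-000.parquet", 1 * MB),
    BaseFile("partition2", "partition2/file4-001.parquet", 1 * MB),
    BaseFile("partition2", "partition2/file5-002.parquet", 1 * MB),
    BaseFile("partition2", "partition2/big-003.parquet", 120 * MB),  # hypothetical large file
]
plan = generate_base_file_compaction_plan(files)
for op in plan["operations"]:
    print(op["partitionPath"], len(op["baseFilePaths"]))
```

For the directory layout shown earlier, the plan holds two operations with three small files each, and the large file is left alone.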

The entire ScheduleCompaction process is roughly as follows

Execute BaseFileCompactionPlan

Analogous to the existing HoodieWriteClient#compaction, which executes a HoodieCompactionPlan, a HoodieWriteClient#compactionBaseFile method can be introduced to execute a BaseFileCompactionPlan. The execution logic is as follows:

• Execute the HoodieBaseFileCompactionOperations concurrently.
• Merge all the small files in each HoodieBaseFileCompactionOperation. Tentatively, merge all files; the merge algorithm can be refined later, because file records must be read before being merged and written, which increases memory footprint and IO bandwidth.
• Generate the new files.
• Delete the previous small files.
• Commit the compaction.
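The execution steps above can be sketched in Python with an in-memory dict standing in for storage. This is a toy model under stated assumptions: the function names, the merged file naming scheme, and the instant value are all hypothetical, and a real implementation would write commit metadata at the end.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List

# In-memory stand-in for storage: file path -> records in that file.
Storage = Dict[str, List[str]]


def run_operation(storage: Storage, operation: dict, instant: str) -> str:
    """Merge every small file named in one operation into a single new file."""
    merged: List[str] = []
    for path in operation["baseFilePaths"]:
        merged.extend(storage[path])           # read the records first
    new_path = f'{operation["partitionPath"]}/merged-{instant}.parquet'
    storage[new_path] = merged                 # generate the new file
    for path in operation["baseFilePaths"]:
        del storage[path]                      # delete the previous small files
    return new_path


def execute_plan(storage: Storage, plan: dict, instant: str) -> List[str]:
    # Operations touch disjoint files, so they can run concurrently.
    with ThreadPoolExecutor(max_workers=4) as pool:
        new_files = list(pool.map(lambda op: run_operation(storage, op, instant),
                                  plan["operations"]))
    # A real implementation would commit the compaction here.
    return new_files


storage: Storage = {
    "partition1/file0-000.parquet": ["r0"],
    "partition1/file1-001.parquet": ["r1"],
    "partition2/file3-000.parquet": ["r3"],
    "partition2/file4-001.parquet": ["r4"],
}
plan = {"operations": [
    {"partitionPath": "partition1",
     "baseFilePaths": ["partition1/file0-000.parquet", "partition1/file1-001.parquet"]},
    {"partitionPath": "partition2",
     "baseFilePaths": ["partition2/file3-000.parquet", "partition2/file4-001.parquet"]},
]}
new_files = execute_plan(storage, plan, "003")
print(sorted(storage))
```

After the run, each partition holds a single merged file containing the records of its former small files.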

The entire Compaction process is roughly as follows.

Implementation Option#2: Clustering with snapshot isolation

Hoodie write client insert/upsert/bulk_insert will continue to function as before. Users can configure the small file soft limit to 0 to force new data to go into a new set of file groups. In addition, a ‘clustering’ action is provided to collapse file groups based on the desired size limit. Clustering can run asynchronously or synchronously and provides snapshot isolation between readers and writers. The exact steps taken for clustering are listed below for each table type.

COW Table timeline

In the example flow chart above, we show a partition's state over time (t5 to t9). The sequence of steps taken for writing is listed below.

  1. At t5, a partition in the table has 5 file groups: f0, f1, f2, f3, f4. For simplicity, assume that each file group is 100MB, so the total data in the partition is 500MB.
  2. A clustering operation is requested at t6. Similar to compaction, we create a “t6.clustering.requested” file in metadata with a ‘ClusteringPlan’ that includes all the file groups touched by the clustering action across all partitions.
    1. Example contents:
    2. { partitionPath: {“datestr”}, oldfileGroups: [ {fileId: “f0”, time: “t0”}, { fileId: “f1”, time: “t1”}, ... ], newFileGroups: [“c1”, “c2”]  }
  3. Let's say the maximum file size after clustering is configured to be 250MB. Clustering then redistributes all the data in the partition into two file groups: c1 and c2. These file groups are ‘phantom’ and invisible to queries until clustering is complete at t8.
  4. Also, note that records in a file group can be split across multiple new file groups. In this example, some records from the f4 file group go to both new file groups c1 and c2.
  5. While the clustering is in progress (t6 through t8), any upserts that touch these file groups are rejected.
  6. After writing the new data files c1-t6.parquet and c2-t6.parquet, if a global index is configured, we add entries to the record level index for all the keys with the new location. The new index entries will not be visible to other writes because there is no commit associated with them yet.
  7. Finally, we create a commit metadata file ‘t6.commit’ that includes the file groups modified by this commit (f0, f1, f2, f3, f4).
  8. Note that the file groups (f0 to f4) are not deleted from disk immediately. The cleaner would clean these files before archiving t6.commit. We also update all views to ignore all file groups mentioned in all the commit metadata files, so readers will not see duplicates.
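The snapshot isolation described in these steps can be sketched in Python as a reader-side view over the timeline. This is a simplified illustration, not Hudi's file system view: the Commit type and visible_file_groups function are hypothetical, and a commit is modelled as either completed or not.

```python
from typing import List, NamedTuple, Set


class Commit(NamedTuple):
    instant: int                     # commit time, for example 6 for t6
    completed: bool                  # False while only requested or inflight
    added_file_groups: List[str]
    replaced_file_groups: List[str]


def visible_file_groups(timeline: List[Commit]) -> Set[str]:
    """File groups a reader sees; only completed commits count."""
    added: Set[str] = set()
    replaced: Set[str] = set()
    for commit in timeline:
        if not commit.completed:
            continue                 # phantom file groups stay invisible
        added.update(commit.added_file_groups)
        replaced.update(commit.replaced_file_groups)
    return added - replaced


# t0..t4 each create one file group, f0..f4.
timeline = [Commit(i, True, [f"f{i}"], []) for i in range(5)]
clustering = Commit(6, False, ["c1", "c2"], [f"f{i}" for i in range(5)])

during = visible_file_groups(timeline + [clustering])
after = visible_file_groups(timeline + [clustering._replace(completed=True)])
print(sorted(during))  # ['f0', 'f1', 'f2', 'f3', 'f4']
print(sorted(after))   # ['c1', 'c2']
```

Readers never see a mix of old and new file groups: the switch from f0..f4 to c1, c2 happens at the moment the clustering commit completes, even though the old files are still on disk until the cleaner removes them.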


Note that there is a possible race condition at step 5 if multiple writers are allowed. Another writer could have started upserts just before the ‘clustering.requested’ file is written. In the initial version, for simplicity, we assume there is only a single writer. The writer can either schedule clustering or run ingestion. The actual clustering operation can run asynchronously. When hoodie has multi-writer support, we can consider making scheduling asynchronous too. 


MOR Table timeline

This is very similar to the COW table. For a MOR table, inserts can go into either parquet files or log files. This approach will continue to support both modes. The output of clustering is always in parquet format. Also, compaction and clustering cannot run at the same time on the same file groups. Compaction also needs changes to ignore file groups that are already clustered.


Performance numbers

Time for reading metadata

A test was run to measure the time taken to write and read 'replace' metadata using the code here. Here are the results:


Partitions | Total FileGroups replaced | Serialization cost (millis) | Deserialization cost (millis) | Memory utilization
1          | 300                       | 55                          | 41                            | 60KB
1          | 3,000                     | 55                          | 42                            | 570KB
1          | 30,000                    | 93                          | 120                           | 5.7MB
1          | 300,000                   | 103                         | 130                           | 57MB
10         | 300                       | 53                          | 32                            | 60KB
10         | 3,000                     | 68                          | 52                            | 574KB
10         | 30,000                    | 87                          | 104                           | 5.7MB
10         | 300,000                   | 97                          | 114                           | 57MB

Total FileGroups replaced: divide by the Partitions column to get the number of file groups per partition.
Memory utilization: HoodieReplaceMetadata object size + serialized byte[] size in memory.

We plan to store this metadata in avro files, similar to clean metadata. After consolidated metadata is launched, we can come up with a plan to migrate this to leverage consolidated metadata. (This will likely reduce the memory required in cases where a partition has a large number of files replaced.)


Clustering steps


Overall, there are 2 parts to clustering

  1. Scheduling clustering: Create clustering plan
  2. Execute clustering: Process the plan. Create new files and replace old files.

Scheduling clustering

The following steps are used to schedule clustering:

  1. Identify files that are eligible for clustering
    1. Filter specific partitions (based on config to prioritize latest vs older partitions)
    2. Any files that have size > targetFileSize are not eligible for clustering
    3. Any files that have pending compaction/clustering scheduled are not eligible for clustering
    4. Any filegroups that have log files are not eligible for clustering  (We could remove this restriction at a later stage.)
  2. Group files that are eligible for clustering based on specific criteria. Each group is expected to have data size in multiples of ‘targetFileSize’. Grouping is done as part of the ‘strategy’ defined in the plan. We can provide the following strategies:
    1. Group files based on record key ranges. This is useful because the key range is stored in the parquet footer and can be used for certain queries/updates.
    2. Group files based on commit time.
    3. Group files that have overlapping values for custom columns.
      1. As part of clustering, we want to sort data by column(s) in the schema (other than row_key). Among the files that are eligible for clustering, it is better to group files that have overlapping data for the custom columns.
        1. We have to read the data to find this, which is expensive given the way ingestion works. We can consider storing value ranges as part of ingestion (we already do this for record_key). This requires more discussion. In the short term, we can probably focus on the record key range strategy (2.1 above), with no support for sortBy custom columns.
        2. Example: say the target of clustering is to produce 1GB files. Partition initially has 8 * 512MB files. (After clustering, we expect data to be present in 4 * 1GB files.)
      2. Assume among 8 files, say only 2 files have overlapping data for the ‘sort column’, then these 2 files will be part of one group. Output of the group after clustering is one 1GB file. 
      3. Assume among 8 files, say 4 files have overlapping data for the ‘sort column’, then these 4 files will be part of one group. Output of the group after clustering is two 1GB files.
    4. Group random files
    5. We could put a cap on group size to improve parallelism and avoid shuffling large amounts of data 
  3. Filter groups based on specific criteria (akin to orderAndFilter in CompactionStrategy)
  4. Finally, the clustering plan is saved to the timeline. Structure of metadata is here: https://github.com/apache/hudi/blob/master/hudi-common/src/main/avro/HoodieClusteringPlan.avsc
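The eligibility filter and grouping steps above can be sketched in Python. This is a minimal illustration of the 'group random files' strategy with a cap on group size, not the planned implementation: the FileSlice type, the function names, and the 2GB cap are hypothetical. The example reuses the 8 * 512MB partition and the 1GB target from the text.

```python
from typing import List, NamedTuple


class FileSlice(NamedTuple):
    file_id: str
    size_bytes: int
    has_log_files: bool = False
    pending_compaction_or_clustering: bool = False


def eligible(f: FileSlice, target_file_size: int) -> bool:
    """Eligibility rules from step 1 of scheduling."""
    return (f.size_bytes <= target_file_size
            and not f.pending_compaction_or_clustering
            and not f.has_log_files)


def group_files(files: List[FileSlice], target_file_size: int,
                max_group_bytes: int) -> List[List[FileSlice]]:
    """Greedy grouping in input order, capping each group's total size."""
    groups: List[List[FileSlice]] = []
    current: List[FileSlice] = []
    current_bytes = 0
    for f in files:
        if not eligible(f, target_file_size):
            continue
        if current and current_bytes + f.size_bytes > max_group_bytes:
            groups.append(current)
            current, current_bytes = [], 0
        current.append(f)
        current_bytes += f.size_bytes
    if current:
        groups.append(current)
    return groups


def num_output_files(group: List[FileSlice], target_file_size: int) -> int:
    """Ceiling of group size over target size, in integer arithmetic."""
    total = sum(f.size_bytes for f in group)
    return (total + target_file_size - 1) // target_file_size


MB = 1024 * 1024
GB = 1024 * MB
files = [FileSlice(f"f{i}", 512 * MB) for i in range(8)]
files.append(FileSlice("f8", 512 * MB, has_log_files=True))  # not eligible

groups = group_files(files, target_file_size=1 * GB, max_group_bytes=2 * GB)
print([len(g) for g in groups])                           # [4, 4]
print(sum(num_output_files(g, 1 * GB) for g in groups))   # 4
```

With a 2GB cap, the eight eligible files form two groups of four, and each group produces two 1GB files, giving the 4 * 1GB result expected in the example. The number of groups is also the parallelism available when the plan is executed.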


In the ‘metrics’ element, we could store ‘min’ and ‘max’ for each column in the file for helping with debugging and operations.

Running clustering

  1. Read the clustering plan, look at the number of ‘clusteringGroups’. This gives parallelism.
  2. Create inflight clustering file
  3. For each group
    1. Instantiate the appropriate strategy class with strategyParams (example: sortColumns).
    2. The strategy class defines a partitioner, and we can use it to create buckets and write the data.
  4. Create replacecommit. Contents are in HoodieReplaceCommitMetadata.
    1. operationType is set to ‘clustering’.
    2. We can extend the metadata and store additional fields to help track important information (the strategy class can return this 'extra' metadata information).
      1. strategy used to combine files
      2. add additional metrics including range of values for each column in each file etc. 
      3. TODO: see if any additional metadata is needed?
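The per-group work in step 3 can be sketched in Python. This is a toy model under stated assumptions: the cluster_group function is hypothetical, records are plain dicts, and a maximum record count per file stands in for the target file size in bytes. It does not reflect the actual strategy class or partitioner interfaces.

```python
from typing import List


def cluster_group(records: List[dict], sort_columns: List[str],
                  max_records_per_file: int) -> List[List[dict]]:
    """Sort one group's records and split them into output files (buckets)."""
    if sort_columns:
        records = sorted(records, key=lambda r: tuple(r[c] for c in sort_columns))
    return [records[i:i + max_records_per_file]
            for i in range(0, len(records), max_records_per_file)]


# Records read from all the files of one clustering group.
group_records = [
    {"uuid": "a", "city": "sf"},
    {"uuid": "b", "city": "la"},
    {"uuid": "c", "city": "nyc"},
    {"uuid": "d", "city": "la"},
]
output_files = cluster_group(group_records, sort_columns=["city"], max_records_per_file=2)
print([[r["uuid"] for r in f] for f in output_files])  # [['b', 'd'], ['c', 'a']]
```

Sorting before bucketing is what gives each output file a narrow value range for the sort column: here both 'la' records land in the same file.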


Commands to schedule and run clustering

Quick start using Inline Clustering

// Run in spark-shell; the Hudi quickstart helpers supply sample data and default write configs.
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips_cow"
val basePath = "/tmp/hudi_trips_cow"

// Generate 10 sample insert records for a single partition.
val dataGen = new DataGenerator(Array("2020/03/11"))
val updates = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 1));

df.write.format("org.apache.hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  option("hoodie.parquet.small.file.limit", "0"). // force new data into new file groups
  option("hoodie.clustering.inline", "true"). // run clustering inline with the write
  option("hoodie.clustering.inline.max.commits", "4").
  option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824").
  option("hoodie.clustering.plan.strategy.small.file.limit", "629145600").
  option("hoodie.clustering.plan.strategy.sort.columns", ""). // optional, if sorting is needed as part of rewriting data
  mode(Append).
  save(basePath);
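The two byte values in the quick start are easier to read as sizes. The short Python check below converts them, assuming binary units (1MiB = 1024 * 1024 bytes); the variable names are only illustrative.

```python
MIB = 1024 * 1024
GIB = 1024 * MIB

# hoodie.clustering.plan.strategy.target.file.max.bytes
target_file_max_bytes = 1 * GIB
# hoodie.clustering.plan.strategy.small.file.limit
small_file_limit = 600 * MIB

print(target_file_max_bytes)  # 1073741824
print(small_file_limit)       # 629145600
```

So this example treats files below 600MiB as candidates for clustering and caps the clustered output files at 1GiB.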


Setup for Async clustering

Clustering can be scheduled and run asynchronously using WriteClient APIs

  1. Schedule clustering API can be found here
  2. Execute clustering API can be found here


Some caveats: 

Work is in progress to fix these limitations, but the following issues are worth mentioning:

  1. This is an alpha feature. Although there is good unit test coverage, there may be some rough edges. Please report any issues.
  2. Better support for async clustering is coming soon.
  3. Clustering doesn't work with incremental timeline sync, so disable it by setting "hoodie.filesystem.view.incr.timeline.sync.enable: false".
  4. Incremental queries are not supported with clustering. Incremental queries consider all the data written by clustering as new rows.
  5. Clustering creates a new type of commit, "timestamp.replacecommit". There may be some places in the code where we only read commits/deltacommits and miss replacecommits when reading valid commits in the timeline. This can cause discrepancies in some cases.
  6. The clean policy is different for 'replacecommit', so more versions may be retained, leading to extra storage usage.

Other concerns:

  • Can we group files while running clustering (as opposed to grouping during scheduling)?
    • To limit IO, scheduling filters certain file groups out of clustering. If the filtered file groups have overlapping data with the selected files, the effectiveness of clustering will be limited. So grouping and filtering during scheduling has some benefits.
  • Is the ClusteringPlan extensible enough for future use cases?
    • With the above approach, executing a clustering plan basically depends on two parameters: ‘targetFileSize’ and ‘strategyClass’. Users can define a custom strategy class to support any other use cases that might come up.
  • Can we store strategyParams in hoodie.properties instead of storing them in the clustering plan?
    • This is reasonable if we never expect strategyParams to change. If there are use cases that need different strategies for different partitions, this may not work.


Rollout/Adoption Plan

  • No impact on existing users, because this only adds new functionality.

Test Plan

  • Unit tests
  • Integration tests
  • Test on the cluster for a larger dataset. 

