...

  1. Identify files that are eligible for clustering
    1. Filter specific partitions (based on config to prioritize latest vs older partitions)
    2. Any files that have size > targetFileSize are not eligible for clustering
    3. Any files that have pending compaction/clustering scheduled are not eligible for clustering
    4. Any filegroups that have log files are not eligible for clustering  (We could remove this restriction at a later stage.)
  2. Group files that are eligible for clustering based on specific criteria. Each group is expected to have data size in multiples of ‘targetFileSize’.  Grouping is done as part of ‘strategy’.
    1. If sort columns are specified,
      1. Among the files that are eligible for clustering, it is better to group together files that have overlapping data for the specified columns.
        1. We have to read the data to find this, which is expensive given the way ingestion works. We could consider storing value ranges as part of ingestion (we already do this for record_key). This requires more discussion. In the short term, we can probably focus on strategy 2b below (no support for sortBy custom columns).
        2. Example: say the target of clustering is to produce 1GB files. The partition initially has 8 * 512MB files. (After clustering, we expect the data to be present in 4 * 1GB files.)
      2. Assume that, among the 8 files, only 2 have overlapping data for the ‘sort column’; these 2 files will then be part of one group. The output of the group after clustering is one 1GB file.
      3. Assume that, among the 8 files, 4 have overlapping data for the ‘sort column’; these 4 files will then be part of one group. The output of the group after clustering is two 1GB files.
    2. If sort columns are not specified, we could consider grouping files based on other criteria (all of these can be exposed as different strategies):
      1. Group files based on record key ranges. This is useful because the key range is stored in the parquet footer and can be used for certain queries/updates.
      2. Group files based on commit time.
      3. Random grouping of files.
    3. We could put a cap on group size to improve parallelism and avoid shuffling large amounts of data.
  3. Filter groups based on specific criteria (akin to orderAndFilter in CompactionStrategy).
  4. Finally, the clustering plan is saved to the timeline. Structure of metadata:

...

In the ‘metrics’ element, we could store the ‘min’ and ‘max’ of each column in the file to help with debugging and operations.
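If per-file ‘min’ and ‘max’ values for the sort column were available, the eligibility and grouping steps of plan generation could be sketched roughly as follows. This is only an illustration under stated assumptions, not Hudi code: `FileSlice`, `eligible`, `group_by_overlap`, `expected_output_files`, and the `max_group_size` cap are hypothetical names, and the overlap check is a simple sweep over files sorted by their minimum value.

```python
import math
from dataclasses import dataclass

MB = 1024 * 1024
GB = 1024 * MB


@dataclass(frozen=True)
class FileSlice:
    """Hypothetical stand-in for the latest base file of a file group."""
    name: str
    size: int                      # bytes
    min_val: int                   # min of the sort column in this file
    max_val: int                   # max of the sort column in this file
    has_log_files: bool = False
    has_pending_op: bool = False   # pending compaction/clustering


def eligible(files, target_file_size):
    """Step 1: keep only files that the plan is allowed to touch."""
    return [
        f for f in files
        if f.size <= target_file_size
        and not f.has_pending_op
        and not f.has_log_files
    ]


def group_by_overlap(files, max_group_size):
    """Step 2: group files whose [min, max] ranges overlap, capping group size."""
    groups, current, current_max, current_size = [], [], None, 0
    for f in sorted(files, key=lambda s: s.min_val):
        overlaps = bool(current) and f.min_val <= current_max
        fits = current_size + f.size <= max_group_size
        if current and not (overlaps and fits):
            groups.append(current)
            current, current_max, current_size = [], None, 0
        current.append(f)
        current_max = f.max_val if current_max is None else max(current_max, f.max_val)
        current_size += f.size
    if current:
        groups.append(current)
    return groups


def expected_output_files(group, target_file_size):
    """Number of target-sized files a group is expected to produce."""
    return math.ceil(sum(f.size for f in group) / target_file_size)


# Mirrors the example in the list above: 8 * 512MB files, 1GB target.
ranges = [(0, 10), (5, 15), (12, 20), (18, 25), (30, 40), (35, 45), (50, 60), (55, 65)]
files = [FileSlice(f"f{i}", 512 * MB, lo, hi) for i, (lo, hi) in enumerate(ranges)]
files.append(FileSlice("too_big", 2 * GB, 0, 100))                      # size > target
files.append(FileSlice("has_logs", 512 * MB, 0, 100, has_log_files=True))

groups = group_by_overlap(eligible(files, 1 * GB), max_group_size=4 * GB)
outputs = [expected_output_files(g, 1 * GB) for g in groups]
```

With these made-up ranges, the oversized file and the file with log files are skipped, and the remaining eight files fall into groups of four, two, and two files. Lowering `max_group_size` splits the larger group, which trades some locality for more parallelism.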

Running clustering

...

clustering

...

  1. Read the clustering plan and look at the number of ‘clusteringGroups’. This gives the parallelism.
  2. Create inflight clustering file
  3. For each group
    1. Create a new 'CombineHandle' based on parameters (sortColumns for the initial case).
    2. If sort order is not specified, we could just combine the records and write to new buckets using existing logic similar to bulk_insert/insert.
    3. If sort order is specified, we need to add new logic: essentially do a merge sort across the files within the group and write the records to new target file groups, honoring targetFileSize.
  4. Create replacecommit. Contents are in HoodieReplaceCommitMetadata 
    1. operationType is set to ‘clustering’.
    2. We can extend the metadata and add additional metrics, such as the range of values for each column in each file.
    3. TODO: see if any additional metadata is needed?
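The sorted case in step 3 above (merge sort across the files of a group, then roll over to a new output file once the target size is reached) can be sketched as follows. This is a minimal illustration under stated assumptions, not the actual 'CombineHandle' logic: `cluster_group`, `sort_key`, and `record_size` are hypothetical names, records are plain dicts, and each input file is sorted in memory before the k-way merge.

```python
import heapq


def cluster_group(files, sort_key, record_size, target_file_size):
    """Merge the records of all files in a group in sort order and split
    the result into output files that do not exceed target_file_size."""
    # Each input is sorted on its own first, so heapq.merge can do a k-way merge.
    runs = [sorted(records, key=sort_key) for records in files]

    outputs, current, current_size = [], [], 0
    for record in heapq.merge(*runs, key=sort_key):
        size = record_size(record)
        # Roll over to a new output file when the next record would not fit.
        if current and current_size + size > target_file_size:
            outputs.append(current)
            current, current_size = [], 0
        current.append(record)
        current_size += size
    if current:
        outputs.append(current)
    return outputs


# Three small "files" in one clustering group; every record counts as size 1.
group = [
    [{"ts": 3}, {"ts": 1}],
    [{"ts": 2}, {"ts": 4}],
    [{"ts": 0}],
]
new_files = cluster_group(
    group,
    sort_key=lambda r: r["ts"],
    record_size=lambda r: 1,
    target_file_size=2,
)
```

Each entry of `new_files` stands for one new file group, and the records stay globally sorted across the outputs of the group. In a real implementation the runs would be streamed from storage rather than sorted in memory.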

...

  • Is the ClusteringPlan extensible enough for future use cases?
    • With the above approach, executing a clustering plan basically depends on two parameters: ‘targetFileSize’ and ‘sortColumns’. Based on these parameters, we create different partitioners and write data differently in new locations. Because the Avro schema is extensible, we could add new fields to support any other use cases that might come up.
  • Can we store sortColumns in hoodie.properties instead of storing in clustering plan?
    • This is reasonable if we never expect sortColumns to change. If the data pattern changes for any reason, or if there are use cases that need different partitions sorted by different columns, this may not work.



Rollout/Adoption Plan

  • No impact on existing users, because this only adds new functionality.

...