...

  1. Identify files that are eligible for clustering
    1. Filter specific partitions (based on config to prioritize latest vs older partitions)
    2. Any files that have size > targetFileSize are not eligible for clustering
    3. Any files that have pending compaction/clustering scheduled are not eligible for clustering
    4. Any filegroups that have log files are not eligible for clustering  (We could remove this restriction at a later stage.)
  2. Group files that are eligible for clustering based on specific criteria. Each group is expected to have data size in multiples of ‘targetFileSize’. Grouping is done as part of the ‘strategy’ defined in the plan. We can provide several strategies:
    1. Group files based on record key ranges. This is useful because the key range is stored in the parquet footer and can be used for certain queries/updates.
    2. Group files based on commit time.
    3. Group files that have overlapping values for custom columns.
      1. As part of clustering, we want to sort data by column(s) in the schema (other than row_key). Among the files that are eligible for clustering, it is better to group together files that have overlapping data for the specified columns.
        1. We have to read data to find this, which is expensive given the way ingestion works. We can consider storing value ranges as part of ingestion (we already do this for record_key). This requires more discussion. Probably, in the short term, we can focus on strategy 2.1 (grouping by record key ranges, with no support for sorting by custom columns).
        2. Example: say the target of clustering is to produce 1GB files. The partition initially has 8 * 512MB files. (After clustering, we expect data to be present in 4 * 1GB files.)
      2. Assume that, among the 8 files, only 2 files have overlapping data for the ‘sort column’. These 2 files will then be part of one group. The output of the group after clustering is one 1GB file.
      3. Assume that, among the 8 files, 4 files have overlapping data for the ‘sort column’. These 4 files will then be part of one group. The output of the group after clustering is two 1GB files.
    4. Group random files.
    5. We could put a cap on group size to improve parallelism and avoid shuffling large amounts of data.
  3. Filter groups based on specific criteria (akin to orderAndFilter in CompactionStrategy)
  4. Finally, the clustering plan is saved to the timeline. Structure of metadata


{
  "namespace":"org.apache.hudi.avro.model",
  "type":"record",
  "name":"HoodieClusteringPlan",
  "fields":[
    {
      "name":"clusteringGroups",
      "type":["null", {
        "type":"array",
        "items":{
          "name":"HoodieClusteringGroup",
          "type":"record",
          "fields":[
            {
              "name":"fileIds",
              "type":["null", {
                "type":"array",
                "items":"string"
              }],
              "default": null
            },
            {
              "name":"partitionPath",
              "type":["null","string"],
              "default": null
            },
            {
              "name":"metrics",
              "type":["null", {
                "type":"map",
                "values":"double"
              }],
              "default": null
            }
          ]
        }
      }],
      "default": null
    },
    {
      "name":"targetFileSize",
      "type":["long", "null"],
      "default": 1073741824
    },
    {
      "name":"strategy",
      "type":"record",
      "fields":[
        {
          "name":"strategyClassName", /* has to be a subclass of the ClusteringStrategy interface defined in hudi. The ClusteringStrategy class includes methods like getPartitioner */
          "type":["null","string"],
          "default": null
        },
        {
          "name":"strategyParams", /* Parameters could be different for different strategies. For example, if sorting is needed for the strategy, parameters can contain sortColumns */
          "type":["null", {
            "type":"map",
            "values":"string"
          }],
          "default": null
        }
      ]
    },
    {
      "name":"extraMetadata",
      "type":["null", {
        "type":"map",
        "values":"string"
      }],
      "default": null
    },
    {
      "name":"version",
      "type":["int", "null"],
      "default": 1
    }
  ]
}


In the ‘metrics’ element, we could store ‘min’ and ‘max’ for each column in the file to help with debugging and operations.
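
To make the scheduling steps concrete, here is a minimal Python sketch of eligibility filtering followed by grouping on overlapping sort-column ranges, with a cap on group size. It is only an illustration under stated assumptions: `BaseFile`, `eligible`, `group_by_overlap` and `expected_outputs` are hypothetical names, not Hudi APIs, and the per-file min/max values are assumed to be already available (for example, from ranges stored at ingestion). The sample data reproduces the 8 * 512MB example with a 1GB target.

```python
import math
from dataclasses import dataclass

MB = 1024 * 1024
GB = 1024 * MB


@dataclass(frozen=True)
class BaseFile:
    """Hypothetical stand-in for a file considered by the scheduler."""
    file_id: str
    size_bytes: int
    min_val: int  # min of the sort column (assumed to be known up front)
    max_val: int  # max of the sort column
    has_log_files: bool = False
    has_pending_action: bool = False  # pending compaction/clustering


def eligible(files, target_file_size):
    """Keep only files that pass the eligibility rules of step 1."""
    return [
        f for f in files
        if f.size_bytes <= target_file_size
        and not f.has_pending_action
        and not f.has_log_files
    ]


def group_by_overlap(files, max_group_bytes):
    """Group files whose [min_val, max_val] ranges overlap, capping group size."""
    groups, current, current_max, current_bytes = [], [], None, 0
    for f in sorted(files, key=lambda f: f.min_val):
        overlaps = bool(current) and f.min_val <= current_max
        fits = current_bytes + f.size_bytes <= max_group_bytes
        if current and not (overlaps and fits):
            groups.append(current)  # close the group and start a new one
            current, current_max, current_bytes = [], None, 0
        current.append(f)
        current_max = f.max_val if current_max is None else max(current_max, f.max_val)
        current_bytes += f.size_bytes
    if current:
        groups.append(current)
    return groups


def expected_outputs(group, target_file_size):
    """Number of target-sized files a group is expected to produce."""
    return math.ceil(sum(f.size_bytes for f in group) / target_file_size)


ranges = [(0, 10), (5, 15), (20, 30), (25, 35), (33, 40), (38, 50), (60, 70), (65, 90)]
files = [BaseFile(f"f{i}", 512 * MB, lo, hi) for i, (lo, hi) in enumerate(ranges, 1)]
files.append(BaseFile("too_big", 2 * GB, 0, 100))  # size > targetFileSize
files.append(BaseFile("has_logs", 512 * MB, 0, 100, has_log_files=True))

candidates = eligible(files, target_file_size=GB)
groups = group_by_overlap(candidates, max_group_bytes=4 * GB)
group_ids = [[f.file_id for f in g] for g in groups]
outputs = [expected_outputs(g, GB) for g in groups]
```

With this sample data, the two overlapping files form one group (one 1GB output) and the four overlapping files form another (two 1GB outputs), matching the example in the grouping step. The `max_group_bytes` cap is the knob that trades clustering effectiveness against shuffle size.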

...

  1. Read the clustering plan, look at the number of ‘clusteringGroups’. This gives parallelism.
  2. Create inflight clustering file
  3. For each group
    1. Instantiate the appropriate strategy class with strategyParams (example: sortColumns).
    2. The strategy class defines the partitioner, which we can use to create buckets and write the data.
    3. Create a new 'CombineHandle' based on parameters (sortColumns for the initial case).
    4. If sort order is not specified, we could just combine the records and write to new buckets using existing logic similar to bulk_insert/insert.
    5. If sort order is specified, we need to add new logic (essentially a merge sort across files within the group, writing records to target file groups while honoring targetFileSize) and write the new file groups.
  4. Create replacecommit. Contents are in HoodieReplaceCommitMetadata 
    1. operationType is set to ‘clustering’.
    2. We can extend the metadata and store additional fields to help track important information (the strategy class can return this 'extra' metadata information):
      1. strategy used to combine files
      2. additional metrics, including the range of values for each column in each file, etc.
      3. TODO: see if any additional metadata is needed?
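
The sorted execution path for a single group can be sketched as follows. This is a minimal Python illustration, not Hudi code: `cluster_group` and `value_ranges` are hypothetical helpers, records are plain integers, every record is assumed to occupy one "byte", and the final dict only loosely mirrors what a replacecommit could carry (the `valueRanges` key is made up for this example).

```python
import heapq


def cluster_group(files, sort_key, record_size, target_file_size):
    """Merge the records of one clustering group in sort order and split
    them into output files of at most target_file_size bytes."""
    # Sort each input file on its own, then do a k-way merge across files.
    sorted_inputs = [sorted(records, key=sort_key) for records in files]
    outputs, current, current_bytes = [], [], 0
    for record in heapq.merge(*sorted_inputs, key=sort_key):
        size = record_size(record)
        if current and current_bytes + size > target_file_size:
            outputs.append(current)  # roll over to a new output file
            current, current_bytes = [], 0
        current.append(record)
        current_bytes += size
    if current:
        outputs.append(current)
    return outputs


def value_ranges(outputs, sort_key):
    """Per-output (min, max) of the sort key, usable as 'extra' metadata."""
    return [(sort_key(out[0]), sort_key(out[-1])) for out in outputs]


# One group of three input files; target size of 3 "bytes" per output file.
group = [[5, 1, 9], [3, 7], [8, 2]]
outputs = cluster_group(
    group, sort_key=lambda r: r, record_size=lambda r: 1, target_file_size=3
)
commit_metadata = {
    "operationType": "clustering",
    "extraMetadata": {"valueRanges": value_ranges(outputs, lambda r: r)},
}
```

Because the merge emits records in global sort order, each output file covers a disjoint value range, which is what makes the per-file min/max metrics useful afterwards.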

Other concerns:

  • Can we group files while running clustering (as opposed to grouping during scheduling)?
    • To limit IO, scheduling filters certain file groups from clustering. If the filtered file groups have overlapping data with the selected files, the effectiveness of clustering will be limited. So I think grouping and filtering during scheduling has some benefits.
  • Is the ClusteringPlan extensible enough for future use cases?
    • With the above approach, executing a clustering plan is basically dependent on two parameters: ‘targetFileSize’ and ‘sortColumns’. Based on these parameters, we create different partitioners and write data differently in new locations. Because this avro schema is extensible, we could add new fields such as ‘strategyClass’. Users can define a custom strategy class to support any other use cases that might come up.
  • Can we store strategyParams in hoodie.properties instead of storing them in the clustering plan?
    • This is reasonable if we never expect strategyParams to change. If the data pattern changes for any reason, or if there are use cases that need different strategies for different partitions, this may not work.
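
As a rough illustration of why carrying the strategy in each plan is flexible, the sketch below resolves a strategy from a plan's `strategyClassName` and `strategyParams` and applies a different sort to each partition. Everything here is an assumption made for the example: the `ClusteringStrategy` base class, `SortByColumnsStrategy`, the `REGISTRY` lookup and the comma-separated `sortColumns` encoding are hypothetical and do not describe Hudi's actual interface.

```python
class ClusteringStrategy:
    """Hypothetical base class; by default records keep their input order."""

    def __init__(self, params):
        self.params = params

    def sort_key(self):
        return None  # None means: do not sort


class SortByColumnsStrategy(ClusteringStrategy):
    """Sorts by the columns listed in strategyParams['sortColumns']."""

    def sort_key(self):
        # strategyParams is a string-to-string map, so columns are assumed
        # to be encoded as a comma-separated list.
        columns = self.params["sortColumns"].split(",")
        return lambda record: tuple(record[c] for c in columns)


REGISTRY = {cls.__name__: cls for cls in (ClusteringStrategy, SortByColumnsStrategy)}


def load_strategy(plan):
    """Instantiate the strategy named in the plan with the plan's params."""
    strategy = plan["strategy"]
    cls = REGISTRY[strategy["strategyClassName"]]
    return cls(strategy.get("strategyParams") or {})


def apply_plan(plan, records):
    key = load_strategy(plan).sort_key()
    return list(records) if key is None else sorted(records, key=key)


records = [{"city": "b", "ts": 1}, {"city": "a", "ts": 2}]
plan_partition_1 = {"strategy": {"strategyClassName": "SortByColumnsStrategy",
                                 "strategyParams": {"sortColumns": "city"}}}
plan_partition_2 = {"strategy": {"strategyClassName": "SortByColumnsStrategy",
                                 "strategyParams": {"sortColumns": "ts"}}}

by_city = [r["ts"] for r in apply_plan(plan_partition_1, records)]
by_ts = [r["ts"] for r in apply_plan(plan_partition_2, records)]
```

A single table-level property could not express both plans at once, which is the limitation the last bullet points at.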


Rollout/Adoption Plan

...