...

We plan to store this metadata in avro files, similar to clean metadata. After consolidated metadata is launched, we can come up with a plan to migrate this to leverage consolidated metadata (this will likely reduce the memory required for cases where a partition has a large number of files replaced).


Clustering steps


Overall, there are two parts to clustering:

  1. Scheduling clustering: Create clustering plan
  2. Execute clustering: Process the plan. Create new files and replace old files.

Scheduling clustering

The following steps are followed to schedule clustering:

  1. Identify files that are eligible for clustering
    1. Filter specific partitions (based on config to prioritize latest vs older partitions)
    2. Any files that have size > targetFileSize are not eligible for clustering
    3. Any files that have pending compaction/clustering scheduled are not eligible for clustering
    4. Any file groups that have log files are not eligible for clustering (we could remove this restriction at a later stage.)
  2. Group files that are eligible for clustering based on specific criteria. Each group is expected to have a data size that is a multiple of ‘targetFileSize’. Grouping is done as part of a ‘strategy’ (a sketch of one possible grouping approach follows the schema below).
    1. If sort columns are specified,
      1. Among the files that are eligible for clustering, it is better to group together files that have overlapping data for the specified columns.
      2. Example: say the target of clustering is to produce 1GB files. Partition initially has 8 * 512MB files. (After clustering, we expect data to be present in 4 * 1GB files.)
        1. Assume among 8 files, say only 2 files have overlapping data for the ‘sort column’, then these 2 files will be part of one group. Output of the group after clustering is one 1GB file. 
        2. Assume among 8 files, say 4 files have overlapping data for the ‘sort column’, then these 4 files will be part of one group. Output of the group after clustering is two 1GB files.
      3. We could put a cap on group size to avoid shuffling large amounts of data 
    2. If sort columns are not specified, we could consider grouping files based on other criteria (all of these can be exposed as different strategies):
      1. Group files based on record key ranges. This is useful because key range is stored in a parquet footer and can be used for certain queries/updates.
      2. Group files based on commit time.
      3. Random grouping of files.
  3. Filter groups based on specific criteria (akin to orderAndFilter in CompactionStrategy)
  4. Finally, the clustering plan is saved to the timeline. The structure of the plan metadata is shown below.


{
  "namespace": "org.apache.hudi.avro.model",
  "type": "record",
  "name": "HoodieClusteringPlan",
  "fields": [
    {
      "name": "clusteringGroups",
      "type": ["null", {
        "type": "array",
        "items": {
          "name": "HoodieClusteringGroup",
          "type": "record",
          "fields": [
            {
              "name": "fileIds",
              "type": ["null", {
                "type": "array",
                "items": "string"
              }],
              "default": null
            },
            {
              "name": "partitionPath",
              "type": ["null", "string"],
              "default": null
            },
            {
              "name": "metrics",
              "type": ["null", {
                "type": "map",
                "values": "double"
              }],
              "default": null
            }
          ]
        }
      }],
      "default": null
    },
    {
      "name": "targetFileSize",
      "type": ["long", "null"],
      "default": 1073741824
    },
    {
      "name": "sortColumns",
      "type": ["null", {
        "type": "array",
        "items": "string"
      }],
      "default": null
    },
    {
      "name": "extraMetadata",
      "type": ["null", {
        "type": "map",
        "values": "string"
      }],
      "default": null
    },
    {
      "name": "version",
      "type": ["int", "null"],
      "default": 1
    }
  ]
}
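
For illustration only: assuming standard Avro code generation is run on the schema above (and is configured to map Avro strings to java.lang.String), the resulting builder-style classes could be used to assemble a plan roughly like this. The file ids, partition path, and metric key are made-up example values.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;

public class ClusteringPlanExample {

  // Assembles a single-group plan; all values here are purely illustrative.
  public static HoodieClusteringPlan buildExamplePlan() {
    Map<String, Double> metrics = new HashMap<>();
    metrics.put("totalBytesInGroup", 2.0 * 512 * 1024 * 1024); // hypothetical metric key

    List<String> fileIds = Arrays.asList("fileId1", "fileId2");

    HoodieClusteringGroup group = HoodieClusteringGroup.newBuilder()
        .setFileIds(fileIds)
        .setPartitionPath("2020/06/01")
        .setMetrics(metrics)
        .build();

    return HoodieClusteringPlan.newBuilder()
        .setClusteringGroups(Arrays.asList(group))
        .setTargetFileSize(1024L * 1024 * 1024) // 1GB, matching the schema default
        .setSortColumns(Arrays.asList("sortColumn1"))
        .setExtraMetadata(new HashMap<>())
        .setVersion(1)
        .build();
  }
}

The assembled plan is what gets serialized and saved to the timeline in step 4.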


In the ‘metrics’ element, we could store the ‘min’ and ‘max’ values for each column in the file to help with debugging and operations.
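
To make step 2.1 of scheduling concrete, below is a minimal, self-contained sketch of grouping candidate files by overlapping sort-column ranges, with a cap on group size (step 2.1.3). It assumes per-file min/max statistics for the sort column are available (e.g. from parquet footers), and the types such as CandidateFile are made up for illustration; this is not an actual Hudi strategy implementation. Numeric sort values are used purely for simplicity.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SortColumnOverlapGrouping {

  // Hypothetical holder for a candidate base file and its sort-column statistics.
  static class CandidateFile {
    final String fileId;
    final long sizeBytes;
    final double minSortValue; // e.g. taken from column statistics in the parquet footer
    final double maxSortValue;

    CandidateFile(String fileId, long sizeBytes, double minSortValue, double maxSortValue) {
      this.fileId = fileId;
      this.sizeBytes = sizeBytes;
      this.minSortValue = minSortValue;
      this.maxSortValue = maxSortValue;
    }
  }

  // Group files whose sort-column ranges overlap, capping each group at maxGroupSizeBytes
  // so that a single group does not shuffle too much data.
  static List<List<CandidateFile>> groupByOverlap(List<CandidateFile> files, long maxGroupSizeBytes) {
    List<CandidateFile> sorted = new ArrayList<>(files);
    sorted.sort(Comparator.comparingDouble((CandidateFile f) -> f.minSortValue));

    List<List<CandidateFile>> groups = new ArrayList<>();
    List<CandidateFile> current = new ArrayList<>();
    double currentMax = Double.NEGATIVE_INFINITY;
    long currentSize = 0;

    for (CandidateFile file : sorted) {
      boolean startNewGroup = current.isEmpty()
          || file.minSortValue > currentMax                      // no overlap with the current group
          || currentSize + file.sizeBytes > maxGroupSizeBytes;   // group would exceed the size cap
      if (startNewGroup && !current.isEmpty()) {
        groups.add(current);
        current = new ArrayList<>();
        currentSize = 0;
        currentMax = Double.NEGATIVE_INFINITY;
      }
      current.add(file);
      currentSize += file.sizeBytes;
      currentMax = Math.max(currentMax, file.maxSortValue);
    }
    if (!current.isEmpty()) {
      groups.add(current);
    }
    return groups;
  }
}

Each resulting group of file ids would then become one HoodieClusteringGroup in the plan.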

Running clustering


  1. Create inflight clustering file
  2. Read the clustering plan and look at the number of ‘clusteringGroups’; this determines the parallelism.
  3. For each group
    1. If sort order is not specified, we could just combine the records and write them to new buckets using existing logic similar to bulk_insert/insert.
    2. If sort order is specified, we need to add new logic to merge sort records across the files within a group and write them to the new target file groups (a sketch is shown after this list).
  4. Create replacecommit. Contents are in HoodieReplaceCommitMetadata 
    1. operationType is set to ‘clustering’.
    2. We can extend the metadata and add additional metrics, including time taken.
    3. TODO: determine whether any additional metadata is needed.
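
For step 3.2, here is a minimal sketch of the merge. It assumes records within each input file are (or have been made) available in sort order, and it k-way merges them while rolling over to a new file group once roughly targetFileSize bytes have been written. The reader/writer interfaces are placeholders for illustration, not actual Hudi APIs; a real implementation would use Hudi's file readers and write handles.

import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class ClusteringGroupExecutionSketch {

  // Placeholder abstractions for the sketch.
  interface SortedFileReader {
    Iterator<RecordLike> readSortedRecords(); // records in sort-column order
  }

  interface RecordLike {
    String sortKey();    // value of the sort column(s), flattened to a string for simplicity
    long sizeInBytes();  // approximate serialized size, used to decide when to roll files
  }

  interface FileGroupWriter {
    void write(RecordLike record);
    void close();
  }

  interface WriterFactory {
    FileGroupWriter newFileGroup(); // opens a writer for a brand-new file group
  }

  // Head of one input iterator, kept in a min-heap keyed by the sort key.
  private static class Head {
    final RecordLike record;
    final Iterator<RecordLike> rest;

    Head(RecordLike record, Iterator<RecordLike> rest) {
      this.record = record;
      this.rest = rest;
    }
  }

  // K-way merge the sorted inputs of one clustering group into new file groups of ~targetFileSize.
  static void executeGroup(List<SortedFileReader> inputs, WriterFactory writers, long targetFileSize) {
    PriorityQueue<Head> heap =
        new PriorityQueue<>(Comparator.comparing((Head h) -> h.record.sortKey()));
    for (SortedFileReader input : inputs) {
      Iterator<RecordLike> it = input.readSortedRecords();
      if (it.hasNext()) {
        heap.add(new Head(it.next(), it));
      }
    }

    FileGroupWriter writer = writers.newFileGroup();
    long bytesWritten = 0;
    while (!heap.isEmpty()) {
      Head head = heap.poll();
      if (bytesWritten >= targetFileSize) {
        // Current output reached the target size; roll over to a new file group.
        writer.close();
        writer = writers.newFileGroup();
        bytesWritten = 0;
      }
      writer.write(head.record);
      bytesWritten += head.record.sizeInBytes();
      if (head.rest.hasNext()) {
        heap.add(new Head(head.rest.next(), head.rest));
      }
    }
    writer.close();
  }
}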

Other concerns:

  • Can we group files while running clustering (as opposed to grouping during scheduling)?
    • To limit IO, scheduling filters certain file groups out of clustering. If these filtered file groups have data that overlaps with the selected files, the effectiveness of clustering will be limited. So I think grouping and filtering during scheduling has some benefits.
  • Is the ClusteringPlan extensible enough for future use cases?
    • With the above approach, executing a clustering plan basically depends on two parameters: ‘targetFileSize’ and ‘sortColumns’. Based on these parameters, we create different partitioners/write data differently to new locations. Because the Avro schema is extensible, we could add new fields and support any other use cases that come up.



Rollout/Adoption Plan

  • No impact on existing users, because this only adds new functionality.

...