...

  1. Identify files that are eligible for clustering
    1. Filter to specific partitions (based on config that prioritizes latest vs. older partitions)
    2. Any files that have size > targetFileSize are not eligible for clustering
    3. Any files that have pending compaction/clustering scheduled are not eligible for clustering
    4. Any file groups that have log files are not eligible for clustering. (We could remove this restriction at a later stage.)
  2. Group files that are eligible for clustering based on specific criteria. Each group is expected to have a data size that is a multiple of ‘targetFileSize’. Grouping is done as part of a ‘strategy’ (a sketch of this planning flow appears after the metadata schema below).
    1. If sort columns are specified,
      1. Among the files that are eligible for clustering, it is better to group together files whose data overlaps on the specified columns.
      2. Example: say the goal of clustering is to produce 1GB files and the partition initially has 8 * 512MB files. (After clustering, we expect the data to be in 4 * 1GB files.)
        1. If only 2 of the 8 files have overlapping data for the ‘sort column’, these 2 files will be part of one group; the output of that group after clustering is one 1GB file.
        2. If 4 of the 8 files have overlapping data for the ‘sort column’, these 4 files will be part of one group; the output of that group after clustering is two 1GB files.
      3. We could put a cap on group size to improve parallelism and avoid shuffling large amounts of data 
    2. If sort columns are not specified, we could group files based on other criteria (each of these can be exposed as a different strategy):
      1. Group files based on record key ranges. This is useful because the key range is stored in the parquet footer and can be used for certain queries/updates.
      2. Group files based on commit time.
      3. Random grouping of files.
  3. Filter groups based on specific criteria (akin to orderAndFilter in CompactionStrategy)
  4. Finally, the clustering plan is saved to the timeline. The metadata has the following structure:


{
  "namespace": "org.apache.hudi.avro.model",
  "type": "record",
  "name": "HoodieClusteringPlan",
  "fields": [
    {
      "name": "clusteringGroups",
      "type": ["null", {
        "type": "array",
        "items": {
          "name": "HoodieClusteringGroup",
          "type": "record",
          "fields": [
            {
              "name": "fileIds",
              "type": ["null", {
                "type": "array",
                "items": "string"
              }],
              "default": null
            },
            {
              "name": "partitionPath",
              "type": ["null", "string"],
              "default": null
            },
            {
              "name": "metrics",
              "type": ["null", {
                "type": "map",
                "values": "double"
              }],
              "default": null
            }
          ]
        }
      }],
      "default": null
    },
    {
      "name": "targetFileSize",
      "type": ["long", "null"],
      "default": 1073741824
    },
    {
      "name": "sortColumns",
      "type": ["null", {
        "type": "array",
        "items": "string"
      }],
      "default": null
    },
    {
      "name": "extraMetadata",
      "type": ["null", {
        "type": "map",
        "values": "string"
      }],
      "default": null
    },
    {
      "name": "version",
      "type": ["int", "null"],
      "default": 1
    }
  ]
}
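
For illustration, a populated plan might conceptually look like the following (shown as plain JSON rather than Avro's exact union encoding; the file ids, partition path, sort column, and metric below are purely hypothetical):

{
  "clusteringGroups": [
    {
      "fileIds": ["fileId-1", "fileId-2"],
      "partitionPath": "2021/01/01",
      "metrics": {"totalBytes": 1073741824.0}
    }
  ],
  "targetFileSize": 1073741824,
  "sortColumns": ["sessionId"],
  "extraMetadata": null,
  "version": 1
}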


In the ‘metrics’ element, we could store ‘min’ and ‘max’ values for each column in the file to help with debugging and operations.
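
To make the planning flow above concrete, here is a minimal sketch of a size-based strategy: filter out ineligible files, then greedily pack the rest into groups of roughly the configured size. CandidateFile, SizeBasedClusteringStrategy, and the size parameters are simplified stand-ins introduced for this sketch, not existing Hudi APIs; a real strategy would also apply the sort-column overlap logic and group-size cap described above and attach per-group metrics.

import java.util.ArrayList;
import java.util.List;

class CandidateFile {
  // Simplified stand-in for a base file that is a candidate for clustering.
  final String fileId;
  final long sizeBytes;
  final boolean hasPendingCompactionOrClustering;
  final boolean hasLogFiles;

  CandidateFile(String fileId, long sizeBytes, boolean pending, boolean logs) {
    this.fileId = fileId;
    this.sizeBytes = sizeBytes;
    this.hasPendingCompactionOrClustering = pending;
    this.hasLogFiles = logs;
  }
}

class SizeBasedClusteringStrategy {

  // Step 1: drop files that are not eligible for clustering.
  static List<CandidateFile> filterEligible(List<CandidateFile> files, long targetFileSize) {
    List<CandidateFile> eligible = new ArrayList<>();
    for (CandidateFile f : files) {
      if (f.sizeBytes < targetFileSize
          && !f.hasPendingCompactionOrClustering
          && !f.hasLogFiles) {
        eligible.add(f);
      }
    }
    return eligible;
  }

  // Step 2: greedily pack eligible files into groups, capping each group at
  // maxBytesPerGroup (a small multiple of targetFileSize) to keep parallelism.
  static List<List<CandidateFile>> group(List<CandidateFile> eligible, long maxBytesPerGroup) {
    List<List<CandidateFile>> groups = new ArrayList<>();
    List<CandidateFile> current = new ArrayList<>();
    long currentBytes = 0;
    for (CandidateFile f : eligible) {
      if (!current.isEmpty() && currentBytes + f.sizeBytes > maxBytesPerGroup) {
        groups.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(f);
      currentBytes += f.sizeBytes;
    }
    if (!current.isEmpty()) {
      groups.add(current);
    }
    return groups;
  }

  public static void main(String[] args) {
    long gb = 1024L * 1024 * 1024;
    // The example from above: a partition with 8 * 512MB files and a 1GB target.
    List<CandidateFile> files = new ArrayList<>();
    for (int i = 0; i < 8; i++) {
      files.add(new CandidateFile("file-" + i, 512L * 1024 * 1024, false, false));
    }
    List<List<CandidateFile>> groups = group(filterEligible(files, gb), 2 * gb);
    System.out.println(groups.size() + " groups");  // 2 groups of 4 files (~2GB each)
  }
}

The groups produced this way would then be filtered (step 3) and recorded as ‘clusteringGroups’ in the plan.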

...

  1. Create inflight clustering file
  2. Read the clustering plan and look at the number of ‘clusteringGroups’; this determines the parallelism.
  3. For each group:
    1. If sort order is not specified, we could just combine the records and write to new buckets using existing logic similar to bulk_insert/insert.
    2. If sort order is specified, we need to add new logic: essentially a merge sort across the files within the group, writing records to target file groups while honoring targetFileSize (see the sketch after this list).
  4. Create a replacecommit. Its contents are stored in HoodieReplaceCommitMetadata.
    1. operationType is set to ‘clustering’.
    2. We can extend the metadata and add additional metrics, including time taken, range of values for each column in each file, etc.
    3. TODO: see if any additional metadata is needed?
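
A minimal sketch of the per-group execution when a sort order is specified is shown below: a k-way merge across the group's files (each already sorted on the sort columns), rolling over to a new file group whenever the current one reaches targetFileSize. The Record, FileGroupWriter, and PeekingSource types are simplified stand-ins introduced for this sketch, not existing Hudi classes.

import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

class ClusteringGroupExecutor {

  // Simplified record: a sort key plus an approximate serialized size.
  static class Record {
    final String sortKey;
    final long sizeBytes;
    Record(String sortKey, long sizeBytes) {
      this.sortKey = sortKey;
      this.sizeBytes = sizeBytes;
    }
  }

  // Stand-in for a writer that creates a new file group per output file.
  interface FileGroupWriter {
    void startNewFileGroup();
    void write(Record r);
  }

  // Wraps a per-file iterator so its smallest pending record can sit in the heap.
  static class PeekingSource {
    final Iterator<Record> it;
    Record head;
    PeekingSource(Iterator<Record> it) {
      this.it = it;
      this.head = it.next();
    }
    boolean advance() {
      if (it.hasNext()) {
        head = it.next();
        return true;
      }
      return false;
    }
  }

  // Merge-sort across the files of one clustering group, honoring targetFileSize.
  static void executeGroup(List<Iterator<Record>> sortedFileIterators,
                           long targetFileSize,
                           FileGroupWriter writer) {
    PriorityQueue<PeekingSource> heap =
        new PriorityQueue<>(Comparator.comparing((PeekingSource s) -> s.head.sortKey));
    for (Iterator<Record> it : sortedFileIterators) {
      if (it.hasNext()) {
        heap.add(new PeekingSource(it));
      }
    }
    long bytesInCurrentFileGroup = 0;
    writer.startNewFileGroup();
    while (!heap.isEmpty()) {
      PeekingSource source = heap.poll();
      Record smallest = source.head;
      if (bytesInCurrentFileGroup > 0
          && bytesInCurrentFileGroup + smallest.sizeBytes > targetFileSize) {
        writer.startNewFileGroup();   // roll over to keep output close to targetFileSize
        bytesInCurrentFileGroup = 0;
      }
      writer.write(smallest);
      bytesInCurrentFileGroup += smallest.sizeBytes;
      if (source.advance()) {
        heap.add(source);             // re-insert with this file's next record
      }
    }
  }
}

Once every group has been written, the replacecommit described above is completed with operationType set to ‘clustering’ and any additional metrics added to HoodieReplaceCommitMetadata.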

...