...

  1. This is an alpha feature. Although there is good unit test coverage, there may be some rough edges. Please report any issues.
  2. Better support for async clustering is coming soon.
  3. Clustering does not work with the incremental timeline, so disable it by setting "hoodie.filesystem.view.incr.timeline.sync.enable: false".
  4. Incremental queries are not supported with clustering; they treat all the data written by clustering as new rows.
  5. Clustering creates a new type of commit, "timestamp.replacecommit". There may be places in the code that only read commits/deltacommits and miss replacecommits when reading valid commits from the timeline. This can cause discrepancies in some cases.
  6. The clean policy is different for 'replacecommit', so more file versions may be retained, leading to extra storage usage.
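The workaround in caveat 3 is a plain write-config override. The snippet below is a minimal sketch of collecting such overrides in a map to pass to the writer; only the key/value comes from the caveat above, the object and value names are invented for illustration:

```scala
// Hypothetical container for the config overrides suggested by the caveats above.
// Only the key/value below comes from the document; the object name is invented.
object ClusteringCaveatConfigs {
  // Disable incremental timeline sync, which clustering does not yet support.
  val overrides: Map[String, String] = Map(
    "hoodie.filesystem.view.incr.timeline.sync.enable" -> "false"
  )
}
```

These overrides would be passed as options when constructing the writer, alongside the usual table configs.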

Performance Evaluation

Dataset: https://s3.amazonaws.com/amazon-reviews-pds/readme.html

Query: select sum(total_votes), product_category from amzn_reviews where review_date > '2007' and review_date < '2009' group by product_category

  1. Convert dataset to hoodie format

    Code Block (scala)
    val df = spark.read.option("sep", "\t").option("header", "true").csv(amznReviewsRawDataPath)
    
    val tableName = "reviews"
    
    df.write.format("org.apache.hudi").
      options(getQuickstartWriteConfigs).
      option(PRECOMBINE_FIELD_OPT_KEY, "customer_id").
      option(RECORDKEY_FIELD_OPT_KEY, "review_id").
      option(PARTITIONPATH_FIELD_OPT_KEY, "marketplace").
      option(OPERATION_OPT_KEY, "insert").
      option(TABLE_NAME, tableName).
      mode(Overwrite).
      save(amznReviewHudiPath)
    
    //creates ~500 data files in one partition


  2. Evaluate query time (No Clustering)

    Code Block (sql): query takes ~10 seconds
    scala> spark.time(spark.sql("select sum(total_votes), product_category from amzn_reviews where review_date > '2007' and review_date < '2009' group by product_category").collect())
    Time taken: 10018 ms     
    
    

    [Images: Stage details, Query plan]

  3. Perform clustering

    Code Block (scala)
    // create writeClient with overriding following write config:
    //"hoodie.clustering.plan.strategy.sort.columns" -> "product_category,review_date"
    //"hoodie.clustering.plan.strategy.max.bytes.per.group" -> "107374182400"
    //"hoodie.clustering.plan.strategy.max.num.groups" -> "1"
    
    val clusteringInstant = writeClient.scheduleClustering(Option.empty())
    val metadata = writeClient.cluster(clusteringInstant.get, true)
    
    //creates ~350 data files and replaces the existing ~500 data files in one partition

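The sort columns in the write config above are what make the post-clustering query faster: once data files are laid out by (product_category, review_date), files carry tight min/max statistics on review_date, and a range predicate can skip most of them. The sketch below is a toy illustration of that pruning effect, not Hudi internals; every name in it is invented:

```scala
// Toy model of clustering-driven file pruning (not Hudi's actual code).
// Records sorted by (category, date) are packed into files; a file can be
// skipped when its [minDate, maxDate] range misses the query's date range.
object PruningSketch {
  case class Rec(category: String, date: Int)
  case class FileStats(minDate: Int, maxDate: Int)

  // Pack sorted records into files of `size` records, keeping min/max stats.
  def cluster(recs: Seq[Rec], size: Int): Seq[FileStats] =
    recs.sortBy(r => (r.category, r.date))
      .grouped(size)
      .map(g => FileStats(g.map(_.date).min, g.map(_.date).max))
      .toSeq

  // Count files whose stats overlap the query range [lo, hi];
  // only these files need to be read.
  def filesScanned(files: Seq[FileStats], lo: Int, hi: Int): Int =
    files.count(f => f.maxDate >= lo && f.minDate <= hi)
}
```

With unsorted data every file tends to span the full date range, so every file overlaps the predicate; after sorting, only the files whose ranges intersect [2007, 2009] are scanned.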

    Info: Verify replacecommit is created

    $ hadoop fs -ls $amznReviewHudiPath/.hoodie/
    Found 10 items
    drwxr-xr-x   - satish           0 2021-01-20 18:38 $amznReviewHudiPath/.hoodie/.aux
    drwxr-xr-x   - satish           0 2021-01-21 00:49 $amznReviewHudiPath/.hoodie/.temp
    -rw-r--r--   3 satish      445621 2021-01-20 18:41 $amznReviewHudiPath/.hoodie/20210120183848.commit
    -rw-r--r--   3 satish           0 2021-01-20 18:39 $amznReviewHudiPath/.hoodie/20210120183848.commit.requested
    -rw-r--r--   3 satish         979 2021-01-20 18:40 $amznReviewHudiPath/.hoodie/20210120183848.inflight
    -rw-r--r--   3 satish      493983 2021-01-21 00:51 $amznReviewHudiPath/.hoodie/20210121004731.replacecommit
    -rw-r--r--   3 satish           0 2021-01-21 00:47 $amznReviewHudiPath/.hoodie/20210121004731.replacecommit.inflight
    -rw-r--r--   3 satish      131495 2021-01-21 00:47 $amznReviewHudiPath/.hoodie/20210121004731.replacecommit.requested
    drwxr-xr-x   - satish           0 2021-01-20 18:38 $amznReviewHudiPath/.hoodie/archived
    -rw-r--r--   3 satish         228 2021-01-20 18:38 $amznReviewHudiPath/.hoodie/hoodie.properties


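Caveat 5 above warns that any code scanning the timeline must account for the new action. The toy helper below (invented names, not Hudi's API) shows the kind of filename classification a timeline reader has to do, and why a reader that only recognizes commit/deltacommit would silently drop the replacecommit instants listed above:

```scala
// Toy classifier for .hoodie timeline filenames (not Hudi's actual API).
object TimelineSketch {
  // Extract the action from an instant file name, e.g.
  // "20210121004731.replacecommit.requested" yields "replacecommit".
  def action(fileName: String): Option[String] =
    fileName.split('.').toList match {
      case _ :: act :: _ => Some(act)
      case _             => None
    }

  // A reader that only checked for "commit"/"deltacommit" would
  // miss replacecommits; the valid set must include the new action.
  def isValidInstant(fileName: String): Boolean =
    action(fileName).exists(Set("commit", "deltacommit", "replacecommit"))
}
```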

  4. Evaluate query time (with Clustering)

    Code Block (sql): query takes ~4 seconds
    scala> spark.time(spark.sql("select sum(total_votes), product_category from amzn_reviews where review_date > '2007' and review_date < '2009' group by product_category").collect())
    Time taken: 4099 ms      

    [Images: Stage details, Query plan]

Summary

Rewriting the data using clustering speeds up this query's runtime by ~60% (10018 ms down to 4099 ms).
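The ~60% figure follows directly from the two timings measured above; as a quick arithmetic check:

```scala
// Speedup check from the measured query times above.
object SpeedupCheck {
  val before = 10018.0 // ms, without clustering (step 2)
  val after  = 4099.0  // ms, with clustering (step 4)

  // Fractional reduction in runtime: (before - after) / before.
  val reduction: Double = (before - after) / before
}
```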

Rollout/Adoption Plan

  • No impact on existing users, since this only adds new functionality. Note that if you are already using Hudi, it is important to upgrade your readers to the 0.7 release before upgrading writers, because clustering creates a new commit type and query engines must recognize it.

...