Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. Convert dataset to hoodie format

    Code Block
    languagescala
    val df = spark.read.option("sep", "\t").option("header", "true").csv(amznReviewsRawDataPath)
    
    val tableName = "reviews"
    
    df.write.format("org.apache.hudi").
     options(getQuickstartWriteConfigs). 
     option(PRECOMBINE_FIELD_OPT_KEY, "customer_id").
      option(RECORDKEY_FIELD_OPT_KEY, "review_id").
      option(PARTITIONPATH_FIELD_OPT_KEY, "marketplace").
      option(OPERATION_OPT_KEY, "insert").
      option(TABLE_NAME, tableName).
      mode(Overwrite).
      save(amznReviewHudiPath);
    
    //creates ~500 data files in one partition


  2. Evaluate query time (No Clustering)

    Code Block
    languagesql
    titlequery takes ~10 seconds
    scala> spark.time(spark.sql("select sum(total_votes), product_category from amzn_reviews where review_date > '2007' and review_date < '2009' group by product_category").collect())
    Time taken: 10018 ms     
    
    

    Stage details  Query Plan

  3. Perform clustering

    Code Block
    languagescala
    // create writeClient with overriding following write config:
    //"hoodie.clustering.plan.strategy.sort.columns" -> "product_category,review_date"
    //"hoodie.clustering.plan.strategy.max.bytes.per.group" -> "107374182400"
    //"hoodie.clustering.plan.strategy.max.num.groups" -> "1"
    
    val clusteringInstant = writeClient.scheduleClustering(Option.empty())
    val metadata = writeClient.cluster(clusteringInstant.get, true)
    
    //creates ~350 data files and replaces existing ~500 data files one partition


    Info
    titleVerify replacecommit is created

    $ hadoop fs -ls $amznReviewHudiPath/.hoodie/

    Found 10 items

    drwxr-xr-x   - satish           0 2021-01-20 18:38 $amznReviewHudiPath/.hoodie/.aux

    drwxr-xr-x   - satish           0 2021-01-21 00:49 $amznReviewHudiPath/.hoodie/.temp

    -rw-r--r--   3 satish      445621 2021-01-20 18:41 $amznReviewHudiPath/.hoodie/20210120183848.commit

    -rw-r--r--   3 satish           0 2021-01-20 18:39 $amznReviewHudiPath/.hoodie/20210120183848.commit.requested

    -rw-r--r--   3 satish         979 2021-01-20 18:40 $amznReviewHudiPath/.hoodie/20210120183848.inflight

    -rw-r--r--   3 satish      493983 2021-01-21 00:51 $amznReviewHudiPath/.hoodie/20210121004731.replacecommit

    -rw-r--r--   3 satish           0 2021-01-21 00:47 $amznReviewHudiPath/.hoodie/20210121004731.replacecommit.inflight

    -rw-r--r--   3 satish      131495 2021-01-21 00:47 $amznReviewHudiPath/.hoodie/20210121004731.replacecommit.requested

    drwxr-xr-x   - satish           0 2021-01-20 18:38 $amznReviewHudiPath/.hoodie/archived

    -rw-r--r--   3 satish         228 2021-01-20 18:38 $amznReviewHudiPath/.hoodie/hoodie.properties



  4. Evaluate query time (with Clustering). Note that same query in step 2 that took 10 seconds now runs in 4 seconds 

    Code Block
    languagesql
    titlequery takes ~4 seconds
    scala> spark.time(spark.sql("select sum(total_votes), product_category from amzn_reviews where review_date > '2007' and review_date < '2009' group by product_category").collect())
    Time taken: 4099 ms      

    Stage details Query Plan

...

In summary, rewriting the data using clustering can speed up query runtime by ~60%. ~60% 

Rollout/Adoption Plan

  • No impact on the existing users because add new function. Note that if you are already using Hudi, It is important to move your readers first to 0.7 release version before upgrading writers. This is because clustering creates a new type of commit and its important query engines recognize this new commit type.

...