
Info
iconfalse

import org.apache.hudi.QuickstartUtils._

import scala.collection.JavaConversions._

import org.apache.spark.sql.SaveMode._

import org.apache.hudi.DataSourceReadOptions._

import org.apache.hudi.DataSourceWriteOptions._

import org.apache.hudi.config.HoodieWriteConfig._


val tableName = "hudi_trips_cow"

val basePath = "/tmp/hudi_trips_cow"


val dataGen = new DataGenerator(Array("2020/03/11"))

val inserts = convertToStringList(dataGen.generateInserts(10))

val df = spark.read.json(spark.sparkContext.parallelize(inserts, 1));

df.write.format("org.apache.hudi").

  options(getQuickstartWriteConfigs).

  option(PRECOMBINE_FIELD_OPT_KEY, "ts").

  option(RECORDKEY_FIELD_OPT_KEY, "uuid").

  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").

  option(TABLE_NAME, tableName).

  option("hoodie.parquet.small.file.limit", "0").

  option("hoodie.clustering.inline", "true").

  option("hoodie.clustering.inline.max.commits", "4").

  option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824").

  option("hoodie.clustering.plan.strategy.small.file.limit", "629145600").

  option("hoodie.clustering.plan.strategy.sort.columns", ""). // optional, if sorting is needed as part of rewriting data

  mode(Append).

  save(basePath);
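
Because "hoodie.parquet.small.file.limit" is set to 0, each such write creates new small file groups, and inline clustering (configured above to run every 4 commits) will rewrite them. The loop below is an illustrative sketch, not part of the original walkthrough; it simply repeats the insert with the names already defined above until the commit threshold is reached.

Code Block
languagescala
// Illustrative driver loop: issue a few more small insert commits so that
// inline clustering (hoodie.clustering.inline.max.commits = 4) triggers.
for (_ <- 1 to 4) {
  val batch = convertToStringList(dataGen.generateInserts(10))
  val batchDF = spark.read.json(spark.sparkContext.parallelize(batch, 1))
  batchDF.write.format("org.apache.hudi").
    options(getQuickstartWriteConfigs).
    option(PRECOMBINE_FIELD_OPT_KEY, "ts").
    option(RECORDKEY_FIELD_OPT_KEY, "uuid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
    option(TABLE_NAME, tableName).
    option("hoodie.parquet.small.file.limit", "0").
    option("hoodie.clustering.inline", "true").
    option("hoodie.clustering.inline.max.commits", "4").
    mode(Append).
    save(basePath)
}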


Setup for Async Clustering Job

Clustering can be scheduled and run asynchronously either through the WriteClient APIs or as a standalone Spark job.

  1. The schedule clustering API can be found here
  2. The execute clustering API can be found here

A minimal WriteClient sketch is shown below. To run clustering as a Spark job instead, use the utilities Spark job, which can be found here; the steps that follow walk through it.
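
The snippet below is a rough sketch of the WriteClient path only; it assumes an already-constructed SparkRDDWriteClient (named writeClient) configured for the target table, and client construction details vary by Hudi version.

Code Block
languagescala
// Schedule a clustering plan; the returned instant (if any) identifies the plan.
val clusteringInstant = writeClient.scheduleClustering(org.apache.hudi.common.util.Option.empty())

// Execute the plan; `true` asks the client to complete (commit) the replacecommit.
if (clusteringInstant.isPresent) {
  val metadata = writeClient.cluster(clusteringInstant.get, true)
}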


  1. Prepare the clustering config file:
Info
iconfalse

cat /Users/liwei/work-space/spark/spark-2.4.6-bin-hadoop2.7/hudi_table_with_small_filegroups3/config/clusteringjob.properties

hoodie.clustering.inline.max.commits=2

     2. Schedule clustering

Info
iconfalse

bin/spark-submit \

--master local[4] \

--class org.apache.hudi.utilities.HoodieClusteringJob \

/Users/liwei/work-space/dla/opensource/incubator-hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.8.0-SNAPSHOT.jar \

--schedule \

--base-path /Users/liwei/work-space/spark/spark-2.4.6-bin-hadoop2.7/hudi_table_with_small_filegroups3/dest \

--table-name hudi_table_with_small_filegroups3_schedule_clustering \

--props /Users/liwei/work-space/spark/spark-2.4.6-bin-hadoop2.7/hudi_table_with_small_filegroups3/config/clusteringjob.properties \

--spark-memory 1g

      You can find the scheduled clustering instant time in the Spark logs, prefixed with "The schedule instant time is". In this example, the scheduled clustering instant time is 20210122190240.


     3. Use the scheduled instant time "20210122190240" to run clustering

Info
iconfalse

bin/spark-submit \

--master local[4] \

--class org.apache.hudi.utilities.HoodieClusteringJob \

/Users/liwei/work-space/dla/opensource/incubator-hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.8.0-SNAPSHOT.jar \

--base-path /Users/liwei/work-space/spark/spark-2.4.6-bin-hadoop2.7/hudi_table_with_small_filegroups3/dest \

--instant-time 20210122190240 \

--table-name hudi_table_with_small_filegroups_clustering \

--props /Users/liwei/work-space/spark/spark-2.4.6-bin-hadoop2.7/hudi_table_with_small_filegroups3/config/clusteringjob.properties \

--spark-memory 1g

Some caveats

Work is in progress to address these limitations, but the following issues are worth mentioning:

...

  1. Convert dataset to hoodie format

    Code Block
    languagescala
    val df = spark.read.option("sep", "\t").option("header", "true").csv(amznReviewsRawDataPath)
    
    val tableName = "reviews"
    
    df.write.format("org.apache.hudi").
      options(getQuickstartWriteConfigs).
      option(PRECOMBINE_FIELD_OPT_KEY, "customer_id").
      option(RECORDKEY_FIELD_OPT_KEY, "review_id").
      option(PARTITIONPATH_FIELD_OPT_KEY, "marketplace").
      option(OPERATION_OPT_KEY, "insert").
      option(TABLE_NAME, tableName).
      mode(Overwrite).
      save(amznReviewHudiPath);
    
    //creates ~500 data files in one partition
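    
    // Assumption, not part of the original walkthrough: the queries in the next
    // step read a Spark SQL view named `amzn_reviews`. One way to register it from
    // the Hudi table written above (Hudi 0.7-style glob read over the single-level
    // `marketplace` partition):
    val reviewsDF = spark.read.format("org.apache.hudi").load(amznReviewHudiPath + "/*/*")
    reviewsDF.createOrReplaceTempView("amzn_reviews")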


  2. Evaluate query time (No Clustering)

    Code Block
    languagesql
    titlequery takes ~10 seconds
    scala> spark.time(spark.sql("select sum(total_votes), product_category from amzn_reviews where review_date > '2007' and review_date < '2009' group by product_category").collect())
    Time taken: 10018 ms     
    
    

    Stage details  Query Plan

  3. Perform clustering

    Code Block
    languagescala
    // create a writeClient, overriding the following write configs:
    //"hoodie.clustering.plan.strategy.sort.columns" -> "product_category,review_date"
    //"hoodie.clustering.plan.strategy.max.bytes.per.group" -> "107374182400"
    //"hoodie.clustering.plan.strategy.max.num.groups" -> "1"
    
    val clusteringInstant = writeClient.scheduleClustering(Option.empty())
    val metadata = writeClient.cluster(clusteringInstant.get, true)
    
    //creates ~350 data files and replaces existing ~500 data files one partition


    Info
    titleVerify replacecommit is created

    $ hadoop fs -ls $amznReviewHudiPath/.hoodie/

    Found 10 items

    drwxr-xr-x   - satish           0 2021-01-20 18:38 $amznReviewHudiPath/.hoodie/.aux

    drwxr-xr-x   - satish           0 2021-01-21 00:49 $amznReviewHudiPath/.hoodie/.temp

    -rw-r--r--   3 satish      445621 2021-01-20 18:41 $amznReviewHudiPath/.hoodie/20210120183848.commit

    -rw-r--r--   3 satish           0 2021-01-20 18:39 $amznReviewHudiPath/.hoodie/20210120183848.commit.requested

    -rw-r--r--   3 satish         979 2021-01-20 18:40 $amznReviewHudiPath/.hoodie/20210120183848.inflight

    -rw-r--r--   3 satish      493983 2021-01-21 00:51 $amznReviewHudiPath/.hoodie/20210121004731.replacecommit

    -rw-r--r--   3 satish           0 2021-01-21 00:47 $amznReviewHudiPath/.hoodie/20210121004731.replacecommit.inflight

    -rw-r--r--   3 satish      131495 2021-01-21 00:47 $amznReviewHudiPath/.hoodie/20210121004731.replacecommit.requested

    drwxr-xr-x   - satish           0 2021-01-20 18:38 $amznReviewHudiPath/.hoodie/archived

    -rw-r--r--   3 satish         228 2021-01-20 18:38 $amznReviewHudiPath/.hoodie/hoodie.properties



  4. Evaluate query time (with Clustering). Note that the same query from step 2 that took ~10 seconds now runs in ~4 seconds.

    Code Block
    languagesql
    titlequery takes ~4 seconds
    scala> spark.time(spark.sql("select sum(total_votes), product_category from amzn_reviews where review_date > '2007' and review_date < '2009' group by product_category").collect())
    Time taken: 4099 ms      

    Stage details Query Plan

...

In summary, rewriting the data using clustering can speed up query runtime by ~60%.

Rollout/Adoption Plan

  • No impact on existing users, since this change only adds new functionality. Note that if you are already using Hudi, it is important to move your readers to the 0.7 release before upgrading writers, because clustering creates a new type of commit and query engines must recognize this new commit type.

...