Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Info
iconfalse

import org.apache.hudi.QuickstartUtils._

import scala.collection.JavaConversions._

import org.apache.spark.sql.SaveMode._

import org.apache.hudi.DataSourceReadOptions._

import org.apache.hudi.DataSourceWriteOptions._

import org.apache.hudi.config.HoodieWriteConfig._


val tableName = "hudi_trips_cow"

val basePath = "/tmp/hudi_trips_cow"


val dataGen = new DataGenerator(Array("2020/03/11"))

val updates = convertToStringList(dataGen.generateInserts(10))

val df = spark.read.json(spark.sparkContext.parallelize(updates, 1));

df.write.format("org.apache.hudi").

        options(getQuickstartWriteConfigs).

          option(PRECOMBINE_FIELD_OPT_KEY, "ts").

          option(RECORDKEY_FIELD_OPT_KEY, "uuid").

          option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").

         option(TABLE_NAME, tableName).

          option("hoodie.parquet.small.file.limit", "0").

          option("hoodie.clustering.inline", "true").

         option("hoodie.clustering.inline.max.commits", "4").

          option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824").

          option("hoodie.clustering.plan.strategy.small.file.limit", "629145600").

          option("hoodie.clustering.plan.strategy.sort.columns", ""). //optional, if sorting is needed as part of rewriting data

          mode(Append).

           save(basePath);


Setup for Async clustering Job

Clustering can be scheduled and run asynchronously using WriteClient APIs

...

a SparkJob. The utilities spark job can be found here

...


  1. prepare the clusering config file 

2. Schedule clustering

Info
iconfalse

bin/spark-submit \

--master local[4] \

--class org.apache.hudi.utilities.HoodieClusteringJob \

/Users/liwei/work-space/dla/opensource/incubator-hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.8.0-SNAPSHOT.jar \

--schedule \

--base-path /Users/liwei/work-space/spark/spark-2.4.6-bin-hadoop2.7/hudi_table_with_small_filegroups3/dest \

--table-name hudi_table_with_small_filegroups3_schedule_clustering \

--props /Users/liwei/work-space/spark/spark-2.4.6-bin-hadoop2.7/hudi_table_with_small_filegroups3/config/clusteringjob.properties \

--spark-memory 1g

      you can find the schedule clustering instant time in the spark logs. With the log prefix "The schedule instant time is" ,and the schedule clustering instant time is 20210122190240

Image Added

  1. Execute clustering API can be found here
Info
iconfalse

bin/spark-submit \

--master local[4] \

--class org.apache.hudi.utilities.HoodieClusteringJob \

/Users/liwei/work-space/dla/opensource/incubator-hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.8.0-SNAPSHOT.jar \

--schedule \

--base-path /Users/liwei/work-space/spark/spark-2.4.6-bin-hadoop2.7/hudi_table_with_small_filegroups3/dest \

--table-name hudi_table_with_small_filegroups3_schedule_clustering \

--props /Users/liwei/work-space/spark/spark-2.4.6-bin-hadoop2.7/hudi_table_with_small_filegroups3/config/clusteringjob.properties \

--spark-memory 1g

Some caveats

There is WIP to fix these limitations. But these issues are worth mentioning:

...