...


import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips_cow"
val basePath = "/tmp/hudi_trips_cow"

// Generate 10 inserts for the single partition 2020/03/11
val dataGen = new DataGenerator(Array("2020/03/11"))
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 1))

df.write.format("org.apache.hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  // Disable small-file handling on write so each commit produces new small files
  option("hoodie.parquet.small.file.limit", "0").
  // Trigger inline clustering after every 4 commits
  option("hoodie.clustering.inline", "true").
  option("hoodie.clustering.inline.max.commits", "4").
  option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824").
  option("hoodie.clustering.plan.strategy.small.file.limit", "629145600").
  option("hoodie.clustering.plan.strategy.sort.columns", ""). // optional, if sorting is needed as part of rewriting data
  mode(Append).
  save(basePath)
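
To sanity-check the writes, you can read the table back and inspect the records. A minimal sketch, assuming the quickstart-style glob path (the last level matches the data files under the 3-level partition path 2020/03/11):

// Read the Hudi table back and inspect the records
val tripsDF = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
tripsDF.select("uuid", "partitionpath", "ts").show(10, false)
// Re-running the write block above accumulates commits; after 4 commits,
// inline clustering rewrites the small files into larger ones.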


Setup for Async Clustering Job

Clustering can be scheduled and run asynchronously using WriteClient APIs

...

a Spark job. The utilities Spark job can be found here.
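
As an illustration of the WriteClient route, the sketch below schedules a clustering plan and then executes it. This is a minimal sketch, not the exact utility code: it assumes the 0.8.x client API (SparkRDDWriteClient, scheduleClustering, cluster) and reuses tableName/basePath plus the clustering properties from the inline example above.

import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.spark.api.java.JavaSparkContext

// Build a write config for the same table, carrying the clustering plan options
val props = new java.util.Properties()
props.setProperty("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824")
props.setProperty("hoodie.clustering.plan.strategy.small.file.limit", "629145600")

val writeConfig = HoodieWriteConfig.newBuilder()
  .withPath(basePath)
  .forTable(tableName)
  .withProps(props)
  .build()

val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
val client = new SparkRDDWriteClient(engineContext, writeConfig)

// Schedule a clustering plan; returns the new instant time if a plan was created
val instant = client.scheduleClustering(HOption.empty())
if (instant.isPresent) {
  // Execute the plan and complete it as a replacecommit on the timeline
  client.cluster(instant.get(), true)
}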


  1. Prepare the clustering config file:

cat /Users/liwei/work-space/spark/spark-2.4.6-bin-hadoop2.7/hudi_table_with_small_filegroups3/config/clusteringjob.properties

hoodie.clustering.inline.max.commits=2
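
The config file can carry any of the clustering options. For example, the plan-strategy settings from the inline example above could be added as well (values mirror that example):

hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824
hoodie.clustering.plan.strategy.small.file.limit=629145600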

  2. Schedule clustering:


bin/spark-submit \
--master local[4] \
--class org.apache.hudi.utilities.HoodieClusteringJob \
/Users/liwei/work-space/dla/opensource/incubator-hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.8.0-SNAPSHOT.jar \
--schedule \
--base-path /Users/liwei/work-space/spark/spark-2.4.6-bin-hadoop2.7/hudi_table_with_small_filegroups3/dest \
--table-name hudi_table_with_small_filegroups3_schedule_clustering \
--props /Users/liwei/work-space/spark/spark-2.4.6-bin-hadoop2.7/hudi_table_with_small_filegroups3/config/clusteringjob.properties \
--spark-memory 1g

      You can find the scheduled clustering instant time in the Spark logs, with the log prefix "The schedule instant time is". In this example, the scheduled clustering instant time is 20210122190240.
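
For example, assuming the driver output was redirected to a local file (the log file name here is hypothetical):

# "The schedule instant time is" is the prefix printed by the scheduling run
grep "The schedule instant time is" clustering-schedule.log
# => ... The schedule instant time is 20210122190240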


  3. Use the scheduled instant time "20210122190240" to run clustering:


bin/spark-submit \
--master local[4] \
--class org.apache.hudi.utilities.HoodieClusteringJob \
/Users/liwei/work-space/dla/opensource/incubator-hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.8.0-SNAPSHOT.jar \
--base-path /Users/liwei/work-space/spark/spark-2.4.6-bin-hadoop2.7/hudi_table_with_small_filegroups3/dest \
--instant-time 20210122190240 \
--table-name hudi_table_with_small_filegroups_clustering \
--props /Users/liwei/work-space/spark/spark-2.4.6-bin-hadoop2.7/hudi_table_with_small_filegroups3/config/clusteringjob.properties \
--spark-memory 1g
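
Once the job finishes, the clustering is recorded as a replacecommit on the table's timeline. A quick way to verify, assuming the base path above:

# a completed clustering shows up as <instant>.replacecommit under .hoodie
ls /Users/liwei/work-space/spark/spark-2.4.6-bin-hadoop2.7/hudi_table_with_small_filegroups3/dest/.hoodie | grep replacecommit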

...

Some caveats

There is work in progress to fix these limitations, but they are worth mentioning:

...