...
```scala
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips_cow"
val basePath = "/tmp/hudi_trips_cow"
val dataGen = new DataGenerator(Array("2020/03/11"))

val updates = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 1))
df.write.format("org.apache.hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  option("hoodie.parquet.small.file.limit", "0").
  option("hoodie.clustering.inline", "true").
  option("hoodie.clustering.inline.max.commits", "4").
  option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824").
  option("hoodie.clustering.plan.strategy.small.file.limit", "629145600").
  option("hoodie.clustering.plan.strategy.sort.columns", ""). // optional, set if sorting is needed as part of rewriting data
  mode(Append).
  save(basePath)
```
Setup for Async Clustering Job
Clustering can be scheduled and run asynchronously using the WriteClient APIs.
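As a rough sketch of what this looks like, the snippet below schedules a clustering plan and then executes it with the Spark write client. The builder and client calls (SparkRDDWriteClient, scheduleClustering, cluster) are an assumption based on recent Hudi releases, not commands taken from this post:

```scala
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.api.java.JavaSparkContext

// Minimal sketch: build a write config for an existing table, schedule a
// clustering plan, then execute it. In practice the table schema and the
// hoodie.clustering.* options (target file size, sort columns, ...) would
// also be set on this config.
val writeConfig = HoodieWriteConfig.newBuilder()
  .withPath(basePath)
  .forTable(tableName)
  .build()

val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
val client = new SparkRDDWriteClient(engineContext, writeConfig)

// scheduleClustering returns the instant time of the new clustering plan, if one was generated
val clusteringInstant = client.scheduleClustering(HOption.empty())
if (clusteringInstant.isPresent) {
  // execute the plan; `true` asks the client to complete (commit) the clustering
  client.cluster(clusteringInstant.get(), true)
}
```

Running the execution step separately from ingestion (even from a different process) is what makes this flow asynchronous.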
...
a SparkJob. The utilities Spark job (HoodieClusteringJob) can be found here
...
1. Prepare the clustering config file
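For reference, the clustering config file consumed by the job is a standard Hudi properties file. The contents below are only an illustration, with assumed keys and values mirroring the inline example above, not the actual file used in this run:

```properties
# Illustrative clusteringjob.properties (assumed contents)
hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824
hoodie.clustering.plan.strategy.small.file.limit=629145600
hoodie.clustering.plan.strategy.sort.columns=
```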
2. Schedule clustering
```bash
bin/spark-submit \
  --master local[4] \
  --class org.apache.hudi.utilities.HoodieClusteringJob \
  /Users/liwei/work-space/dla/opensource/incubator-hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.8.0-SNAPSHOT.jar \
  --schedule \
  --base-path /Users/liwei/work-space/spark/spark-2.4.6-bin-hadoop2.7/hudi_table_with_small_filegroups3/dest \
  --table-name hudi_table_with_small_filegroups3_schedule_clustering \
  --props /Users/liwei/work-space/spark/spark-2.4.6-bin-hadoop2.7/hudi_table_with_small_filegroups3/config/clusteringjob.properties \
  --spark-memory 1g
```
You can find the scheduled clustering instant time in the Spark logs, under the log prefix "The schedule instant time is". In this example, the scheduled clustering instant time is 20210122190240.
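Alternatively, the scheduled instant can be read from the table timeline itself, since a clustering plan is recorded as a pending replacecommit. This is a minimal sketch assuming the HoodieTableMetaClient builder API, run from a spark-shell with the Hudi bundle on the classpath:

```scala
import org.apache.hudi.common.table.HoodieTableMetaClient
import scala.collection.JavaConverters._

// Sketch: clustering plans show up as pending "replacecommit" instants,
// so listing the pending replace timeline surfaces the scheduled instant time.
val metaClient = HoodieTableMetaClient.builder()
  .setConf(spark.sparkContext.hadoopConfiguration)
  .setBasePath("/path/to/table")   // base path of the table being clustered
  .build()

metaClient.getActiveTimeline
  .filterPendingReplaceTimeline()
  .getInstants.iterator().asScala
  .foreach(instant => println(s"pending clustering instant: ${instant.getTimestamp}"))
```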
3. Execute clustering (the execute clustering API can be found here)
```bash
bin/spark-submit \
  --master local[4] \
  --class org.apache.hudi.utilities.HoodieClusteringJob \
  /Users/liwei/work-space/dla/opensource/incubator-hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.8.0-SNAPSHOT.jar \
  --instant-time 20210122190240 \
  --base-path /Users/liwei/work-space/spark/spark-2.4.6-bin-hadoop2.7/hudi_table_with_small_filegroups3/dest \
  --table-name hudi_table_with_small_filegroups3_schedule_clustering \
  --props /Users/liwei/work-space/spark/spark-2.4.6-bin-hadoop2.7/hudi_table_with_small_filegroups3/config/clusteringjob.properties \
  --spark-memory 1g
```
Some caveats
There is work in progress to fix these limitations, but these issues are worth mentioning:
...