Note: This page is no longer maintained. It has moved to https://hudi.apache.org/learn/faq. To contribute content to this FAQ, see here.

General 

When is Hudi useful for me or my organization?

...

When issuing an `upsert` operation on a dataset and the batch of records provided contains multiple entries for a given key, all of them are reduced into a single final value by repeatedly calling the payload class's preCombine() method. By default, we pick the record with the greatest value (determined by calling .compareTo()), giving latest-write-wins style semantics. This FAQ entry shows the interface for HoodieRecordPayload, if you are interested.

For an insert or bulk_insert operation, no such pre-combining is performed. Thus, if your input contains duplicates, the dataset will also contain duplicates. If you don't want duplicate records, either issue an upsert or consider specifying an option to de-duplicate input in either the datasource or the deltastreamer.
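
As an illustration, here is a minimal sketch of both behaviors (the string keys `hoodie.datasource.write.precombine.field` and `hoodie.datasource.write.insert.drop.duplicates` are assumptions to verify against your Hudi version's configuration reference; `inputDF` and `basePath` are placeholders):

Code Block
// upsert: duplicate keys within the batch are reduced via preCombine on the "ts" field
inputDF.write.format("org.apache.hudi")
     .option("hoodie.datasource.write.operation", "upsert")
     .option("hoodie.datasource.write.precombine.field", "ts")  // latest ts wins
     .option(RECORDKEY_FIELD_OPT_KEY, "<your key>")
     ...
     .mode(SaveMode.Append)
     .save(basePath)

// insert: no pre-combining, but input duplicates can optionally be dropped
inputDF.write.format("org.apache.hudi")
     .option("hoodie.datasource.write.operation", "insert")
     .option("hoodie.datasource.write.insert.drop.duplicates", "true")
     .option(RECORDKEY_FIELD_OPT_KEY, "<your key>")
     ...
     .mode(SaveMode.Append)
     .save(basePath)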

...

A key design decision in Hudi was to avoid creating small files and always write properly sized files.

There are 2 ways to avoid creating tons of small files in Hudi and both of them have different trade-offs:

1. Auto Size small files during ingestion: This solution trades off more time on ingest/writing to keep queries always efficient. Common approaches of writing very small files and then later stitching them together only solve the system scalability issues posed by small files, and also let queries slow down by exposing small files to them anyway.

Hudi has the ability to maintain a configured target file size when performing upsert/insert operations. (Note: the bulk_insert operation does not provide this functionality and is designed as a simpler replacement for a normal `spark.write.parquet`.)

For copy-on-write, this is as simple as configuring the maximum size for a base/parquet file and the soft limit below which a file should be considered a small file. For the initial bootstrap of a Hudi table, tuning the record size estimate is also important to ensure sufficient records are bin-packed into a parquet file. For subsequent writes, Hudi automatically uses the average record size from the previous commit. Hudi will try to add enough records to a small file at write time to get it to the configured maximum limit. For example, with `compactionSmallFileSize=100MB` and `limitFileSize=120MB`, Hudi will pick all files < 100MB and try to get them up to 120MB.
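
For illustration, here is a minimal sketch of these knobs expressed as datasource write options (the key names `hoodie.parquet.max.file.size`, `hoodie.parquet.small.file.limit` and `hoodie.copyonwrite.record.size.estimate` are assumptions to verify against your Hudi version's configuration reference):

Code Block
// sketch: aim for ~120MB base files, treating anything under 100MB as a small file
df.write.format("org.apache.hudi")
     .option("hoodie.datasource.write.operation", "upsert")
     .option("hoodie.parquet.max.file.size", (120 * 1024 * 1024).toString)     // limitFileSize
     .option("hoodie.parquet.small.file.limit", (100 * 1024 * 1024).toString)  // compactionSmallFileSize
     .option("hoodie.copyonwrite.record.size.estimate", "1024")                // helps initial bin-packing
     ...
     .mode(SaveMode.Append)
     .save(basePath)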

For merge-on-read, there are a few more configs to set. Specifically, MergeOnRead works differently for different INDEX choices (a combined config sketch follows this list):

  1. Indexes with canIndexLogFiles = true : Inserts of new data go directly to log files. In this case, you can configure the maximum log size and a factor that denotes reduction in size when data moves from avro to parquet files. 

  2. Indexes with canIndexLogFiles = false : Inserts of new data go only to parquet files. In this case, the same configurations as above for the COPY_ON_WRITE case apply.

NOTE : In either case, small files will be auto sized only if there is no PENDING compaction or associated log file for that particular file slice. For example, for case 1: If you had a log file and a compaction C1 was scheduled to convert that log file to parquet, no more inserts can go into that log file. For case 2: If you had a parquet file and an update ended up creating an associated delta log file, no more inserts can go into that parquet file. Only after the compaction has been performed and there are NO log files associated with the base parquet file, can new inserts be sent to auto size that parquet file.

2. Clustering : This is a feature in Hudi to group small files into larger ones, either synchronously or asynchronously. Since the first solution of auto-sizing small files has a trade-off on ingestion speed (because the small files are sized during ingestion), clustering comes to the rescue if your use case is very sensitive to ingestion latency and you don't want to compromise on ingestion speed, which may end up creating a lot of small files. Clustering can be scheduled through the ingestion job, and an asynchronous job can stitch small files together in the background to generate larger files. NOTE that during this, ingestion can continue to run concurrently.

Please note that Hudi always creates immutable files on disk. To be able to do auto-sizing or clustering, Hudi will always create a newer version of the smaller file, resulting in 2 versions of the same file. The cleaner service will later kick in and delete the older version small file and keep the latest one.
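
Pulling the merge-on-read and clustering knobs above into one place, here is a hedged sketch (the key names are assumptions to check against your Hudi version's configuration reference; the values are arbitrary examples):

Code Block
df.write.format("org.apache.hudi")
     .option("hoodie.datasource.write.operation", "upsert")
     // merge-on-read log sizing (for indexes that can index log files)
     .option("hoodie.logfile.max.size", (1024 * 1024 * 1024).toString)   // max log file size
     .option("hoodie.logfile.to.parquet.compression.ratio", "0.35")      // expected avro -> parquet size reduction
     // optional inline clustering to stitch small files into larger ones
     .option("hoodie.clustering.inline", "true")
     .option("hoodie.clustering.inline.max.commits", "4")
     .option("hoodie.clustering.plan.strategy.small.file.limit", (100 * 1024 * 1024).toString)
     .option("hoodie.clustering.plan.strategy.target.file.max.bytes", (1024 * 1024 * 1024).toString)
     ...
     .mode(SaveMode.Append)
     .save(basePath)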

Why does Hudi retain at least one previous commit even after setting hoodie.cleaner.commits.retained=1 ?

Hudi runs the cleaner to remove old file versions as part of writing data, either in inline or in asynchronous mode (0.6.0 onwards). The Hudi cleaner retains at least one previous commit when cleaning old file versions. This prevents concurrently running queries that are reading the latest file versions from suddenly seeing those files deleted by the cleaner because a new file version got added. In other words, retaining at least one previous commit is needed to ensure snapshot isolation for readers.

How do I use DeltaStreamer or the Spark DataSource API to write to a non-partitioned Hudi dataset?

Hudi supports writing to non-partitioned datasets. For writing to a non-partitioned Hudi dataset and performing Hive table syncing, you need to set the configurations below in the properties passed:

Code Block
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor

Why do we have to set 2 different ways of configuring Spark to work with Hudi?

Non-Hive engines tend to do their own listing of DFS to query datasets. For example, Spark starts reading the paths directly from the file system (HDFS or S3).

From Spark the calls would be as below: 
- org.apache.spark.rdd.NewHadoopRDD.getPartitions 
- org.apache.parquet.hadoop.ParquetInputFormat.getSplits 
- org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits 

Without an understanding of Hudi's file layout, engines would just plainly read all the parquet files and display the data within them, resulting in massive amounts of duplicates.

At a high level, there are two ways of configuring a query engine to properly read Hudi datasets:

A) Making them invoke methods in `HoodieParquetInputFormat#getSplits` and `HoodieParquetInputFormat#getRecordReader` 

  • Hive does this natively, since the InputFormat is the abstraction in Hive to plug in new table formats. HoodieParquetInputFormat extends MapredParquetInputFormat, which is nothing but an input format for Hive, and we register Hudi tables in the Hive metastore backed by these input formats
  • Presto also falls back to calling the input format when it sees a `UseFileSplitsFromInputFormat` annotation, to just obtain splits, but then goes on to use its own optimized/vectorized parquet reader for queries on Copy-on-Write tables
  • Spark can be forced into falling back to the HoodieParquetInputFormat class, using --conf spark.sql.hive.convertMetastoreParquet=false

B) Making the engine invoke a path filter or other means to directly call Hudi classes to filter the files on DFS and pick out the latest file slice

  • Even though we can force Spark to fall back to using the InputFormat class, we could lose the ability to use Spark's optimized parquet reader path by doing so. 
  • To keep the benefits of native parquet read performance, we set `HoodieROTablePathFilter` as a path filter, explicitly setting this in the Spark Hadoop configuration. There is logic in the path filter to ensure that, for Hudi-related folders (paths) or files, the latest file slice is always selected. This filters out duplicate entries and shows the latest entries for each record.
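
For instance, this is typically set as below in a Spark session (a sketch, assuming a Scala Spark shell/session named `spark`):

Code Block
import org.apache.hadoop.fs.PathFilter
import org.apache.hudi.hadoop.HoodieROTablePathFilter

// make Spark's direct file listing pick only the latest file slice of each file group
spark.sparkContext.hadoopConfiguration.setClass(
     "mapreduce.input.pathFilter.class",
     classOf[HoodieROTablePathFilter],
     classOf[PathFilter])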

I have an existing dataset and want to evaluate Hudi using a portion of that data?

You can bulk import a portion of that data into a new Hudi table. For example, if you want to try out a month of data:

Code Block
spark.read.parquet("your_data_set/path/to/month")
     .write.format("org.apache.hudi")
     .option("hoodie.datasource.write.operation", "bulk_insert")
     .option("hoodie.datasource.write.storage.type", "storage_type") // COPY_ON_WRITE or MERGE_ON_READ
     .option(RECORDKEY_FIELD_OPT_KEY, "<your key>")
     .option(PARTITIONPATH_FIELD_OPT_KEY, "<your_partition>")
     ...
     .mode(SaveMode.Append)
     .save(basePath);


Once you have the initial copy, you can simply run upsert operations on this by selecting some sample of data every round:


Code Block
spark.read.parquet("your_data_set/path/to/month").limit(n) // Limit n records 
     .write.format("org.apache.hudi")
     .option("hoodie.datasource.write.operation", "upsert")
     .option(RECORDKEY_FIELD_OPT_KEY, "<your key>")
     .option(PARTITIONPATH_FIELD_OPT_KEY, "<your_partition>")
     ...
     .mode(SaveMode.Append)
     .save(basePath);


For a merge-on-read table, you may also want to try scheduling and running compaction jobs. You can run compaction directly using spark-submit on org.apache.hudi.utilities.HoodieCompactor, or by using the Hudi CLI.
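
If you just want to exercise compaction while evaluating, one simple alternative to the standalone HoodieCompactor is inline compaction driven by the writer (a sketch; the keys `hoodie.compact.inline` and `hoodie.compact.inline.max.delta.commits` are assumptions to verify against your version):

Code Block
spark.read.parquet("your_data_set/path/to/month").limit(n)
     .write.format("org.apache.hudi")
     .option("hoodie.datasource.write.operation", "upsert")
     .option("hoodie.compact.inline", "true")                   // compact as part of the write
     .option("hoodie.compact.inline.max.delta.commits", "2")    // after every 2 delta commits
     .option(RECORDKEY_FIELD_OPT_KEY, "<your key>")
     .option(PARTITIONPATH_FIELD_OPT_KEY, "<your_partition>")
     ...
     .mode(SaveMode.Append)
     .save(basePath)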

If I keep my file versions at 1, with this configuration will I be able to do a rollback (to the last commit) when a write fails?

Yes. Commits happen before cleaning. Any failed commits will not cause any side effects, and Hudi will guarantee snapshot isolation.

Does AWS Glue support Hudi?

AWS Glue jobs can write, read, and update the Glue Data Catalog for Hudi tables. In order to successfully integrate with the Glue Data Catalog, you need to subscribe to the AWS-provided Glue connector named "AWS Glue Connector for Apache Hudi". The Glue job needs to have the "Use Glue data catalog as the Hive metastore" option ticked. Detailed steps with a sample script are available in this article provided by AWS: https://aws.amazon.com/blogs/big-data/writing-to-apache-hudi-tables-using-aws-glue-connector/.

If you are using either notebooks or Zeppelin through Glue dev-endpoints, your script might not be able to integrate with the Glue Data Catalog when writing to Hudi tables.

Why are partition fields also stored in parquet files in addition to the partition path?

Hudi supports customizable partition values, which could be a value derived from another field. Also, storing the partition value only as part of the partition path results in losing type information when queried by various query engines.

I am seeing a lot of archive files. How do I control the number of archive commit files generated?

Please note that in cloud stores that do not support log append operations, Hudi is forced to create new archive files to archive old metadata operations. You can increase hoodie.commits.archival.batch to increase the number of commits archived per archive file. In addition, you can increase the gap between the two watermark configurations: hoodie.keep.max.commits (default: 30) and hoodie.keep.min.commits (default: 20). This way, you can reduce the number of archive files created and, at the same time, increase the amount of metadata archived per archive file. Note that after the 0.7.0 release, where we are adding consolidated Hudi metadata (RFC-15), the follow-up work would involve re-organizing archival metadata so that we can do periodic compactions to control the file sizing of these archive files.
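
For example, a minimal sketch of these knobs as write options (values are arbitrary; the defaults quoted are the ones mentioned above, and key names should be checked against your version's configuration reference):

Code Block
df.write.format("org.apache.hudi")
     .option("hoodie.datasource.write.operation", "upsert")
     .option("hoodie.keep.min.commits", "30")          // default 20
     .option("hoodie.keep.max.commits", "50")          // default 30; widen the gap to min
     .option("hoodie.commits.archival.batch", "20")    // commits archived per archive file
     ...
     .mode(SaveMode.Append)
     .save(basePath)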

How do I configure Bloom filter (when Bloom/Global_Bloom index is used)? 

Bloom filters are used in bloom indexes to look up the location of record keys in the write path. Bloom filters are used only when the index type is chosen as “BLOOM” or “GLOBAL_BLOOM”. Hudi has a few config knobs that users can use to tune their bloom filters.

At a high level, Hudi has two types of bloom filters: simple and dynamic.

Simple, as the name suggests, is simple. Its size is statically allocated based on a few configs.

“hoodie.bloom.index.filter.type”: SIMPLE

“hoodie.index.bloom.num_entries” refers to the total number of entries per bloom filter, and each bloom filter corresponds to one file slice. The default value is 60000.

“hoodie.index.bloom.fpp” refers to the false positive probability with the bloom filter. Default value: 1*10^-9.

The size of the bloom filter depends on these two values: it is statically allocated, and a standard formula determines the size of the bloom filter from num_entries and fpp. As long as the total number of entries added to the bloom filter stays within the configured “hoodie.index.bloom.num_entries” value, the fpp will be honored; e.g. with the default values of 60k and 1*10^-9, the bloom filter serialized size is ~430KB. But if more entries are added, the false positive probability will not be honored, and chances are that more false positives will be returned. So, users are expected to set the right values for both num_entries and fpp.

Hudi suggests having roughly 100MB to 120MB sized files for better query performance. So, based on the record size, one can determine how many records could fit into one data file.

Let's say your data file max size is 128MB and the average record size is 1024 bytes. Roughly, this translates to ~130k entries per data file (128 * 1024 * 1024 / 1024 ≈ 131,072). For this case, you should set num_entries to ~130k.
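
Continuing this example, here is a sketch of the simple bloom filter settings (using the config keys named above, and assuming `hoodie.index.type` selects the bloom index in your version; values are sized for ~130k keys per data file):

Code Block
df.write.format("org.apache.hudi")
     .option("hoodie.index.type", "BLOOM")
     .option("hoodie.bloom.index.filter.type", "SIMPLE")
     .option("hoodie.index.bloom.num_entries", "130000")   // ~130k keys per data file
     .option("hoodie.index.bloom.fpp", "0.000000001")      // 1*10^-9
     ...
     .mode(SaveMode.Append)
     .save(basePath)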

Dynamic bloom filter:

“hoodie.bloom.index.filter.type” : DYNAMIC

This is an advanced version of the bloom filter which grows dynamically as the number of entries grows. So, users are expected to set two values with respect to num_entries: “hoodie.index.bloom.num_entries” determines the starting size of the bloom filter, and “hoodie.bloom.index.filter.dynamic.max.entries” determines the max size to which the bloom filter can grow. The fpp needs to be set just as for the simple bloom filter. The bloom filter size will be allotted based on the first config, “hoodie.index.bloom.num_entries”. Once the number of entries reaches this value, the bloom filter will dynamically grow to 2X its size. This goes on until the size reaches the “hoodie.bloom.index.filter.dynamic.max.entries” value. Until the size reaches this max value, fpp will be honored. If the entries added exceed the max value, then the fpp may not be honored.
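
A corresponding sketch for the dynamic variant (same caveats as above; note that some Hudi versions spell the filter type value as DYNAMIC_V0 rather than DYNAMIC):

Code Block
df.write.format("org.apache.hudi")
     .option("hoodie.index.type", "BLOOM")
     .option("hoodie.bloom.index.filter.type", "DYNAMIC_V0")              // "DYNAMIC" in the entry above; check your version
     .option("hoodie.index.bloom.num_entries", "60000")                   // starting size
     .option("hoodie.bloom.index.filter.dynamic.max.entries", "130000")   // cap on growth
     .option("hoodie.index.bloom.fpp", "0.000000001")
     ...
     .mode(SaveMode.Append)
     .save(basePath)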

How do I tune the shuffle parallelism of Hudi jobs?

First, let's understand what the term parallelism means in the context of Hudi jobs. For any Hudi job using Spark, parallelism equals the number of Spark partitions that should be generated for a particular stage in the DAG. To understand more about Spark partitions, read this article. In Spark, each partition is mapped to a Spark task that can be executed on an executor. Typically, for a Spark application the following hierarchy holds true:

(Spark Application → N Spark Jobs → M Spark Stages → T Spark Tasks) on (E executors with C cores)

A Spark application can be given E executors to run on. Each executor might hold 1 or more Spark cores. Every Spark task requires at least 1 core to execute, so imagine T tasks to be done in Z time depending on C cores: the higher C is, the smaller Z becomes.

With this understanding, if you want your DAG stage to run faster, bring T as close to (or higher than) C. Additionally, this parallelism ultimately controls the number of output files you write using a Hudi-based job. Let's understand the different kinds of knobs available:

BulkInsertParallelism → This is used to control the parallelism with which output files will be created by a Hudi job. The higher this parallelism, the more tasks are created, and hence the more output files will eventually be created. Even if you define the parquet max file size to be a high value, making the parallelism really high means the max file size cannot be honored, since the Spark tasks are working on smaller amounts of data.

Upsert/Insert Parallelism → This is used to control how fast the read process should be when reading data into the job. Find more details here.  
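
As a rough illustration, here is a sketch of these parallelism knobs as write options (the key names `hoodie.bulkinsert.shuffle.parallelism`, `hoodie.insert.shuffle.parallelism` and `hoodie.upsert.shuffle.parallelism` are assumptions to verify against your version's configuration reference; 200 is an arbitrary example value):

Code Block
df.write.format("org.apache.hudi")
     .option("hoodie.datasource.write.operation", "bulk_insert")
     .option("hoodie.bulkinsert.shuffle.parallelism", "200")   // roughly controls the number of output files
     .option("hoodie.insert.shuffle.parallelism", "200")
     .option("hoodie.upsert.shuffle.parallelism", "200")
     ...
     .mode(SaveMode.Append)
     .save(basePath)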

INT96, INT64 and timestamp compatibility

https://hudi.apache.org/docs/configurations.html#HIVE_SUPPORT_TIMESTAMP

Contributing to FAQ 

A good and usable FAQ should be community-driven and crowd-source questions/thoughts from everyone.

...