 RFC - 08 : Record level indexing mechanisms for Hudi datasets (Partition agnostic, similar to existing HbaseIndex)

Proposers

Approvers

Status

Current state: "IN PROGRESS"

Discussion thread: here

JIRA: here

Released: <Hudi Version>

Abstract

Hudi tables allow many operations to be performed on them, a very popular one being upsert(). To be able to support upserts, Hudi depends on an indexing scheme to route each incoming record to the correct file.

Currently, Hudi supports both partitioned and non-partitioned datasets. A partitioned dataset is one which groups files (data) into buckets called partitions. A Hudi dataset may be composed of N partitions, each with M files. This structure helps canonical Hive/Presto/Spark queries limit the amount of data read by using the partition as a filter. The value of the partition/bucket is in most cases derived from the incoming data itself. The requirement is that once a record is mapped to a partition/bucket, this mapping should be a) known to Hudi and b) remain constant for the lifecycle of the dataset, for Hudi to be able to perform upserts on it.

Consequently, in a non-partitioned dataset one can think of this problem as a record key <-> file id mapping that is required for Hudi to be able to perform upserts on a record.

The current solution is either a) for the client/user to provide the correct partition value as part of the payload, or b) to use a GlobalBloomIndex implementation to scan all the files under a given path (say, a non-partitioned table). In both cases, we are limited either by the user's ability to provide this information or by the performance overhead of scanning every file's bloom index.

The proposal is to implement a new index format that is a mapping of recordKey <-> (partitionPath, fileId). This mapping will be stored and maintained by Hudi as another implementation of HoodieIndex and will address the two limitations mentioned above.

Background

Types of datasets

The HUDI storage abstraction is composed of 2 main components: 1) the actual data stored, and 2) an index that helps in looking up the location (fileId) of a particular record key. Without this information, HUDI cannot perform upserts to datasets. We can broadly classify all datasets ingested into the data lake into 2 categories.

Append Only

An append only dataset is one where every new entry written to that table is mutually disjoint from any other entry written previously to that table. More concretely, each row in the table is a new row and has no overlap with a previously written row. You can imagine a table that is ingesting logs from an application: each new log is a new entry into the table, with little or no relation to the log entries written before it. A new entry into such a dataset hence does not require ANY context from the previous entries in the dataset to determine where it should be written.

Append + Update

An append + update, a.k.a. upsert, type of dataset presents a different challenge. Any entry written to such a dataset may depend on or belong to a previously written row. More concretely, each row in the table is NOT necessarily a new row and may overlap with a previously written row. In such a scenario, the system needs to be able to determine which row this new update should be written to, hence the need to find out which file id the update should be routed to.

The 3 approaches used by consumers of HUDI are as follows:

  1. The data layout is divided into buckets called partitions and each partition consists of N files. The client maintains a mapping of record key <-> file id for updatable tables and provides the partition information before passing the records off to Hudi for processing. The HoodieBloomIndex implementation scans the bloom indexes of all files under a partition, and on a match, verifies that the record key lies in that file. This way hoodie is able to “tag” the incoming records with the correct file ids.
  2. The data layout is a flat structure, where 1 single directory contains all the files for the updatable table. The GlobalHoodieBloomIndex implementation scans the bloom indexes of all files, and on a match, verifies that the record key lies in that file. This way hoodie is able to “tag” the incoming records with the correct file ids. The difference between the first and the second is that if the number of files under the base path is very large, the indexing time can blow up (see the configuration sketch after this list for how the index type is chosen).
  3. For append only datasets which do not require updating, simply use the partition from the current payload, e.g. the current timestamp.
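
A minimal configuration sketch of how a client picks between these index implementations today, assuming the current org.apache.hudi client classes (HoodieWriteConfig, HoodieIndexConfig, HoodieIndex.IndexType); package names and builder methods may differ across releases:

    import org.apache.hudi.config.HoodieIndexConfig;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.index.HoodieIndex;

    // Sketch: choose the index used to "tag" incoming records with file ids.
    // BLOOM scans bloom filters of files under the record's partition (approach 1),
    // GLOBAL_BLOOM scans bloom filters of all files under the base path (approach 2).
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/path/to/table")   // hypothetical base path
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .withIndexType(HoodieIndex.IndexType.GLOBAL_BLOOM)
            .build())
        .build();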

With a large amount of data being written to data warehouses, there is a need to better partition this data to be able to improve query performance. Hive is a good tool for performing queries on large datasets, especially datasets that require full table scans. But quite often there are instances where users need to filter the data on specific column values. Partitions help in efficient querying of the dataset since it limits the amount of data that needs to be queried. In non-partitioned tables, Hive/Presto/Spark would have to read all the files in a table’s data directory and subsequently apply filters on it. This is slow and expensive—especially in cases of large tables. 

Implementation



Hash based indexing 

Index entries are hashed into buckets and each bucket will hold a mapping of recordKey to <PartitionPath, FileId>. The total number of buckets has to be pre-determined for the dataset and can't be updated later, but each bucket can scale to more than one file group based on the load (more on file groups later in this section). With 1000 buckets and 1M entries per bucket, the index is capable of serving 1 billion entries, so more file groups per bucket are required only if the dataset grows beyond 1B unique entries.

Each bucket will expose two APIs, namely getRecordLocations(JavaRDD<RecordKeys>) and insertRecords(JavaPairRDD<RecordKey, Tuple2<PartitionPath, FileId>>).
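
A rough sketch of this bucket contract in Java, assuming String record keys and Spark's Java API; the method names come from this proposal, while the concrete types (including Tuple2<String, String> standing in for <PartitionPath, FileId>) are placeholders:

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import scala.Tuple2;

    // Sketch of the per-bucket index contract described above.
    interface IndexBucket {

      // Look up the <PartitionPath, FileId> location for each record key.
      // Keys not present in this bucket are simply absent from the result.
      JavaPairRDD<String, Tuple2<String, String>> getRecordLocations(JavaRDD<String> recordKeys);

      // Persist the locations of newly inserted records into this bucket.
      void insertRecords(JavaPairRDD<String, Tuple2<String, String>> recordLocations);
    }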

Storage

We plan to use HFile (link1, link2) for storage purposes to leverage its random read capability. You can find details about HFile benchmarking here. The short summary is that with an HFile containing 1M entries, a look up of 100k entries takes ~600 ms at p95. If we can achieve the same in a live system, it would be great for Hudi.
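
For illustration, a point look up against a single HFile could look roughly like the sketch below. It uses the HBase 1.x style HFile reader API (createReader/getScanner signatures change across HBase versions), and the index value is assumed to be a serialized <PartitionPath, FileId> pair; the file path is hypothetical:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.apache.hadoop.hbase.io.hfile.HFile;
    import org.apache.hadoop.hbase.io.hfile.HFileScanner;
    import org.apache.hadoop.hbase.util.Bytes;

    // Sketch: seek to a single record key in one HFile of a bucket (HBase 1.x API).
    static byte[] lookupOne(FileSystem fs, Path hfilePath, String recordKey, Configuration conf)
        throws IOException {
      HFile.Reader reader = HFile.createReader(fs, hfilePath, new CacheConfig(conf), conf);
      try {
        HFileScanner scanner = reader.getScanner(true, true); // cacheBlocks, positional read
        KeyValue searchKey = KeyValue.createFirstOnRow(Bytes.toBytes(recordKey));
        if (scanner.seekTo(searchKey) == 0) {
          // Exact match: the value holds the serialized <PartitionPath, FileId>.
          return Bytes.toBytes(scanner.getValue());
        }
        return null; // key not present in this HFile
      } finally {
        reader.close();
      }
    }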

Write path

On the ingest path, once the HoodieRecordLocation is obtained for all new (to be inserted) entries, these records are mapped into (RecordKey, <PartitionPath, FileId>). These are hashed based on recordKey and mapped to a bucket. The bucket mapping of a record can never change, and hence neither can the hashing function for a given dataset. Each bucket consists of N HFiles. A point to remember about HFile is that it does not support appends. Also, all writes to a single HFile have to be sorted. So, every new batch will end up creating a new HFile in the respective bucket, and hence every bucket will contain N HFiles. We definitely need compaction to keep the number of HFiles in check. Also, remember that there are no updates to values in this index, since a record's location in Hoodie can never change. This characteristic will help us parallelize the read path.
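
A simplified write path sketch for one bucket and one ingest batch is shown below. The important part is sorting before writing, since HFile requires keys in sorted order; the column family/qualifier names and the assumption that record keys sort identically as Strings and as bytes (true for ASCII keys) are simplifications:

    import java.io.IOException;
    import java.util.Map;
    import java.util.TreeMap;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.apache.hadoop.hbase.io.hfile.HFile;
    import org.apache.hadoop.hbase.io.hfile.HFileContext;
    import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
    import org.apache.hadoop.hbase.util.Bytes;

    // Sketch: write one brand new HFile for one bucket from one ingest batch.
    // locations: recordKey -> serialized <PartitionPath, FileId>
    static void writeBatch(FileSystem fs, Path newHFilePath, Map<String, byte[]> locations,
                           Configuration conf) throws IOException {
      HFileContext context = new HFileContextBuilder().withBlockSize(64 * 1024).build();
      HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
          .withPath(fs, newHFilePath)
          .withFileContext(context)
          .create();
      try {
        // HFile does not support appends and needs sorted keys, so sort the batch
        // and write it as a new file; compaction later merges the bucket's HFiles.
        for (Map.Entry<String, byte[]> entry : new TreeMap<>(locations).entrySet()) {
          writer.append(new KeyValue(Bytes.toBytes(entry.getKey()),
              Bytes.toBytes("i"), Bytes.toBytes("loc"), entry.getValue()));
        }
      } finally {
        writer.close();
      }
    }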

Parallelism: The ideal parallelism for writes is the total number of buckets, since each batch of ingest can create at most one HFile per bucket.

A few things that need attention during implementation: the data write and the index write should be coupled and ACID compliant, i.e. either both are committed or both are rolled back; no partial data should be seen by any readers midway.

Read path (read/update)

On both the read and update paths, each record's location has to be known before proceeding to read or write. So, getRecordLocations(JavaRDD<RecordKeys>) will be called with the record keys from the input batch. These records will be hashed into their respective buckets and each bucket in turn will do the HFile look ups.

Parallelism: As noted earlier, since there are no updates to entries in the index, a single record can only be present in one HFile within a bucket. So, ideally every HFile could be looked up in parallel. For instance, if we have 100 buckets with 10 HFiles each, we could set the parallelism to 1000 to get the maximum benefit.
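
At the Spark level the look up step could be sketched as below: hash each incoming key to its bucket, repartition by bucket (or by individual HFile for maximum parallelism), and run the HFile look ups inside each partition. bucketFor(...) and lookupInBucket(...) are hypothetical helpers standing in for the hashing and HFile seek/scan logic described in this RFC, and the Spark 2.x Java API is assumed:

    import org.apache.spark.HashPartitioner;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import scala.Tuple2;

    // Sketch: route record keys to their buckets, then look them up in parallel.
    static JavaPairRDD<String, Tuple2<String, String>> tagLocations(
        JavaRDD<String> recordKeys, int numBuckets, int parallelism) {
      return recordKeys
          .mapToPair(key -> new Tuple2<>(bucketFor(key, numBuckets), key)) // (bucketId, key)
          .partitionBy(new HashPartitioner(parallelism))
          .mapPartitionsToPair(iter -> lookupInBucket(iter)); // HFile seek/scan per bucket
    }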

Hashing

As a default implementation, we could just hash the record key as is (Java hash). But we should also add support for developers to define their own hashing function.
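
A minimal sketch of such a pluggable hashing contract, with the Java-hash default mentioned above; the interface and class names are placeholders, not a final API:

    // Placeholder contract for mapping a record key to a bucket.
    interface BucketHasher {
      int bucketFor(String recordKey, int numBuckets);
    }

    // Default: plain Java hashCode, masked to stay non-negative.
    class JavaHashBucketHasher implements BucketHasher {
      @Override
      public int bucketFor(String recordKey, int numBuckets) {
        return (recordKey.hashCode() & Integer.MAX_VALUE) % numBuckets;
      }
    }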

HFile scan vs seek 

The microbenchmarking shows that for an HFile containing 1M entries, random seeks perform better up to about 300k to 400k look ups; beyond that, a full file scan (i.e. reading the entire HFile into an in-memory hash table and doing the look ups against that) performs better. So, we could leverage that during our look ups: we could store the total number of entries in each HFile and, during a look up, do random seeks if the number of entries to be looked up is < ~30% of the total, and resort to a full file scan otherwise. This needs to be fleshed out in more detail, but it is something to keep in mind.
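
In code, the choice could be as simple as comparing the look up ratio against a configurable threshold; the ~30% figure comes from the microbenchmark summary above and would need to be validated:

    // Sketch: pick full scan vs. random seeks per HFile based on the look up ratio.
    static boolean useFullScan(long keysToLookUp, long totalEntriesInHFile, double threshold) {
      // threshold ~0.3 based on the microbenchmark; should be configurable.
      return keysToLookUp > threshold * totalEntriesInHFile;
    }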

Scaling further

Let's say that with the initial estimates, 1000 buckets were pre-allocated. At some point, if we reach the tipping point, i.e. 1M entries per bucket, we can cap that file group and start a new one. Remember that with compaction in place, all HFiles in a file group will be compacted into one HFile. So, if we don't expand to another file group, at some point we could have 2M or more entries in one HFile, which might give us bad performance. Hence we ought to cap a file group at, say, 1M entries and start another file group. What this essentially means is that the bucket of interest is equivalent to two virtual buckets. The hashing and the number of buckets remain the same, but our index is capable of scaling to a larger number of entries.
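
One way to express this capping rule: within a bucket, route new inserts to the latest file group and roll over to a new file group once it crosses the configured cap. The class and method names below are illustrative only:

    import java.util.ArrayList;
    import java.util.List;

    // Sketch: a bucket that rolls over to a new file group at ~1M entries.
    class BucketFileGroups {
      private static final long MAX_ENTRIES_PER_FILE_GROUP = 1_000_000L;

      // Illustrative placeholder for a set of HFiles that compaction merges into one.
      static class FileGroup {
        private long entries = 0;
        long entryCount() { return entries; }
        void recordInserts(long count) { entries += count; }
      }

      private final List<FileGroup> fileGroups = new ArrayList<>();

      // New inserts go to the latest file group; start a new one once the cap is hit,
      // which effectively turns this bucket into multiple "virtual" buckets.
      FileGroup fileGroupForNewInserts() {
        FileGroup latest = fileGroups.isEmpty() ? null : fileGroups.get(fileGroups.size() - 1);
        if (latest == null || latest.entryCount() >= MAX_ENTRIES_PER_FILE_GROUP) {
          latest = new FileGroup();
          fileGroups.add(latest);
        }
        return latest;
      }
    }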

Rollout/Adoption Plan

  • Existing users will have to perform a one time migration of the existing index (bloom, user driven partition path etc.) to this index. We will look into writing a tool for this.
  • The contracts of the new index and the old indexes will not change, so the switch should be seamless to the client.
  • We will need a migration tool to bootstrap the global index from the existing dataset.

Test Plan

<Describe in few sentences how the RFC will be tested. How will we know that the implementation works as expected? How will we know nothing broke?>
