...

HUDI storage abstraction is composed of two main components: 1) the actual data stored, and 2) an index that helps in looking up the location (file_Id) of a particular record key. Without this information, HUDI cannot perform upserts to datasets. We can broadly classify all datasets ingested in the data lake into two categories.

Append Only

Insert/Event data

An append-only dataset is one where every new entry written to the table is mutually disjoint from any other entry written previously to that table. More concretely, each row in the table is a new row and has no overlap with a previously written row. You can imagine a table that is ingesting logs from an application: each new log is a new entry into the table, with little or no relation to the log entries written before it. A new entry into such a dataset hence does not require ANY context from the previous entries in the dataset to determine where this entry should be written.

Append + Update

Upsert/Change Log data

An append + update, a.k.a. upsert, type of dataset presents a different challenge. Any entry written to such a dataset may depend on, or belong to, a previously written row. More concretely, each row in the table is not necessarily a new row and may have overlap with a previously written row. In such a scenario, a system needs to be able to determine which row this new update should be written to, hence the need to find out which file-id the update should be routed to.

...

On the ingest path, once the HoodieRecordLocation is obtained for all new (to be inserted) entries, these records are mapped into (RecordKey, <PartitionPath, FileId>). These are hashed based on recordKey and mapped to a bucket. The bucket mapping of a record can never change, and hence neither can the hashing function for a given dataset. Each bucket consists of N HFiles. A point to remember about HFile is that it does not support appends; also, all writes to a single HFile have to be sorted. So every new batch will end up creating a new HFile in the respective bucket, and hence every bucket will contain N HFiles. We definitely need compaction to play a part in keeping the number of HFiles in check. Also, remember that there are no updates to values in this index, since a record's location in Hoodie can never change. This characteristic will help us parallelize the read path.
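
To make this concrete, here is a minimal sketch of how a batch of incoming index entries could be routed to buckets and kept key-sorted so that each bucket's entries can be flushed as a brand new HFile. The class and method names, and the value encoding, are illustrative assumptions, not Hoodie APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Illustrative write-path sketch: each new (recordKey -> partitionPath/fileId)
// entry is routed to a bucket by hashing the record key, and every ingest batch
// produces one new, key-sorted HFile per touched bucket (HFile requires sorted
// writes and does not support appends).
public class IndexWriteBatch {
  private final int numBuckets;
  // bucket id -> sorted entries (recordKey -> "partitionPath,fileId") for this batch
  private final Map<Integer, TreeMap<String, String>> bucketEntries = new HashMap<>();

  public IndexWriteBatch(int numBuckets) {
    this.numBuckets = numBuckets;
  }

  // The hash function must never change for a given dataset,
  // otherwise keys would migrate to different buckets.
  private int bucketFor(String recordKey) {
    return (recordKey.hashCode() & Integer.MAX_VALUE) % numBuckets;
  }

  public void add(String recordKey, String partitionPath, String fileId) {
    bucketEntries
        .computeIfAbsent(bucketFor(recordKey), b -> new TreeMap<>())
        .put(recordKey, partitionPath + "," + fileId);
  }

  // One sorted map per bucket, each to be flushed as a new HFile.
  public Map<Integer, TreeMap<String, String>> perBucketSortedEntries() {
    return bucketEntries;
  }
}
```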

...

A few things need attention during implementation: the data write and the index write should be coupled and should be ACID compliant, i.e. either both are committed or both are rolled back. No partial data should be seen by any reader midway.
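
As a rough sketch of that coupling (with hypothetical DataWriter, IndexWriter, and CommitLog interfaces that are not Hoodie APIs), both writes are staged under the same commit time and only published, or rolled back, together:

```java
// Hypothetical sketch of coupling the data write and the index write under a
// single commit: nothing becomes visible to readers unless both succeed.
public class AtomicIngest {
  interface DataWriter { void write(String commitTime) throws Exception; }
  interface IndexWriter { void write(String commitTime) throws Exception; }
  interface CommitLog {
    void commit(String commitTime);    // makes both writes visible atomically
    void rollback(String commitTime);  // removes partial data and index files
  }

  public static void ingest(String commitTime, DataWriter data,
                            IndexWriter index, CommitLog log) {
    try {
      data.write(commitTime);   // write new data files tagged with commitTime
      index.write(commitTime);  // write new index HFiles tagged with commitTime
      log.commit(commitTime);   // single publish point for readers
    } catch (Exception e) {
      log.rollback(commitTime); // neither data nor index becomes visible
      throw new RuntimeException("Ingest failed, rolled back " + commitTime, e);
    }
  }
}
```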

Updates

Maybe as of today Hoodie's record location is immutable, but we can't say that will hold true in the future. So this index should be capable of handling updates to the mapping. In such cases, multiple values will be returned (for example, if HFile1 returns FileId1 for record1 and HFile3 returns FileId5 for record1, we would take HFile3's value, and hence record1's location is FileId5). For the commit timestamp, we should rely on either the filename or the commit metadata instead of embedding the commit timestamp in the value of every entry, which would definitely bloat the size of the index in general.
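
A minimal sketch of this latest-wins resolution is below. It assumes the commit time of each hit can be derived from the HFile name or commit metadata, as proposed above; the types are illustrative only.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Illustrative latest-wins resolution when a record key is found in more than
// one HFile of its bucket: the value written by the most recent commit wins.
public class LatestWinsResolver {

  // One lookup hit: the commit time of the HFile it came from (derived from the
  // file name or commit metadata) and the fileId stored there (null = tombstone,
  // see the delete section below).
  public static class Hit {
    final long commitTime;
    final String fileId;

    public Hit(long commitTime, String fileId) {
      this.commitTime = commitTime;
      this.fileId = fileId;
    }
  }

  public static Optional<String> resolve(List<Hit> hits) {
    return hits.stream()
        .max(Comparator.comparingLong((Hit h) -> h.commitTime))
        // Optional.map drops a null fileId, so a latest tombstone resolves to "not found".
        .map(h -> h.fileId);
  }
}
```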

Index Read path (to be used in Hoodie read/update path)

On both the read and update paths, each record's location has to be known before proceeding to read or write. So getRecordLocations(JavaRDD<RecordKeys>) will be called with the record keys from the input batch. These records will be hashed into their respective buckets, and each bucket in turn will do the HFile lookups.

Parallelism: As noted earlier, since there are no updates to entries in the index, a single record can only be present in one HFile within a bucket. So, ideally, every HFile could be looked up in parallel. For instance, if we have 100 buckets with 10 HFiles in each, we could set parallelism to 1000 to get the maximum benefit.
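
The sketch below shows this lookup flow, with plain Java loops standing in for the Spark job and a hypothetical HFileReader interface in place of the real HFile reader. Each (bucket, HFile) probe is independent, which is where the buckets × files parallelism comes from.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Illustrative read-path sketch: hash each key to its bucket, then probe the
// HFiles of that bucket. Each (bucket, HFile, keys) unit is independent, so
// with 100 buckets of 10 HFiles each the lookup can be spread over 1000 tasks.
public class IndexLookup {
  // Stand-in for a single HFile reader within a bucket (not an HBase API).
  public interface HFileReader {
    Optional<String> get(String recordKey); // fileId for the key, empty if absent
  }

  public static Map<String, String> getRecordLocations(
      List<String> recordKeys,
      Map<Integer, List<HFileReader>> hfilesPerBucket,
      int numBuckets) {
    Map<String, String> locations = new HashMap<>();
    for (String key : recordKeys) {
      int bucket = (key.hashCode() & Integer.MAX_VALUE) % numBuckets;
      // Since index entries are never updated in place, the key can live in at
      // most one HFile of this bucket; these probes could all run in parallel.
      for (HFileReader hfile : hfilesPerBucket.getOrDefault(bucket, Collections.emptyList())) {
        Optional<String> fileId = hfile.get(key);
        if (fileId.isPresent()) {
          locations.put(key, fileId.get());
          break;
        }
      }
    }
    return locations;
  }
}
```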

Index Delete (to be used in Hoodie write path)

We can do a typical tombstone by adding an entry to the bucket with a null value. During index lookup, we can still maintain the same parallelism and look up all HFiles within a given bucket in parallel, but choose the latest value if multiple values are returned. (For example, if HFile1 returns FileId1 for record1 and HFile3 returns null for record1, we would take HFile3's value, and hence record1 is considered deleted.) For the commit timestamp, we should rely on either the filename or the commit metadata instead of embedding the commit timestamp in the value of every entry, which would definitely bloat the size of the index in general.
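
One practical wrinkle is that HFile values are byte arrays, so "null" needs a concrete encoding. The sketch below assumes a zero-length value as the tombstone marker; this is an implementation assumption for illustration, not anything defined by Hoodie or HBase.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Illustrative tombstone convention: an index entry's value is either the
// fileId the record lives in, or a zero-length marker meaning "deleted".
public final class IndexValue {
  private static final byte[] TOMBSTONE = new byte[0];

  // Encode a fileId for storage; a null fileId becomes a tombstone.
  public static byte[] encode(String fileId) {
    return fileId == null ? TOMBSTONE : fileId.getBytes(StandardCharsets.UTF_8);
  }

  // Empty optional means the latest entry for this key is a delete.
  public static Optional<String> decode(byte[] value) {
    return value.length == 0
        ? Optional.empty()
        : Optional.of(new String(value, StandardCharsets.UTF_8));
  }
}
```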

Adding delete support might complicate compaction a bit. Because of it, compaction might have to read the entire content of all HFiles that need to be compacted, to find the resolved values for all entries before writing the new base HFile. Hopefully this does not add too much overhead. Also, compaction will drop deleted entries to avoid unnecessary storage for them, so it may not be possible to determine whether a record was never inserted or was deleted after insertion. Let me know if this is an issue for Hoodie.
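
A simplified sketch of that compaction pass is shown below: HFiles are replayed from oldest to newest, the newest value per key wins, and keys whose latest value is a tombstone are dropped from the new base HFile. The in-memory maps are a simplification; a real implementation would stream the sorted entries instead of materializing them.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative compaction sketch for one bucket: resolve every key to its
// latest value and drop tombstoned keys before writing the new base HFile.
public class IndexCompactor {
  public static Map<String, String> compact(List<Map<String, String>> hfilesOldestFirst) {
    Map<String, String> resolved = new HashMap<>();
    for (Map<String, String> hfile : hfilesOldestFirst) {
      for (Map.Entry<String, String> e : hfile.entrySet()) {
        if (e.getValue() == null) {
          resolved.remove(e.getKey());            // deleted: drop the entry entirely
        } else {
          resolved.put(e.getKey(), e.getValue()); // newer value overwrites older one
        }
      }
    }
    return new TreeMap<>(resolved); // key-sorted, ready for the new base HFile
  }
}
```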

Note: One point to remember is that we should also support re-insertion of deleted entries. The above-mentioned scheme would work for this use case as well, without any modifications.

Hashing

As a default implementation, we could just hash the record key as is (Java hash). But we should also add support for developers to define their own hashing function.
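
For instance, the hashing hook could be as small as the interface sketched below (the name IndexKeyHasher is hypothetical, not an existing Hoodie API), with the plain Java hash as the default and room for custom implementations such as a murmur hash:

```java
// Illustrative pluggable hashing hook for the index.
public interface IndexKeyHasher {
  int hash(String recordKey);

  // Default implementation: plain Java hash of the record key.
  static IndexKeyHasher javaHash() {
    return String::hashCode;
  }
}
```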

HFile scan vs seek 

From microbenchmarking, it is shown that for an HFile containing 1M entries, random seeks perform better up to about 300k to 400k lookups; beyond that, a full file scan (i.e. reading the entire HFile into an in-memory hash table and doing the lookups against that hash table) performs better. We could leverage that during our lookups by storing the total number of entries in each HFile. During a lookup, if the number of entries to be looked up is less than ~30% of the total, we do random seeks; if not, we resort to a full file scan. This needs to be fleshed out in finer detail, but for now we can introduce two configs, namely "record.level.index.dynamically.choose.scan.vs.seek" and "record.level.index.do.random.lookups". If the first config is set to true, scan or seek is chosen dynamically. If it is set to false, the value of the second config is used for streaming use cases, so the developer has the ability to choose scan or seek while reading.
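
The decision could look roughly like the sketch below. The two config keys are the ones proposed above; the class, method names, and threshold wiring are illustrative assumptions.

```java
// Illustrative scan-vs-seek decision per HFile, based on the two proposed
// configs and the ~30% heuristic from the 1M-entry microbenchmark.
public class LookupStrategy {
  public enum Mode { SEEK, SCAN }

  private final boolean chooseDynamically; // record.level.index.dynamically.choose.scan.vs.seek
  private final boolean doRandomLookups;   // record.level.index.do.random.lookups
  private final double seekRatioThreshold; // e.g. 0.30

  public LookupStrategy(boolean chooseDynamically, boolean doRandomLookups,
                        double seekRatioThreshold) {
    this.chooseDynamically = chooseDynamically;
    this.doRandomLookups = doRandomLookups;
    this.seekRatioThreshold = seekRatioThreshold;
  }

  public Mode choose(long keysToLookUp, long totalEntriesInHFile) {
    if (!chooseDynamically) {
      // Streaming use cases: honor the explicit random-lookup preference.
      return doRandomLookups ? Mode.SEEK : Mode.SCAN;
    }
    double ratio = (double) keysToLookUp / Math.max(1, totalEntriesInHFile);
    return ratio < seekRatioThreshold ? Mode.SEEK : Mode.SCAN;
  }
}
```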

Scaling further

This section is still being fleshed out. Listing the possible options for now.

In general, it is good practice to over-provision by about 30% more buckets than we anticipate needing, to avoid having to scale beyond the initial number of buckets, because there is some trade-off or overhead involved in scaling beyond the number of buckets initially provisioned. The first version of the implementation may not address this and expects users to over-provision.

Option1: Adding file groups to existing buckets

...