...

We plan to use HFile (link1, link2) for storage to leverage its random-read capability. You can find details about the HFile benchmarking here. In short, with an HFile containing 1M entries, looking up 100k entries takes ~600 ms at p95. If we can achieve the same in a live system, it would be great for Hudi. 

Index Write path (to be used in Hoodie write path)

On the ingest path, once the HoodieRecordLocation is obtained for all new (to be inserted) entries, these records are mapped into (RecordKey, <PartitionPath, FileId>) pairs. These pairs are hashed on the record key and mapped to a bucket. The bucket mapping of a record can never change, and hence neither can the hashing function for a given dataset. Each bucket consists of N HFiles. One point to remember about HFile is that it does not support appends, and all writes to a single HFile have to be sorted. So every new batch ends up creating a new HFile in the respective bucket, which is why every bucket contains N HFiles. We definitely need compaction to keep the number of HFiles in check. Also remember that there are no updates to values in this index, since a record's location in Hoodie can never change. This characteristic will help us parallelize the read path. 
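A minimal sketch of the bucket-assignment step, assuming a fixed bucket count; the class and helper names below are illustrative only and not actual Hudi APIs:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Illustrative sketch only: names below are hypothetical, not actual Hudi classes.
public class BucketAssigner {

    private final int numBuckets; // fixed for the lifetime of the dataset

    public BucketAssigner(int numBuckets) {
        this.numBuckets = numBuckets;
    }

    // Maps a record key to its bucket. The hash function must stay stable,
    // because a record's bucket can never change once assigned.
    public int bucketFor(String recordKey) {
        return (recordKey.hashCode() & Integer.MAX_VALUE) % numBuckets;
    }

    // One index entry: recordKey -> (partitionPath, fileId)
    public Map.Entry<String, String[]> toIndexEntry(String recordKey,
                                                    String partitionPath,
                                                    String fileId) {
        return new SimpleEntry<>(recordKey, new String[]{partitionPath, fileId});
    }
}
```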

...

A few things need attention during implementation: the data write and the index write should be coupled and ACID compliant, i.e. either both are committed or both are rolled back, and no partial data is ever seen by readers midway. 

Index Read path (to be used in Hoodie read/update path)

On both the read and update paths, each record's location has to be known before proceeding to read or write. So getRecordLocations(JavaRDD<RecordKeys>) will be called with the record keys from the input batch. These keys will be hashed into their respective buckets, and each bucket in turn will do the HFile lookups.
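A hedged sketch of how getRecordLocations could group keys by bucket before the per-bucket HFile lookups, using Spark's JavaRDD API; BucketLookup and the String[] location encoding are illustrative placeholders, not actual Hudi types:

```java
import java.util.Iterator;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

// Hypothetical sketch of the index read path; BucketLookup stands in for the code
// that performs sorted-key point reads against a bucket's HFiles.
public class HFileIndexReader {

    // Placeholder for per-bucket HFile lookups: recordKey -> (partitionPath, fileId).
    public interface BucketLookup extends java.io.Serializable {
        Iterator<Tuple2<String, String[]>> lookup(int bucket, Iterable<String> keys);
    }

    private final int numBuckets;
    private final BucketLookup bucketLookup;

    public HFileIndexReader(int numBuckets, BucketLookup bucketLookup) {
        this.numBuckets = numBuckets;
        this.bucketLookup = bucketLookup;
    }

    // Group incoming record keys by bucket, then look each group up
    // against the HFiles of that bucket.
    public JavaPairRDD<String, String[]> getRecordLocations(JavaRDD<String> recordKeys) {
        final int buckets = numBuckets;           // local copies avoid serializing `this`
        final BucketLookup lookup = bucketLookup;
        return recordKeys
            .mapToPair(key -> new Tuple2<>((key.hashCode() & Integer.MAX_VALUE) % buckets, key))
            .groupByKey(buckets)
            .flatMapToPair(bucketAndKeys -> lookup.lookup(bucketAndKeys._1, bucketAndKeys._2));
    }
}
```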

Parallelism: As noted earlier, since there are no updates to entries in the index, a single record can only be present in one HFile within a bucket. So, ideally, every HFile could be looked up in parallel. For instance, if we have 100 buckets with 10 HFiles each, we could set the parallelism to 1000 to get the maximum benefit.
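To illustrate that point, a small sketch that expands every (bucket, HFile) pair into its own Spark partition, so 100 buckets with 10 HFiles each yield 1000 parallel lookup tasks; the file naming and listing here are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

// Illustrative sketch of per-HFile parallelism: one partition per (bucket, HFile) pair.
public class PerFileParallelism {

    public static JavaPairRDD<Integer, String> bucketFilePairs(JavaSparkContext jsc,
                                                               int numBuckets,
                                                               int filesPerBucket) {
        List<Tuple2<Integer, String>> pairs = new ArrayList<>();
        for (int bucket = 0; bucket < numBuckets; bucket++) {
            for (int file = 0; file < filesPerBucket; file++) {
                // Placeholder HFile names; in practice these come from listing the bucket.
                pairs.add(new Tuple2<>(bucket, "hfile_" + bucket + "_" + file));
            }
        }
        // numBuckets * filesPerBucket partitions gives the maximum lookup parallelism.
        return jsc.parallelizePairs(pairs, numBuckets * filesPerBucket);
    }
}
```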

Index Delete (to be used in Hoodie write path)

We can do a typical tombstone by adding an entry to the bucket with a null value. During index lookup we can still maintain the same parallelism, looking up all HFiles within a given bucket in parallel, but choose the latest value if multiple values are returned. (In other words, if HFile1 returns FileId1 for record1 and a newer HFile3 returns null for record1, we take HFile3's value and record1 is considered deleted.) For the commit timestamp, we should rely on either the file name or the commit metadata instead of embedding the commit timestamp into the value of every entry, which would definitely bloat the overall size of the index. 
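A small sketch of how the latest-wins resolution could look for a single record key, assuming the commit time is parsed from the HFile name or commit metadata; the class and field names are illustrative:

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Sketch of tombstone resolution within one bucket, for a single record key.
// Each HFile's result is tracked with the commit time taken from its file name or
// commit metadata; the value from the latest commit wins, and null marks a delete.
public class TombstoneResolver {

    // commitTime -> (partitionPath, fileId) returned by that HFile (null == tombstone)
    private final TreeMap<Long, String[]> valuesByCommitTime = new TreeMap<>();

    public void addLookupResult(long commitTime, String[] partitionPathAndFileId) {
        valuesByCommitTime.put(commitTime, partitionPathAndFileId);
    }

    // Returns the location from the latest commit, or empty if the latest entry is a
    // tombstone (or the key was never indexed).
    public Optional<String[]> resolve() {
        Map.Entry<Long, String[]> latest = valuesByCommitTime.lastEntry();
        return latest == null ? Optional.empty() : Optional.ofNullable(latest.getValue());
    }
}
```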

Adding delete support might complicate compaction a bit. Because of it, compaction might have to read the entire content of all HFiles that need to be compacted, in order to find the resolved values for all entries before writing the new base HFile. Hopefully this will not add too much overhead. 
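For illustration, a simplified compaction sketch that resolves all entries across a bucket's HFiles and drops tombstones before the new base HFile is written; reading each HFile into an in-memory map is an assumption made purely to keep the example short:

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Illustrative sketch of bucket compaction: keep the newest value per key, drop
// tombstones, and emit a sorted view to write into the new base HFile.
public class BucketCompactor {

    // Each input map is recordKey -> (partitionPath, fileId); a null value is a tombstone.
    // `hfilesOldestFirst` must be ordered by commit time so later files overwrite earlier ones.
    public static TreeMap<String, String[]> compact(List<Map<String, String[]>> hfilesOldestFirst) {
        TreeMap<String, String[]> resolved = new TreeMap<>(); // sorted keys for the HFile writer
        for (Map<String, String[]> hfile : hfilesOldestFirst) {
            resolved.putAll(hfile); // newer commits overwrite older values, including tombstones
        }
        // Drop deleted entries so the compacted base file only carries live locations.
        resolved.values().removeIf(Objects::isNull);
        return resolved;
    }
}
```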

Note: One point to remember is that we should also support re-insertion of deleted entries. The scheme mentioned above works for this use case as well, without any modifications.

Hashing

As a default implementation, we could just hash the record key as is (Java hash). But we should also add support for developers to define their own hashing function. 
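A possible shape for the pluggable hashing contract, with the plain Java hash as the default; the interface name and method are assumptions, not an existing Hudi API:

```java
import java.io.Serializable;

// Sketch of a pluggable hashing contract; the interface and constant names are illustrative.
public interface IndexKeyHasher extends Serializable {

    // Must be deterministic and stable over time, since a key's bucket can never change.
    int bucketFor(String recordKey, int numBuckets);

    // Default implementation: plain Java hashCode of the record key.
    IndexKeyHasher JAVA_HASH = (recordKey, numBuckets) ->
        (recordKey.hashCode() & Integer.MAX_VALUE) % numBuckets;
}
```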

...