...

A good example is the Last Writer Wins (LWW) methodology with timestamp-based resolution used to achieve eventual consistency. One of the problems with timestamp-based resolution is that all servers need to be clock-synchronized using NTP, with no tolerance for clock drift. Ensuring this is extremely hard and requires specialized systems. Note that the LWW methodology assumes the timestamp of the write is associated with the lowest granularity of the datum; in databases, this is generally a column of a row.
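
As a minimal illustration of column-level LWW (not Hudi code; the TimestampedValue type and field names below are hypothetical), the merge simply keeps, per column, the value with the latest write timestamp:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of column-level Last Writer Wins, assuming each column value
// carries the wall-clock timestamp of the write that produced it. The types
// here are illustrative only, not Hudi APIs.
public class LwwMerge {

    // A column value tagged with the timestamp of the write that set it.
    static class TimestampedValue {
        final Object value;
        final long writeTimestampMs;
        TimestampedValue(Object value, long writeTimestampMs) {
            this.value = value;
            this.writeTimestampMs = writeTimestampMs;
        }
    }

    // Merge two versions of the same row column by column, keeping the value
    // with the larger timestamp. Correctness depends entirely on all writers
    // having synchronized clocks (e.g. via NTP) with no drift.
    static Map<String, TimestampedValue> merge(Map<String, TimestampedValue> left,
                                               Map<String, TimestampedValue> right) {
        Map<String, TimestampedValue> merged = new HashMap<>(left);
        right.forEach((column, candidate) ->
            merged.merge(column, candidate,
                (current, incoming) ->
                    incoming.writeTimestampMs >= current.writeTimestampMs ? incoming : current));
        return merged;
    }
}
```

The weakness is visible in the comparison: if two writers' clocks drift, the "winner" may not be the logically latest write.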


Since these methodologies defer the merging to the reader (realtime view), all of the following documentation assumes the hoodie table type is MergeOnRead.


CRDTs

Hudi supports partial row updates, which means two different writes/updates could be updating two different columns of the same row. Since input data to Hudi generally comes from a source such as Kafka, which may receive out-of-order writes, Hudi by itself cannot assign ordered timestamps to these updates. Hence, timestamp-based LWW does not directly work here.
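
To make the issue concrete, here is a hedged sketch (hypothetical types, not Hudi APIs) of merging partial row updates per column using an ordering field supplied by the source rather than a wall-clock timestamp; applying the updates in any order converges to the same row state, which is the CRDT property we want:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the partial-update problem: two writers touch disjoint columns of
// the same row, and the updates may arrive out of order. Merging is done per
// column using an ordering field carried by the source data (e.g. a source
// sequence number), not the arrival time at Hudi. All names are hypothetical.
public class PartialUpdateMerge {

    static class ColumnUpdate {
        final Object value;
        final long orderingValue;   // ordering field supplied by the source
        ColumnUpdate(Object value, long orderingValue) {
            this.value = value;
            this.orderingValue = orderingValue;
        }
    }

    // Apply an incoming partial update onto the current row state. Each column
    // keeps the value with the highest ordering value seen so far, so the final
    // state is the same regardless of the order in which updates arrive.
    static void apply(Map<String, ColumnUpdate> rowState, Map<String, ColumnUpdate> update) {
        update.forEach((column, incoming) ->
            rowState.merge(column, incoming,
                (current, candidate) ->
                    candidate.orderingValue > current.orderingValue ? candidate : current));
    }

    public static void main(String[] args) {
        Map<String, ColumnUpdate> row = new HashMap<>();
        // Writer A updates column "a", writer B updates column "b"; applying
        // them in either order converges to the same row state.
        apply(row, Map.of("a", new ColumnUpdate("a-v2", 5L)));
        apply(row, Map.of("b", new ColumnUpdate("b-v1", 3L)));
        System.out.println(row.keySet());
    }
}
```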

...

  1. Cleaning - No problem, since it only requires a past state of the table
  2. Compaction
    1. Scheduling - A phantom base file is created based on the scheduled compaction instant time. If multiple writers start at the same time as the scheduling, inserts and updates get written to older log files that may not get picked up by the compaction process, resulting in data loss.
    2. Execution - No problem if the scheduling is taken care of
  3. Clustering
    1. Execution - In its current state, clustering cannot execute in parallel with concurrent updates. If clustering is scheduled on some file groups and updates come along for those file groups, the writes will fail until clustering finishes. Once implemented, this will NOT be a problem, just like compaction.
    2. Scheduling - The scheduling of clustering while concurrent writes are happening can present some race conditions leading to data loss.
  4. Restore
    1. Scheduling - N/A
    2. Execution - With out-of-order updates being applied, we cannot know for sure what the correct point in time is to restore the data to without compromising the "correctness" of the table. Hence, we leave restore as an advanced option for users who can either a) understand the trade-off that a restore may not result in consistent, correct data, or b) manage the different update processes themselves so they know a good point in time to restore to without compromising correctness.


To solve the issue of scheduling table services, we have the following options:


Option 1 : Brief, table-level locks for all writers and table services - Writers take a table-level lock for the duration of creating the .commit file under the .hoodie folder. Note, this is NOT optimistic concurrency. The lock is just meant to "seal" the state of the table when committing data to it. Similarly, scheduling a table service takes a lock to scan and create the plan.
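
A minimal sketch of the idea behind Option 1, assuming a single-process writer for simplicity: the lock guards only the brief metadata step, never the (much longer) data write. In a real multi-writer deployment the lock would have to come from an external lock provider (e.g. ZooKeeper); the ReentrantLock and writeCommitFile helper below are hypothetical stand-ins to keep the sketch self-contained:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of Option 1: a short, table-level lock held only while the .commit
// file is written (or while a table service plan is scanned and persisted).
// This is not optimistic concurrency control; the lock only "seals" the table
// state at commit time.
public class BriefTableLockExample {

    private static final ReentrantLock TABLE_LOCK = new ReentrantLock();

    static void commit(String instantTime) throws InterruptedException {
        // Block briefly; the critical section covers only the metadata write.
        if (!TABLE_LOCK.tryLock(30, TimeUnit.SECONDS)) {
            throw new IllegalStateException("Could not acquire table lock for commit " + instantTime);
        }
        try {
            writeCommitFile(instantTime);   // create <instantTime>.commit under .hoodie/
        } finally {
            TABLE_LOCK.unlock();
        }
    }

    private static void writeCommitFile(String instantTime) {
        // Placeholder for persisting the commit metadata to the timeline.
    }
}
```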

...

We should be able to get the past state of the table up to the latest point in the timeline without any inflights. Basically, we want to avoid any ongoing write to a log file, so we take the latest completed state of the table. At this point, we know which base file + log files to compact. The problem is how to ensure that log files whose base commit is the previous compaction commit, and to which future writes are still happening, get picked up by the next compaction. Here, we simply add some logic to the HoodieTableFileSystemView when building the FileSlices: in the collection of all log files, when we slice it into the group belonging to the Cn compaction commit and the group belonging to the Cn-1 compaction commit, we order the log files by commit timestamp and group them together. This way, the ongoing commits to log files whose base commit is the previous compaction commit timestamp will be "fast forwarded" to the FileSlice whose base file carries the new compaction commit.
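
The grouping logic could look roughly like the following sketch, assuming each log file knows the base (compaction) commit it was written against and its own commit instant; LogFileMeta is a hypothetical stand-in for the metadata the HoodieTableFileSystemView already has while building FileSlices:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the "fast forward" grouping described above. A log file belongs to
// the slice of the new compaction commit Cn if it either was written against Cn,
// or was written against Cn-1 but committed after Cn was scheduled.
public class FastForwardGrouping {

    static class LogFileMeta {
        final String baseCommitTime;   // compaction commit the writer saw (Cn-1 or Cn)
        final String commitTime;       // instant time of the delta commit itself
        LogFileMeta(String baseCommitTime, String commitTime) {
            this.baseCommitTime = baseCommitTime;
            this.commitTime = commitTime;
        }
    }

    static List<LogFileMeta> sliceForNewBaseFile(List<LogFileMeta> allLogFiles,
                                                 String cnMinus1, String cn) {
        List<LogFileMeta> slice = new ArrayList<>();
        for (LogFileMeta log : allLogFiles) {
            boolean writtenAgainstCn = log.baseCommitTime.equals(cn);
            // "Fast forwarded": written against the previous compaction commit,
            // but committed after the new compaction instant was scheduled.
            // Hudi instant times are timestamp strings, so they sort lexicographically.
            boolean fastForwarded = log.baseCommitTime.equals(cnMinus1)
                && log.commitTime.compareTo(cn) > 0;
            if (writtenAgainstCn || fastForwarded) {
                slice.add(log);
            }
        }
        return slice;
    }
}
```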


Versioning (Cleaning)


With the log-based model and concurrent updates happening on the table at different instants of time, how many and which versions to keep becomes a challenge. This is work in progress. We cannot remove or clean older versions of data until we know it is "safe" to do so. This safety depends on whether the latest, correct update version has been recorded in the log file.

Additionally, Hudi already supports "cleaning" for MergeOnRead tables only at the FileSlice level; any new updates going to log files cannot be cleaned individually, but only at the file slice level. With CRDTs, the same will apply.
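
A hedged sketch of what slice-level retention might look like, under the assumption of a simple "keep the newest N slices per file group" rule (FileSliceInfo and the rule itself are illustrative, not Hudi's actual cleaning policy):

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Sketch of slice-level cleaning for MergeOnRead: a log file is never removed
// on its own, only together with the base file of a slice that is no longer
// needed as a whole.
public class SliceLevelCleaning {

    static class FileSliceInfo {
        final String baseInstantTime;   // instant of the slice's base file
        FileSliceInfo(String baseInstantTime) {
            this.baseInstantTime = baseInstantTime;
        }
    }

    // Keep the newest `retained` slices of a file group; everything older is a
    // candidate for cleaning, but only once it is "safe", i.e. no reader and no
    // pending out-of-order update still needs that slice.
    static List<FileSliceInfo> cleanCandidates(List<FileSliceInfo> slicesOfFileGroup, int retained) {
        return slicesOfFileGroup.stream()
            .sorted(Comparator.comparing((FileSliceInfo s) -> s.baseInstantTime).reversed())
            .skip(retained)
            .collect(Collectors.toList());
    }
}
```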


Incremental Pulls


Incremental pull allows pulling the changes happening on a dataset. Without multiple writers, there is a monotonically increasing timestamp against which incremental pull can extract the changes since the last timestamp. With multiple writers, as pointed out above in the optimistic concurrency implementation, this is violated. Another important thing to note is that incremental pull currently provides the latest snapshot of a record by merging all intermediate changes that happened from time t1 to t2.

For CRDT-based concurrency management, with updates applied out of order, we want to generate the incremental pull snapshot by applying the same CRDT rules, so that no updates are lost for downstream consumers and they always see a consistent state of the data.
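
A sketch of what such a CRDT-aware incremental pull could look like, assuming each delta carries a source-supplied ordering field; DeltaRecord and the folding rule are hypothetical, not existing Hudi APIs:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of a CRDT-aware incremental pull: all delta records for a key between
// t1 and t2 are folded with the same per-column ordering rule used by the
// realtime view and by compaction, so downstream consumers see one consistent
// snapshot per key regardless of the order in which the updates were committed.
public class CrdtIncrementalPull {

    static class DeltaRecord {
        final String recordKey;
        final String column;
        final Object value;
        final long orderingValue;   // ordering field from the source, not commit time
        DeltaRecord(String recordKey, String column, Object value, long orderingValue) {
            this.recordKey = recordKey;
            this.column = column;
            this.value = value;
            this.orderingValue = orderingValue;
        }
    }

    // Fold all deltas committed between t1 and t2 into one merged row per key.
    static Map<String, Map<String, DeltaRecord>> snapshot(List<DeltaRecord> deltasBetweenT1AndT2) {
        Map<String, Map<String, DeltaRecord>> byKey = new HashMap<>();
        for (DeltaRecord delta : deltasBetweenT1AndT2) {
            byKey.computeIfAbsent(delta.recordKey, k -> new HashMap<>())
                 .merge(delta.column, delta,
                     (current, incoming) ->
                         incoming.orderingValue > current.orderingValue ? incoming : current);
        }
        return byKey;
    }
}
```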

This would mean that the Spark, Presto and Hive MOR incremental pull implementations have to apply the same semantics of ordering updates as the RealtimeView and compaction.


Overhead of MergeOnRead with CRDTs

COW vs MOR


[Image: COW vs MOR comparison]


Let's dive deeper into the merge cost of MergeOnRead tables. 


Current Costs

Incremental Pulls