...

Hudi currently supports a single writer model and uses MVCC for concurrently updating a table via table services such as clustering, compaction and cleaning, thus allowing them to run asynchronously without blocking writers. Using MVCC, Hudi is able to provide Snapshot Isolation guarantees. Let's take a quick look at the different isolation levels and how they rank with respect to vulnerabilities such as dirty reads, non-repeatable reads and phantom reads.


[Image: isolation levels and their exposure to dirty reads, non-repeatable reads and phantom reads]

With Snapshot Isolation, readers get repeatable reads (the same result if queried multiple times during the same write/transaction) since each reader works on a "snapshot" of the database, that is, the specific version of the latest committed data seen at the start of that transaction. This is possible because MVCC keeps multiple versions of data. However, this is only true when there are no write-write conflicts, also known as write skews. During a concurrent update of data, each writer works with its own "snapshot/version" of the data, so snapshot isolation cannot guarantee that no updates are lost if the writers independently modify the same datum (record or file). Snapshot Isolation is hence vulnerable to update conflicts unless a conflict resolution strategy is applied. Update conflicts are generally solved in different ways as follows (we will not discuss READ_UNCOMMITTED since that violates the basic requirements of Hudi anyway):

  1. READ_COMMITTED isolation level uses pessimistic locks on the data being changed. 
    1. Pros
      1. Every writer always reads the latest committed data
    2. Cons
      1. Starvation since writers contend for pessimistic locks based on when the lock is released
      2. Non-repeatable reads (the same record or file read multiple times during the same write/transaction may return different results) → this is a lower level of guarantee than what Hudi already provides with snapshot isolation
      3. Requires locks at record or file level to allow concurrent transactions to complete in parallel
  2. SERIALIZABLE isolation uses techniques such as two-phase locking (2PL) or timestamp ordering to achieve serializability and avoid conflicts. Such techniques are fairly complex and are probably non-scalable overkill for a system like Hudi. The reasons for this are out of the scope of this RFC, but can be understood by researching why 2PL is needed and how it is implemented.
    1. Pros
      1. Highest form of isolation; provides many more guarantees beyond handling update conflicts (such as no phantom reads, conflict and view serializability, etc.)
    2. Cons
      1. Complex to implement and is generally something that is required for OLTP type systems 
      2. Out of scope of this RFC 
  3. SERIALIZABLE_SNAPSHOT_ISOLATION is a variant of serializable isolation without some of the other guarantees provided by serializable isolation level but still providing methodologies to handle update conflicts. This isolation level uses MVCC based snapshot isolation along with a way to resolve update conflicts. There could be multiple ways to achieve this. 
    1. PESSIMISTIC LOCKING
      1. Acquire an exclusive_lock (reads & writes are blocked) for the entirety of the transaction. For Hudi this could be table level, partition level or file level, each with different trade-offs. Additionally, for a system like Hudi, where a writer modifies a batch of data and has no prior understanding of what that batch is, this would mean table level locks for the entire duration of the write.
        1. Pros
          1. Achieves our goal of handling update conflicts
        2. Cons
          1. May not scale depending on level of locks chosen
          2. Can eventually make all transactions serial, since holding table level locks for the entirety of each transaction turns concurrent writes into a serial operation anyway.
    2. OPTIMISTIC LOCKING
      1. Allow two concurrent transactions to proceed by reading the latest committed snapshot at the beginning of their transaction. Before committing a transaction, acquire a table level exclusive_lock (reads & writes are blocked) and perform conflict resolution to check whether there are conflicting updates. If there are, allow the first transaction (possibly determined using timestamp ordering) to commit and abort the second transaction (which has to be retried).
        1. Pros
          1. Achieves our goal of handling update conflicts
          2. Scales well since only a table level lock is needed
          3. Lock is acquired only for a small duration of time so allows transactions to run and commit in parallel if no conflicts
          4. Works well for cases where there is little to no contention
        2. Cons
          1. Requires an external server for locks 
          2. Performs poorly when there is heavy contention, since aborted transactions have to be retried
    3. LOCK FREE
      1. Since MVCC keeps a version of every modified datum, two concurrent transactions could be allowed to proceed and commit concurrently. With all versions of data present, a conflict resolution and reconciliation mechanism can be applied later by the reader to get the latest, correct state of the datum.
        1. Pros
          1. Achieves our goal of handling update conflicts
          2. No locks are required
          3. Scales well
        2. Cons
          1. Reader has to do the additional work of reconciliation, which may require additional metadata and can therefore increase disk and compute cost.
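
The optimistic locking option above can be sketched as follows. This is a minimal, in-memory illustration and not Hudi's implementation: the `Table`, `Transaction` and `ConflictError` names are hypothetical, a `threading.Lock` stands in for the external lock server, and a conflict is assumed to mean "two transactions touched the same file id".

```python
import threading
from dataclasses import dataclass, field


class ConflictError(Exception):
    """Raised when a transaction overlaps with a commit made after its snapshot."""


@dataclass
class Table:
    # Hypothetical toy table: committed (name, file_ids) entries plus one table level lock.
    commits: list = field(default_factory=list)
    lock: threading.Lock = field(default_factory=threading.Lock)


class Transaction:
    def __init__(self, table, name):
        self.table = table
        self.name = name
        # Snapshot: the number of commits visible when the transaction began.
        self.snapshot_size = len(table.commits)
        self.touched = set()

    def write(self, file_id):
        # Writes proceed without holding any lock.
        self.touched.add(file_id)

    def commit(self):
        # The lock is held only for the short validate-and-publish step.
        with self.table.lock:
            for _, files in self.table.commits[self.snapshot_size:]:
                overlap = files & self.touched
                if overlap:
                    raise ConflictError(f"{self.name} conflicts on {sorted(overlap)}")
            self.table.commits.append((self.name, frozenset(self.touched)))


table = Table()
t1, t2, t3 = (Transaction(table, n) for n in ("t1", "t2", "t3"))
t1.write("file-1")
t2.write("file-1")
t3.write("file-2")
t1.commit()          # first committer wins
t3.commit()          # touches a different file, so it also commits
try:
    t2.commit()      # overlaps with t1, so it is aborted and must be retried
except ConflictError as err:
    print("aborted:", err)
```

In this sketch the first transaction to reach the commit step wins; the timestamp ordering mentioned above would be an alternative way to pick the winner.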

...

Additionally, writer jobs can hang for several reasons, such as long GC pauses or code bugs. In this situation, due to the lack of heartbeats, a cleaner may end up considering this a failed write and clean up its data and metadata. Hence, we also need to implement similar checks on the writer before committing, to ensure we abort such writers.
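
A minimal sketch of such a pre-commit check, assuming a heartbeat is simply a timestamp that the writer refreshes periodically. The `Writer` class, its `timeout_s` parameter and the injectable `clock` are hypothetical names used only for illustration; they are not Hudi APIs.

```python
import time


class HeartbeatExpired(Exception):
    """Raised when a writer tries to commit after its heartbeat has lapsed."""


class Writer:
    def __init__(self, timeout_s, clock=time.monotonic):
        self.timeout_s = timeout_s
        self.clock = clock              # injectable so the check can be exercised without sleeping
        self.last_heartbeat = clock()

    def heartbeat(self):
        # Called periodically while the write is making progress.
        self.last_heartbeat = self.clock()

    def is_expired(self):
        return self.clock() - self.last_heartbeat > self.timeout_s

    def commit(self):
        # A cleaner may already have treated this write as failed and removed
        # its data and metadata, so the writer aborts instead of committing.
        if self.is_expired():
            raise HeartbeatExpired("heartbeat lapsed; aborting commit")
        return "committed"
```

The cleaner would apply the same `is_expired` style test from its side before rolling back a write, so that both parties agree on when a writer counts as failed.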



Ingestion Retry

...

Users cannot run bootstrap in parallel. They need to finish the bootstrap of a non-Hudi table into Hudi first, and then start single or multiple writers. Rollbacks of failed bootstraps are also inline at the moment. To entertain the option proposed above, the inline rollback from bootstraps will also need to be removed and follow a clean up model similar to the one for regular writers.

Updating

...

Metadata Table

As of RFC-15 (HUDI-1292) there will only be a single writer to the consolidated metadata. We need to ensure that only a single writer updates the metadata table at any point in time. Applying the same changes twice is OK. The reconciliation of the metadata will be done by the metadata reader, which will see unique instant times and the corresponding metadata.

...

In this RFC, we propose to implement Option 1.



Dependency/Follow up

...

By default, the scheduling of operations will be enabled for any job for backwards compatibility for current users. The users need to ensure they turn on optimistic_concurrency_control for all writers.

Future Work (Lock Free-ish Concurrency Control)


As discussed before, there are 3 ways to achieve serializable snapshot isolation using multiple writers. Here, we chose to implement a lock based optimistic concurrency control mechanism that allows for concurrent writers to update the state of the table at the same time, while handling conflict resolution just before committing the transaction. Although we explained above the many technical pros and cons of using this approach, we have not discussed some of the business driven pros and cons of using optimistic locking. 

...

All of this is applicable to us when thinking of the problem of strong and eventual consistency as analogous to update conflicts from multiple writers. To achieve LOCK FREE snapshot isolation, we need to have different updates applied to the same datum without any synchronization or consensus (here, LOCKS), but finally get the correct, consistent view of the datum.

Strong Eventual Consistency

Another option to achieve eventual consistency is to explore Conflict-Free Replicated Data Types (CRDTs). In a CRDT model, data can be updated independently and concurrently without the need for any co-ordination/consensus, but with a predefined set of rules that are applied during conflict resolution at a later stage.

...

Since these methodologies defer the merging to the reader (realtime view), all of the following documentation assumes the hoodie table type to be MergeOnRead.

CRDTs

Hudi supports partial row updates, which means 2 different writes/updates could be updating 2 different columns in the same row. Since input data to Hudi generally comes from a different source such as Kafka, which may receive out of order writes, Hudi by itself cannot assign ordered timestamps to these updates. Hence, LWW based on timestamp does not directly work here.

...

To summarize, using a CRDT, we can allow multiple writers to commit concurrently without the need for a lock to perform conflict resolution while still providing ACID semantics. When readers construct the latest state of the table, a CRDT ensures the 2 different snapshots of the data get applied and a deterministic, serial order can be maintained on update conflicts.

Since CRDT merges have to be commutative, the order in which the "merge" is applied does not affect the result after applying the rules.
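
As a generic illustration of this property (not Hudi's merge logic), the sketch below merges two partial row updates column by column. It assumes each column value carries an ordering value supplied with the source data, which is a hypothetical stand-in for whatever rule a real payload would define; `merge_rows` is likewise a made-up name.

```python
def merge_rows(a, b):
    """Merge two partial rows, each mapping column -> (ordering_value, value).

    Taking the per-column max of (ordering_value, value) tuples is commutative,
    associative and idempotent, which is what makes this merge order-independent.
    Values in the same column are assumed to be mutually comparable so that ties
    on ordering_value break deterministically.
    """
    merged = dict(a)
    for column, candidate in b.items():
        current = merged.get(column)
        merged[column] = candidate if current is None else max(current, candidate)
    return merged


# Two writers update different (and one overlapping) columns of the same row.
writer_1 = {"city": (3, "Paris")}
writer_2 = {"age": (1, 41), "city": (2, "Rome")}

# The result is the same regardless of which update the reader applies first.
assert merge_rows(writer_1, writer_2) == merge_rows(writer_2, writer_1)
```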


NOTE : One of the problems of eventual consistency with CRDTs arises with replication across multiple nodes making it not ACID compliant. Since Hudi relies on underlying DFS based replication, this concern does not apply. 


Total Ordering and Overwrite Latest Payload 

Although CRDTs possess the above qualities, without a time based ordering, data can be committed in any order and readers can view the data in any order. This violates Hudi's current single-writer serialized snapshot isolation, in which writes always happen in monotonically increasing timestamp order and that is how readers view the data. Additionally, Hudi already supports some out of the box methodologies to merge existing data with newly written data, such as OverwriteLatestPayload. Since these cannot rely on the internal semantics of the data (as that is dependent on user payloads) and OverwriteLatestPayload is NOT a CRDT, we need a mechanism / rule (a CRDT of sorts?) to ensure these merge methodologies generate the same correct, expected results during concurrent writes as they would during serial writes.

Here, Hudi's commit timestamp comes to the rescue. Hudi has a _hoodie_commit_time associated with every record. A default implementation of a CRDT would be to provide an ordered set of updates to a datum in order of commit times. Applying the merge function in this order then yields a deterministic result, which is what we need from a CRDT. For Hudi's default implementation, the merge function available is the overwrite_latest_payload, which simply takes the latest record.


NOTE: Supporting this kind of ordered merge might require first sorting the LogBlocks in increasing order of START_COMMIT_TIME, since updates can happen out of order, and then applying the OverwriteLatestPayload.
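
A minimal sketch of that ordered merge, assuming a log block can be modelled as a dict with a `start_commit_time` string and a `records` mapping from record key to value. These field names and the `merge_log_blocks` helper are hypothetical; the point is only that sorting by commit time before applying an overwrite-with-latest rule makes the result independent of the order in which blocks were written.

```python
def merge_log_blocks(base_records, log_blocks):
    """Apply overwrite-with-latest semantics in increasing start commit time order."""
    state = dict(base_records)
    # Fixed-width timestamp strings sort correctly as plain strings.
    for block in sorted(log_blocks, key=lambda b: b["start_commit_time"]):
        state.update(block["records"])   # a later commit overwrites an earlier one
    return state


base = {"key-1": "v0", "key-2": "v0"}
blocks_as_written = [
    {"start_commit_time": "20210102000000", "records": {"key-1": "v2"}},
    {"start_commit_time": "20210101000000", "records": {"key-1": "v1", "key-2": "v1"}},
]

merged = merge_log_blocks(base, blocks_as_written)
# The block with the later commit time wins for key-1 even though it was written first.
assert merged == {"key-1": "v2", "key-2": "v1"}
```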


START_COMMIT_TIMESTAMP vs END_COMMIT_TIMESTAMP

Let's take the example of these overlapping transactions as below 


[Image: overlapping transactions T1, T2, T3 and T4]


Consider 4 transactions T1, T2, T3, T4 starting and ending at different instants of time. Ideally, as per general transactions in a database, one would assume the following serial order for these transactions if they were to be done in a serial fashion.


[Image: serial order of the transactions by start time]

This works fine for general purpose database transactions, where the input (updates to rows) is already prepared before the transaction starts. In Hudi, this is not true. Due to the LAZY evaluation of data (Dataframes, Datasets, etc.), the time at which the actual update is prepared can differ from the transaction start time. Hence, ordering transactions / updates on the start_commit_timestamp may not yield correct results.

...

An alternative option is to serially order them based on the transaction's end time (end_commit_timestamp). The serial order would then look as follows:


[Image: serial order of the transactions by end time]


As you can see, T2 jumps from being the second transaction to the last one.
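
The difference between the two orderings can be reproduced with a few lines of code. The start and end times below are hypothetical values chosen only so that the transactions overlap in the way described above; they are not taken from the figures, and `serial_order` is a made-up helper.

```python
# (start_commit_timestamp, end_commit_timestamp) per transaction; values are illustrative.
transactions = {
    "T1": (1, 4),
    "T2": (2, 10),   # starts early but finishes last
    "T3": (3, 6),
    "T4": (5, 8),
}


def serial_order(txns, by):
    """Return transaction names ordered by 'start' or 'end' timestamp."""
    index = 0 if by == "start" else 1
    return sorted(txns, key=lambda name: txns[name][index])


by_start = serial_order(transactions, "start")
by_end = serial_order(transactions, "end")

assert by_start == ["T1", "T2", "T3", "T4"]
assert by_end == ["T1", "T3", "T4", "T2"]   # T2 moves from second to last
```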

...

Now it's hard to determine which order is the correct one to choose, start or end commit timestamp, since we do not know when the input data is actually materialized. Assuming that materialization of data is based on some kind of checkpoint, and that the latest snapshot read for a particular transaction is based on the transaction's start timestamp (since that can be decided at the beginning in the AbstractHoodieWriteClient), we choose to order transactions based on START_COMMIT_TIMESTAMP.

Note that implementing this requires changes in Hudi to ensure that the latest snapshot of the table is sealed at the start of the transaction and does not change during the course of the transaction, all the way to the end. Without this change, the above assumptions are violated, which will leave the table in an inconsistent state. More specifically, this requires sealing the ActiveTimeline and not reloading it for the entirety of the transaction.


Table Services


Scheduling & Executing

Although we can solve the problems of update conflicts using CRDTs, there still exists a problem of how to co-ordinate other table services with these concurrent writes. Services such as compaction and clustering require a single writer or a table level lock to get the current state of the system before such a service is scheduled. In its current implementation, no concurrent writer can be started when scheduling these services. Let's take a look at the underlying assumptions and limitations 

...

We should be able to get the past state of the table up to the point in the timeline without any inflights. Basically, we want to avoid any ongoing write to a log file, so we want to get the latest completed state of the table. At this point, we know which base file and log files to compact. The problem is how to ensure that log files whose base commit is the previous compaction commit, and to which future writes are still happening, get picked up by the next compaction. Here, we simply add some logic to the HoodieTableFileSystemView when building the FileSlices: when the collection of all log files is sliced into groups belonging to the Cn compaction commit and groups belonging to the Cn-1 compaction commit, we can order them by commit timestamp and simply group them together. This way, the ongoing commits to log files whose base commit is the previous compaction commit timestamp will be "fast forwarded" to the FileSlice whose base file has the new compaction commit.
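
One possible reading of this grouping rule is sketched below. It is an interpretation for illustration only, not the HoodieTableFileSystemView logic: `build_file_slices` is a hypothetical helper, a file group is reduced to a list of base file (compaction) commit times plus `(commit_time, name)` pairs for its log files, and each log file is attached to the latest base file whose commit time is not greater than the log file's own commit time.

```python
from bisect import bisect_right


def build_file_slices(base_commit_times, log_files):
    """Group log files into slices keyed by base file commit time.

    A log file written against the Cn-1 base file, but committed after the Cn
    compaction, lands in the Cn slice, i.e. it is "fast forwarded".
    """
    bases = sorted(base_commit_times)
    slices = {base: [] for base in bases}
    for commit_time, name in sorted(log_files):
        idx = bisect_right(bases, commit_time) - 1
        if idx >= 0:   # log files older than every base file are ignored in this sketch
            slices[bases[idx]].append(name)
    return slices


# "100" is the Cn-1 compaction commit and "200" is the Cn compaction commit.
slices = build_file_slices(
    ["100", "200"],
    [("150", "log-a"), ("250", "log-b")],   # log-b was still being written against base "100"
)
assert slices == {"100": ["log-a"], "200": ["log-b"]}
```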

Versioning (Cleaning)


With a log based model and concurrent updates happening on the table at different instants of time, deciding how many and which versions to keep becomes a challenge. We cannot remove or clean older versions of data until we know it is "safe" to do so. This safety depends on whether the latest, correct update version has been recorded in the log file.

Additionally, Hudi currently supports "cleaning" for MergeOnRead tables only at the FileSlice level: new updates going to log files cannot be cleaned individually, but only together with their file slice. With CRDTs, the same will apply.


Incremental Pulls


Incremental pull allows consumers to pull the changes happening on a dataset. Without multiple writers, there is a monotonically increasing timestamp against which incremental pull can extract the changes since the last timestamp. With multiple writers, as pointed out above in the optimistic concurrency implementation, this is violated. Another important thing to note is that incremental pull currently provides the latest snapshot of a record by merging all intermediate changes that happened from time t1 to t2.

For CRDT based concurrency management, with updates applied out of order, we want to generate the incremental pull snapshot by applying the same CRDT rules, so that no updates are lost for downstream consumers and they always see a consistent state of the data. NOTE: the OverwriteWithLatestRecord implementation is not a CRDT, and special handling is needed for it as pointed out below.

This would mean that Spark, Presto and Hive MOR incremental pull have to implement the same semantics of ordering updates like in a RealtimeView and in compaction. 


NOTE that incremental pull based on END_COMMIT_TIME is still required for COPY_ON_WRITE tables using optimistic concurrency control management.


Overhead of MergeOnRead (context for advanced readers)

COW vs MOR


[Image: COW vs MOR]


Let's dive deeper into the merge cost of MergeOnRead tables.

Current Costs


COW read path

[Image: COW read path]

  • Vectorized retrieval of latest value of columns
  • Predicate pushdown to parquet reader
  • No need to perform "merge" operation
  • No materialization of columns to rows
  • ONE cost for serialization and deserialization
    • Parquet to InternalRow for Spark
    • Parquet to ColumnRow for Presto
    • Parquet to ArrayWritable for Hive


MOR read path


[Image: MOR read path]

  • NO vectorized retrieval of the latest value of columns, which is slower and results in more CPU time for queries
  • Need to perform a "merge" operation, which requires memory and CPU for serde, hash lookups, etc.
  • Materialization of columns to rows (the whole row is required), which increases memory cost and the amount of data read from disk
  • THREE costs for serialization and deserialization
    • Parquet to GenericRecord
    • LogBlock Avro bytes to GenericRecord
    • GenericRecord to InternalRow, ColumnRow or ArrayWritable for the individual query engines
  • No Predicate Pushdown even with parquet base files since reading the full row is required to merge contents