...

By default, the scheduling of operations remains enabled for every job, to stay backwards compatible for current users. Users need to ensure they turn on optimistic_concurrency_control for all writers.

Future Work (Lock Free-ish Concurrency Control)


As discussed before, there are 3 ways to achieve serializable snapshot isolation using multiple writers. Here, we chose to implement a lock-based optimistic concurrency control mechanism that allows concurrent writers to update the state of the table at the same time, while handling conflict resolution just before committing the transaction. Although we explained above the many technical pros and cons of this approach, we have not discussed some of the business-driven pros and cons of using optimistic locking.

...

All of this applies to us if we treat the problem of strong versus eventual consistency as analogous to update conflicts from multiple writers. To achieve LOCK FREE snapshot isolation, different updates must be applied to the same datum without any synchronization or consensus (here, LOCKS), while still arriving at the correct, consistent view of the datum.

Strong Eventual Consistency

Another option to achieve eventual consistency is to explore Conflict-Free Replicated Data Types (CRDTs). In a CRDT model, data can be updated independently and concurrently without any coordination or consensus, using a predefined set of rules that are applied during conflict resolution at a later stage.

...

Since these methodologies defer the merging to the reader (realtime view), all of the following documentation assumes the hoodie table type to be MergeOnRead.

CRDTs

Hudi supports partial row updates, which means 2 different writes/updates could be updating 2 different columns in the same row. Since input data to Hudi generally comes from a different source, such as Kafka, which may receive out-of-order writes, Hudi by itself cannot assign ordered timestamps to these updates. Hence, LWW (last-writer-wins) based on timestamp does not directly work here.

...

To summarize, using a CRDT, we can allow multiple writers to commit concurrently without needing a lock to perform conflict resolution, while still providing ACID semantics. When readers construct the latest state of the table, a CRDT ensures that the 2 different snapshots of the data get applied and that a deterministic, serial order is maintained on update conflicts.

Since CRDTs have to be commutative, the order of "merge" does not matter: applying the rules generates the correct result either way.
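
The commutativity requirement can be illustrated with a minimal Python sketch. It assumes a hypothetical per-column last-writer-wins row in which every column value carries a version tag, here an assumed (commit_time, writer_id) tuple that is unique per update. None of these names come from Hudi; they exist only for illustration.

```python
from typing import Dict, Tuple

# Hypothetical representation (not a Hudi type): column -> (version, value).
# The version is an assumed (commit_time, writer_id) tuple, unique per update,
# so two different values never share the same version.
Version = Tuple[str, str]
Row = Dict[str, Tuple[Version, object]]


def merge_rows(left: Row, right: Row) -> Row:
    """Merge two partial rows column by column, keeping the higher version."""
    merged: Row = dict(left)
    for column, (version, value) in right.items():
        if column not in merged or version > merged[column][0]:
            merged[column] = (version, value)
    return merged


# Two writers update different columns, plus one overlapping column ("age").
update_a: Row = {
    "city": (("20210101", "w1"), "Paris"),
    "age": (("20210101", "w1"), 30),
}
update_b: Row = {
    "age": (("20210102", "w2"), 31),
    "email": (("20210102", "w2"), "a@b.c"),
}

ab = merge_rows(update_a, update_b)  # apply A, then B
ba = merge_rows(update_b, update_a)  # apply B, then A
```

Both merge orders yield the same row, and merging a row with itself changes nothing, which is what lets readers apply updates in whatever order they encounter them.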


NOTE: One of the problems of eventual consistency with CRDTs arises with replication across multiple nodes, which makes it not ACID compliant. Since Hudi relies on underlying DFS-based replication, this concern does not apply.


Total Ordering and Overwrite Latest Payload 

Although CRDTs possess the above qualities, without a time-based ordering data can be committed in any order and readers can view it in any order. This violates Hudi's current single-writer serialized snapshot isolation, in which writes always happen in monotonically increasing timestamp order and readers view the data in that order. Additionally, Hudi already supports some out-of-the-box methodologies for merging existing data with newly written data, such as OverwriteLatestPayload. Since these cannot rely on the internal semantics of the data (which depend on user payloads) and OverwriteLatestPayload is NOT a CRDT, we need a mechanism or rule (a CRDT of sorts) to ensure these merge methodologies generate the same correct, expected results during concurrent writes as they would during serial writes.

Here, Hudi's commit timestamp comes to the rescue. Hudi has a _hoodie_commit_time associated with every record. A default implementation of a CRDT would provide an ordered set of updates to a datum, in order of commit time. Applying the merge function in this order creates a CRDT. For Hudi's default implementation, the available merge function is the overwrite_latest_payload, which simply takes the latest record.


NOTE: Supporting this kind of ordered merge might require first sorting the LogBlocks in increasing order of START_COMMIT_TIME, since updates can happen out of order, and then applying the OverwriteLatestPayload.
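
As a rough illustration of the ordered merge described in this note, the following Python sketch sorts blocks by start commit time before applying an overwrite-latest rule. The LogBlock class, its fields, and both functions are hypothetical stand-ins, not Hudi classes. The sketch also assumes that start commit times are unique, fixed-width strings, so that string order matches time order.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class LogBlock:
    # Hypothetical stand-in for a log block; field names are assumptions.
    start_commit_time: str      # fixed-width, e.g. "20210105093000"
    records: Dict[str, dict]    # record key -> full row payload


def overwrite_latest(current: Optional[dict], incoming: dict) -> dict:
    """Overwrite-latest rule: the incoming (later) record always wins."""
    return incoming


def merge_log_blocks(blocks: List[LogBlock]) -> Dict[str, dict]:
    """Apply blocks in increasing start commit time, not in arrival order."""
    merged: Dict[str, dict] = {}
    for block in sorted(blocks, key=lambda b: b.start_commit_time):
        for key, row in block.records.items():
            merged[key] = overwrite_latest(merged.get(key), row)
    return merged


older = LogBlock("20210105090000", {"k1": {"v": "old"}, "k2": {"v": "only"}})
newer = LogBlock("20210105093000", {"k1": {"v": "new"}})

# The newer block was physically written first; sorting restores commit order.
in_arrival_order = merge_log_blocks([newer, older])
in_commit_order = merge_log_blocks([older, newer])
```

Because the sort key is the commit time rather than the position in the log, the result does not depend on the order in which the blocks were appended.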


START_COMMIT_TIMESTAMP vs END_COMMIT_TIMESTAMP

...

Now it is hard to determine which order is the correct one to choose, start or end commit timestamp, since we do not know when the input data is actually materialized. Assuming materialization of data is based on some kind of checkpoint, and that the latest snapshot read for a particular transaction is based on the transaction's start timestamp (since that can be decided at the beginning in the AbstractHoodieWriteClient), we choose to order transactions based on START_COMMIT_TIMESTAMP.

Note that implementing this requires changes in Hudi to ensure that the latest snapshot of the table is sealed at the start of the transaction and does not change during the course of the transaction, all the way to the end. Without this change, the assumptions above are violated, which will leave the table in an inconsistent state. More specifically, this requires sealing the ActiveTimeline and not reloading it for the entirety of the transaction.
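
The sealing requirement can be sketched in Python as follows. WriteTransaction and its loader callback are hypothetical names chosen for illustration; they do not mirror the actual ActiveTimeline API. The sketch also assumes fixed-width string instants so that string comparison matches time order.

```python
from typing import Callable, Iterable, Tuple


class WriteTransaction:
    """Hypothetical writer transaction that seals its timeline view at start."""

    def __init__(
        self,
        start_commit_time: str,
        load_completed_instants: Callable[[], Iterable[str]],
    ) -> None:
        self.start_commit_time = start_commit_time
        # Load the timeline exactly once and freeze it into an immutable tuple.
        # Nothing in this class calls the loader again.
        self._sealed_instants: Tuple[str, ...] = tuple(
            sorted(t for t in load_completed_instants() if t < start_commit_time)
        )

    def visible_instants(self) -> Tuple[str, ...]:
        """Instants this transaction may read, fixed for its whole lifetime."""
        return self._sealed_instants


# A mutable list stands in for the shared, live timeline.
live_timeline = ["100", "200"]
txn = WriteTransaction("300", lambda: list(live_timeline))

# Another writer completes while txn is still running.
live_timeline.append("250")
seen_by_txn = txn.visible_instants()
```

The transaction keeps seeing only the instants that were complete when it started, even though the live timeline has since moved on.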


Table Services


Scheduling & Executing

Although we can solve the problem of update conflicts using CRDTs, there still remains the problem of how to coordinate other table services with these concurrent writes. Services such as compaction and clustering require a single writer or a table-level lock to get the current state of the system before the service is scheduled. In the current implementation, no concurrent writer can be started while these services are being scheduled. Let's take a look at the underlying assumptions and limitations.

...

We should be able to get the past state of the table up to the point on the timeline that has no inflights. Basically, we want to avoid any ongoing write to a log file, so we take the latest completed state of the table. At this point, we know which base_file + log files to compact. The problem is how to ensure that log files whose base commit is the previous compaction commit, and to which future writes are still happening, get picked up by the next compaction. Here, we simply add some logic to the HoodieTableFileSystemView while building the FileSlices: when the collection of all log files is sliced into groups belonging to the Cn compaction commit and groups belonging to the Cn-1 compaction commit, we order the log files by commit timestamp and group them accordingly. This way, ongoing commits to log files whose base commit is the previous compaction commit timestamp are "fast forwarded" to the FileSlice whose base file carries the new compaction commit.
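
One possible reading of this "fast forward" grouping is sketched below in Python. The function name and the plain string instants are assumptions made for illustration; the real logic would live in HoodieTableFileSystemView and operate on actual log files. The idea shown is that each log file is assigned to the latest compaction commit that is not later than the log file's own commit time, regardless of the base commit the writer originally targeted.

```python
from bisect import bisect_right
from typing import Dict, List


def build_file_slices(
    compaction_commits: List[str], log_commit_times: List[str]
) -> Dict[str, List[str]]:
    """Group log files under the latest compaction commit <= their commit time.

    Instants are assumed to be fixed-width strings, so string order equals
    time order.
    """
    bases = sorted(compaction_commits)
    slices: Dict[str, List[str]] = {base: [] for base in bases}
    for commit_time in sorted(log_commit_times):
        index = bisect_right(bases, commit_time) - 1
        if index < 0:
            raise ValueError(f"log commit {commit_time} predates every base file")
        slices[bases[index]].append(commit_time)
    return slices


# Compactions at C(n-1) = "100" and C(n) = "200".
# The writer of log commit "210" started against base "100", but because its
# commit time is after "200" it is fast forwarded into the "200" slice.
slices = build_file_slices(["100", "200"], ["110", "210", "150", "230"])
```

Grouping purely by timestamp means the next compaction over the "200" slice picks up the late write without the writer having to know that a compaction was scheduled.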

Versioning (Cleaning)


With a log-based model and concurrent updates happening on the table at different instants of time, deciding how many and which versions to keep becomes a challenge. We cannot remove or clean older versions of data until we know it is "safe" to do so. This safety depends on whether the latest, correct update version has been recorded in the log file.

Additionally, Hudi currently supports "cleaning" for MergeOnRead tables only at the FileSlice level: new updates going to log files cannot be cleaned individually, only as part of their file slice. With CRDTs, the same will apply.


Incremental Pulls


Incremental pull allows pulling the changes happening on a dataset. Without multiple writers, there is a monotonically increasing timestamp against which incremental pull can extract the changes since the last timestamp. With multiple writers, as pointed out above in the optimistic concurrency implementation, this is violated. Another important thing to note is that incremental pull currently provides the latest snapshot of a record by merging all intermediate changes that happened from time t1 to t2.
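
The single-writer behaviour described here, where a pull returns the latest snapshot of each record changed between t1 and t2, can be sketched in Python as below. The tuple layout, the function name, and the choice of a half-open window (t1, t2] are assumptions for illustration only, not Hudi's API.

```python
from typing import Dict, Iterable, Tuple

# Hypothetical change record: (record_key, commit_time, value).
Change = Tuple[str, str, str]


def incremental_pull(
    changes: Iterable[Change], t1: str, t2: str
) -> Dict[str, Tuple[str, str]]:
    """Return the latest (commit_time, value) per key with t1 < commit_time <= t2."""
    latest: Dict[str, Tuple[str, str]] = {}
    # Replaying in commit-time order lets later changes overwrite earlier ones.
    for key, commit_time, value in sorted(changes, key=lambda c: c[1]):
        if t1 < commit_time <= t2:
            latest[key] = (commit_time, value)
    return latest


changes = [
    ("k1", "100", "a"),
    ("k1", "120", "b"),
    ("k2", "130", "c"),
    ("k1", "140", "d"),
    ("k3", "160", "e"),
]
pulled = incremental_pull(changes, "100", "150")
```

Intermediate versions of k1 are collapsed into its latest value inside the window, and changes at or before t1 or after t2 are excluded.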

...

NOTE that incremental pull based on END_COMMIT_TIME is still required for COPY_ON_WRITE tables using optimistic concurrency control management.


Overhead of MergeOnRead

...

(context for advanced readers)

COW vs MOR



Let's dive deeper into the merge cost of MergeOnRead tables. 

...

  • Vectorized retrieval of latest value of columns
  • Predicate pushdown to parquet reader
  • No need to perform "merge" operation
  • No materialization of columns to rows
  • ONE cost for serialization and deserialization
    • Parquet to InternalRow for Spark
    • Parquet to ColumnRow for Presto
    • Parquet to ArrayWritable for Hive


MOR read path


  • NO vectorized retrieval of the latest value of columns, which is slower and results in more CPU time for queries
  • Need to perform the "merge" operation, which requires memory and CPU for serde, hash lookups, etc.
  • Materialization of columns to rows (the whole row is required), which increases memory and I/O cost through the amount of data read from disk
  • THREE costs for serialization and deserialization
    • Parquet to GenericRecord
    • LogBlock Avro bytes to GenericRecord
    • GenericRecord to InternalRow, ColumnRow, ArrayWritable for individual query engines
  • No Predicate Pushdown even with parquet base files since reading the full row is required to merge contents
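
To make the merge cost in the MOR read path more concrete, here is a minimal Python sketch of a hash-based merge between base file rows and log records. The function, the dict-based rows, and the "id" key field are hypothetical; the real path works on GenericRecord instances and engine-specific row types.

```python
from typing import Dict, List


def merge_on_read(
    base_rows: List[dict], log_records: List[dict], key_field: str = "id"
) -> List[dict]:
    """Merge full rows from a base file with updates and inserts from the log."""
    # Build a hash index over the log; later entries overwrite earlier ones.
    latest_updates: Dict[object, dict] = {}
    for record in log_records:
        latest_updates[record[key_field]] = record

    merged: List[dict] = []
    for row in base_rows:
        # One hash lookup per base row; the whole row must be materialized.
        merged.append(latest_updates.pop(row[key_field], row))

    # Whatever is left in the index had no base row, so it is a new insert.
    merged.extend(latest_updates.values())
    return merged


base = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}]
log = [{"id": 2, "v": "b2"}, {"id": 3, "v": "c"}, {"id": 2, "v": "b3"}]
result = merge_on_read(base, log)
```

Every base row has to be fully read and probed against the in-memory index, which is why column pruning and predicate pushdown cannot be applied before the merge.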

As we see above, since CRDTs have to be commutative, the current "in-memory" merge step should ensure the correct result is generated after applying the rules.

...