...

Hudi currently supports a single-writer model and uses MVCC to concurrently update a table via table services such as clustering, compaction and cleaning, thus allowing them to run asynchronously without blocking writers. Using MVCC, Hudi is able to provide snapshot isolation guarantees. Let's take a quick look at the different isolation levels, how they are ordered, and how they relate to vulnerabilities such as dirty reads, non-repeatable reads and phantom reads.


[Figure: isolation levels and their vulnerability to dirty reads, non-repeatable reads and phantom reads]

With snapshot isolation, readers get repeatable reads (the same result if queried multiple times during the same write/transaction) since each reader works on a "snapshot" of the database, i.e. the specific version of the latest committed data seen at the start of that transaction. This is possible because MVCC keeps multiple versions of the data. However, this only holds when there are no write-write conflicts (also known as write skews). During a concurrent update, since each writer is working with its own snapshot/version of the data, snapshot isolation cannot guarantee that no updates are lost if the writers independently modify the same datum (record or file). Snapshot isolation is therefore vulnerable to update conflicts unless a conflict resolution strategy is applied. Update conflicts are generally resolved in one of the following ways (we will not discuss READ_UNCOMMITTED since it violates Hudi's basic requirements anyway)

...

Additionally, writer jobs can hang for several reasons, such as long GC pauses or other code bugs. In this situation, due to the lack of heartbeats, the cleaner may end up treating this as a failed write and clean up its data and metadata. Hence, we also need to implement similar checks on the writer before committing, to ensure such writers are aborted.
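A minimal sketch of such a pre-commit guard is shown below. The class, method name and heartbeat file layout are hypothetical and only illustrate the idea; Hudi's actual heartbeat client and paths may differ.

```java
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical pre-commit guard; class, method and file layout are illustrative only.
public class PreCommitHeartbeatCheck {

  /**
   * Before committing, verify that this writer's own heartbeat is still considered live.
   * If it expired (e.g. after a long GC pause), the cleaner may already have treated the
   * write as failed and removed its data/metadata, so the writer must abort instead of committing.
   */
  public static void abortIfHeartbeatExpired(FileSystem fs, String basePath, String instantTime,
                                             long maxToleratedSilenceMs) throws IOException {
    // Assumed layout: one heartbeat file per inflight instant under the table's metadata folder.
    Path heartbeatFile = new Path(basePath + "/.hoodie/.heartbeat/" + instantTime);
    long lastBeatMs = fs.getFileStatus(heartbeatFile).getModificationTime();
    if (System.currentTimeMillis() - lastBeatMs > maxToleratedSilenceMs) {
      throw new IllegalStateException("Heartbeat for instant " + instantTime
          + " has expired; aborting the commit because the cleaner may have already "
          + "cleaned up this writer's data and metadata.");
    }
  }
}
```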



Ingestion Retry

...

In this RFC, we propose to implement Option 1.



Dependency/Follow up

...

Let's take the example of the overlapping transactions shown below.


[Figure: four overlapping transactions T1, T2, T3, T4 on a timeline]


Consider four transactions T1, T2, T3 and T4 starting and ending at different instants of time. Ideally, as with general database transactions, one would assume the following serial order for these transactions if they were to be executed serially.


[Figure: serial order of T1–T4 based on transaction start time]

This works fine for general-purpose database transactions, where the input (updates to rows) is already prepared before the transaction starts. In Hudi, this is not true. Due to the lazy evaluation of data (DataFrames, Datasets, etc.), the time at which the actual update is prepared can differ from the transaction start time. Hence, ordering transactions/updates on the start_commit_timestamp may not yield correct results.
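To make the lazy-evaluation point concrete, here is a small Spark (Java) sketch. The paths, column names and the idea of recording a start timestamp are illustrative only, and the Hudi write options a real job needs are omitted.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;

public class LazyUpdateExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("lazy-update").getOrCreate();

    // The transaction's start_commit_timestamp would be assigned roughly here.
    long startCommitTs = System.currentTimeMillis();

    // These transformations are lazy: nothing is read or computed yet.
    Dataset<Row> updates = spark.read().parquet("/tmp/source")
        .filter(col("needs_update").equalTo(true))
        .withColumn("value", col("value").multiply(2));

    // Only this action materializes the updates, possibly long after startCommitTs,
    // so the data actually written may reflect a much later point in time.
    updates.write().format("hudi").save("/tmp/hudi_table");
  }
}
```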

...

An alternate option is to order them serially based on the transaction end time (end_commit_timestamp); the serial order would then look as follows


[Figure: serial order of T1–T4 based on transaction end time (end_commit_timestamp)]


As you can see, T2 jumps from being the second transaction to the last one.
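A tiny Java sketch of this ordering is shown below. The timestamps are made up for illustration (the exact instants from the figure above are not reproduced here), but they show how a long-running T2 that started early ends up last when ordered by end_commit_timestamp.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class EndTimeOrdering {
  // Minimal transaction stub; the timestamps below are illustrative only.
  record Txn(String id, long startCommitTs, long endCommitTs) {}

  public static void main(String[] args) {
    List<Txn> txns = new ArrayList<>(List.of(
        new Txn("T1", 1, 5),
        new Txn("T2", 2, 20),   // starts second, but is the longest-running writer
        new Txn("T3", 3, 8),
        new Txn("T4", 4, 12)));

    // Serial order by end_commit_timestamp instead of start_commit_timestamp.
    txns.sort(Comparator.comparingLong(Txn::endCommitTs));
    txns.forEach(t -> System.out.println(t.id()));  // prints T1, T3, T4, T2 -> T2 is now last
  }
}
```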

...

Overhead of MergeOnRead (context for advanced readers)

COW vs MOR


[Figure: CopyOnWrite vs MergeOnRead]


Let's dive deeper into the merge cost of MergeOnRead tables. 


COW read path

[Figure: COW read path]

  • Vectorized retrieval of latest value of columns
  • Predicate pushdown to parquet reader
  • No need to perform "merge" operation
  • No materialization of columns to rows
  • ONE cost for serialization and deserialization
    • Parquet to InternalRow for Spark
    • Parquet to ColumnRow for Presto
    • Parquet to ArrayWritable for Hive
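For context, the COW read path above is just a regular columnar scan; a minimal Spark (Java) sketch is below. The table path and column names are hypothetical.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;

public class CowReadExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("cow-read").getOrCreate();

    // Snapshot query on a COW table scans only parquet base files, so the
    // vectorized parquet reader, predicate pushdown and column pruning all apply.
    Dataset<Row> df = spark.read().format("hudi").load("/tmp/hudi_cow_table");
    df.filter(col("event_type").equalTo("click"))  // filter pushed down to the parquet reader
      .select("uuid", "ts")                        // only the requested columns are decoded
      .show();
  }
}
```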


MOR read path


[Figure: MOR read path]

  • NO vectorized retrieval of the latest value of columns; this is slower and results in more CPU time for queries
  • Need to perform a "merge" operation, which requires memory and CPU for serde, hash lookups, etc.
  • Materialization of columns to rows (the whole row is required), which increases the memory and I/O cost of how much data is read from disk
  • THREE costs for serialization and deserialization
    • Parquet to GenericRecord
    • LogBlock Avro bytes to GenericRecord
    • GenericRecord to InternalRow, ColumnRow, ArrayWritable for the individual query engines
  • No predicate pushdown even with parquet base files, since reading the full row is required to merge contents
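To make the merge cost tangible, here is a conceptual Java sketch of what a MOR snapshot read has to do per file slice. It is not Hudi's actual merge handle: the latest-record-wins behaviour and the reliance on the _hoodie_record_key field are simplifying assumptions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;

// Conceptual sketch only -- not Hudi's actual merge handle -- of why a MOR
// snapshot read is more expensive than a COW read.
public class MorMergeSketch {

  public static List<GenericRecord> merge(List<GenericRecord> baseRecords,  // parquet -> GenericRecord (cost 1)
                                          List<GenericRecord> logRecords) { // avro log blocks -> GenericRecord (cost 2)
    // Every base row must be fully materialized and hashed by record key, which is
    // why vectorized reads and predicate pushdown cannot be applied to the base file.
    Map<Object, GenericRecord> merged = new LinkedHashMap<>();
    for (GenericRecord base : baseRecords) {
      merged.put(base.get("_hoodie_record_key"), base);
    }
    // Log records override base records with the same key (latest-wins here; real
    // payload classes may combine old and new values instead).
    for (GenericRecord update : logRecords) {
      merged.put(update.get("_hoodie_record_key"), update);
    }
    // Cost 3 happens after this: converting every merged GenericRecord into the
    // engine-specific row format (InternalRow, ColumnRow, ArrayWritable, ...).
    return new ArrayList<>(merged.values());
  }
}
```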

...