...

Note, implementing this requires changes in Hudi to ensure that the latest snapshot of the table is sealed at the start of the transaction and does not change for the entire duration of the transaction. Without this change, the assumptions above are violated, which can leave the table in an inconsistent state. More specifically, this requires sealing the ActiveTimeline and not reloading it for the entirety of the transaction.
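
As a rough illustration, here is a minimal sketch of the "seal the snapshot" idea, using a hypothetical SealedTransaction type rather than Hudi's actual timeline API: the completed instants are captured once when the transaction starts and the same immutable view is served until the transaction ends, with no reload in between.

import java.util.List;

// Hypothetical sketch only; the real change would seal the ActiveTimeline object
// itself rather than a plain list of instant times.
final class SealedTransaction {

  // Immutable view of the completed instants captured at transaction start.
  private final List<String> sealedInstants;

  SealedTransaction(List<String> completedInstantsAtStart) {
    // "Seal" the snapshot: copy once, never refresh for the life of the transaction.
    this.sealedInstants = List.copyOf(completedInstantsAtStart);
  }

  List<String> getCompletedInstants() {
    // Every read during the transaction sees exactly this sealed snapshot;
    // the equivalent of reloading the active timeline is deliberately never done here.
    return sealedInstants;
  }
}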


Table Services


Scheduling & Executing

Although we can solve the problem of update conflicts using CRDTs, there still exists the problem of how to coordinate other table services with these concurrent writes. Services such as compaction and clustering require a single writer or a table level lock to get the current state of the system before such a service is scheduled. In its current implementation, no concurrent writer can be started while scheduling these services. Let's take a look at the underlying assumptions and limitations:

  1. Cleaning - No problem, since it only requires a past state of the table
  2. Compaction
    1. Scheduling - A phantom base file is created based on the scheduled compaction instant time. If multiple writers start at the same time as the scheduling, inserts and updates get written to older log files that may not get picked up by the compaction process, resulting in data loss.
    2. Execution - No problem if the scheduling is taken care of
  3. Clustering
    1. Execution - In its current state, clustering cannot execute in parallel with concurrent updates. If clustering is scheduled on some file groups and updates come along for those file groups, the writes will fail until clustering finishes. When implemented, this will NOT be a problem, just like compaction.
    2. Scheduling - Scheduling clustering while concurrent writes are happening can present race conditions leading to data loss.


To solve the issue of scheduling table services, we have the following options:


Option 1 : Brief, table level locks for all writers and table services - Writers take a table level lock only for the duration of creating the .commit file under the .hoodie folder. Note, this is NOT optimistic concurrency; the lock is just meant to "seal" the state of the table when committing data to it. Similarly, scheduling a table service takes the lock while scanning the timeline and creating the plan.
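
A hedged sketch of Option 1 follows. The class below is illustrative, not Hudi's lock provider API, and the in-JVM ReentrantLock only stands in for what would really be an external, table-level lock (e.g. a ZooKeeper or metastore based lock) shared by all writers. The point is the shape of the critical section: the lock is held only while the .commit file is written, or while a table service plan is scanned and created, not for the duration of the write.

import java.util.concurrent.locks.ReentrantLock;

// Illustrative only: a real deployment needs a cross-process lock provider,
// not an in-process ReentrantLock.
class BriefTableLock {

  private final ReentrantLock lock = new ReentrantLock();

  // Run a short critical section (e.g. creating the .commit file) under the table lock.
  void withTableLock(Runnable criticalSection) {
    lock.lock();
    try {
      criticalSection.run();
    } finally {
      lock.unlock();
    }
  }
}

A writer would call withTableLock(() -> writeCommitFile(instant)) and the scheduler would call withTableLock(() -> scanTimelineAndWritePlan(instant)); both callbacks are hypothetical names, and everything outside those brief sections runs without holding the lock.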

Option 2: Use monotonic timestamps to create ordered static views of the file system


Option 2 implementation 


BASE FILE (F1_C1) + LOG_FILE (F1_L1_C1).<sequence_id> : For HDFS, we append multiple log blocks with different commit times inside the same log file. It is then hard to know which commits have gone into the log file just from its name. Instead, we could adopt the following changes (a small naming sketch follows the list):

  1. No more appends - This is already the behavior on all cloud stores anyway, and we should simply enforce it on all DFSs
  2. Rename the log file naming format - Instead of (F1_L1_C1).<sequence_id>, change the sequence_id to the actual commit time of the write adding that log file, i.e. (F1_L1_C1).<commit_timestamp>
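
A small naming sketch of change 2, following the (F1_L1_C1) notation above; the helper methods are illustrative, not Hudi's actual file-naming utilities. With the commit timestamp as the suffix, the commits that produced a file group's log files can be read directly off the file names.

// Illustrative helpers for the proposed naming:
// (F1_L1_C1).<commit_timestamp> instead of (F1_L1_C1).<sequence_id>
class LogFileNaming {

  static String makeLogFileName(String fileId, String logVersion, String baseCommit,
                                String writerCommitTime) {
    // e.g. F1_L1_C1.20230101123000 for a log file added by commit 20230101123000 (example values)
    return fileId + "_" + logVersion + "_" + baseCommit + "." + writerCommitTime;
  }

  static String commitTimeOf(String logFileName) {
    // The commit timestamp is the final dot-separated token of the name.
    return logFileName.substring(logFileName.lastIndexOf('.') + 1);
  }
}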


Now, consider the following sequence of operations without any LOCKS.

C1 C2 C3.inflight C4 C5.inflight 


Now, a compaction is scheduled.

C1 C2 C3.inflight C4 C5.inflight  COMPACTION.schedule.C6 C7.inflight

We should be able to get the past state of the table up to the latest point on the timeline without any inflights. Basically, we want to avoid any on-going write to a log file, so we take the latest completed state of the table. At this point, we know which base file + log files to compact. The remaining problem is how to ensure that log files whose base commit is the previous compaction commit, and to which future writes are still happening, get picked up by the next compaction. Here, we add some logic to the HoodieTableFileSystemView while building the FileSlices: in the collection of all log files, when we slice them into a group belonging to the Cn compaction commit and one belonging to the Cn-1 compaction commit, we order the files by commit timestamp and simply group them together. This way, the on-going commits to log files whose base commit is the previous compaction commit timestamp are "fast forwarded" to the FileSlice anchored on the base file with the new compaction commit.
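
To illustrate the grouping step, here is a hedged sketch; the types below are simplified stand-ins, not the actual HoodieTableFileSystemView code, and it assumes the log file name carries the commit timestamp as proposed above. Log files are ordered by that timestamp, and everything at or after the newly scheduled compaction instant is fast forwarded into the slice anchored on the new base file, while the rest stays with the previous slice.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified stand-in for a log file within one file group; commitTime is the
// timestamp taken from the proposed file name suffix.
class LogFileRef {
  final String name;
  final String commitTime;
  LogFileRef(String name, String commitTime) { this.name = name; this.commitTime = commitTime; }
}

class FileSliceGrouping {

  // Splits a file group's log files into the slice for the previous compaction
  // commit (Cn-1) and the slice for the newly scheduled compaction commit (Cn).
  // Instant times are zero-padded strings, so lexicographic order matches time order.
  static List<List<LogFileRef>> sliceByCompactionInstant(List<LogFileRef> logFiles,
                                                         String newCompactionInstant) {
    List<LogFileRef> previousSlice = new ArrayList<>();
    List<LogFileRef> newSlice = new ArrayList<>();
    for (LogFileRef lf : logFiles) {
      if (lf.commitTime.compareTo(newCompactionInstant) >= 0) {
        // Fast forward: written concurrently with or after scheduling, so it belongs
        // to the slice anchored on the new compaction base file.
        newSlice.add(lf);
      } else {
        previousSlice.add(lf);
      }
    }
    previousSlice.sort(Comparator.comparing(l -> l.commitTime));
    newSlice.sort(Comparator.comparing(l -> l.commitTime));
    return List.of(previousSlice, newSlice);
  }
}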


Versioning (Cleaning)

Executing


Periodic, all-replica compression by flattening and rebuilding the tree requires consensus, but only rarely, and needs to be aborted if there are ongoing updates at any replica.

With the log based model and concurrent updates happening on the table at different instants of time, how many and which versions to keep becomes a challenge. This is work in progress.