
Table of Contents

Proposers

Approvers

Status

Current state


Current State: IN PROGRESS

...

JIRA: here

Released: <Hudi Version>

Abstract

The Hudi library enables creating, managing and modifying tables on the DFS using an engine-agnostic client library. This gives clients a lot of flexibility to manage tables by embedding the library in their user code and running it as they need, on a schedule that suits them. Since there is no “external server” component to managing these tables, it also carries significantly less operational burden than maintaining and scaling such server-based solutions. Hudi currently supports ACID guarantees between writers and readers, but allows only a single client writer to mutate the table on the DFS. This RFC proposes to introduce a multi-writer feature to Hudi, allowing non-overlapping, file-level writes to be performed in parallel from multiple different clients, along with the concurrency management needed to support it.

Background

Hudi currently supports a single writer model, while using MVCC to run table services such as clustering, compaction and cleaning concurrently with the writer, thus allowing them to run asynchronously without blocking it. But Hudi only allows a single client writer to mutate the table on the DFS. Using a distributed processing engine like Spark, Hudi offers horizontal scaling to manage workloads of different sizes. With horizontal scaling, users can maintain similar runtimes and ingestion latencies into Hudi tables even during spiky traffic. 

Over time, we have noticed a lot of activity and requests from users for adding multi-writer capabilities to the Hudi library, to handle cases not covered by the built-in table services. There are many examples where this is helpful. From users, we have learnt that although scaling Hudi jobs is a way to handle spiky traffic, there are some fundamental business challenges which necessitate multiple writers to a Hudi table. A classic example cited is backfill. Many times, due to either changes in business requirements or data correction, pipelines need to backfill historical data into tables. As one can imagine, backfilling historical data can be orders of magnitude larger than steady-state ingestion throughput. To keep data freshness and job runtimes the same, one option is to add more processing power to the same job. But there are cases where this option is not viable. First, such auto-tuning capabilities are often not available to clients, so letting the same job process backfill data risks the latency of new incoming traffic to the table. Second, users are often fine with different reliability guarantees for fresh, new data showing up on the table versus backfilled old data: they would rather have high reliability and guaranteed latency for steady-state new traffic and lower guarantees for backfilled data. Additionally, in many organizations these are two completely different teams, one responsible for ingesting new data and another for performing backfills/corrections of old data.

Concurrency vs Parallelism


Concurrency

Wikipedia -> “In computer science, concurrency is the ability of different parts or units of a program, algorithm, or problem to be executed out-of-order or in partial order, without affecting the final outcome. This allows for parallel execution of the concurrent units, which can significantly improve overall speed of the execution in multi-processor and multi-core systems. In more technical terms, concurrency refers to the decomposability property of a program, algorithm, or problem into order-independent or partially-ordered components or units.”

In Hudi’s context, with resource = file, multiple different writers attempting to change the same file should be able to do so without affecting the final outcome. 

Parallelism

Wikipedia -> “Parallel computing is a type of computation where many calculations or the execution of processes are carried out simultaneously. Large problems can often be divided into smaller ones, which can then be solved at the same time. There are several different forms of parallel computing: bit-level, instruction-level, data, and task parallelism. Parallelism has long been employed in high-performance computing, but has gained broader interest due to the physical constraints preventing frequency scaling.”

In Hudi’s context, with resource = file, multiple different writers attempting to mutate the table can change different files or partitions concurrently, as long as they do not overlap on the same resource. 

...

  1. Unique records across multiple writers during inserts
    1. If multiple writers are writing to different files in parallel, Hudi cannot guarantee uniqueness of keys across partitions, unlike the single writer model. Ensuring unique keys is left up to the users.
  2. Read serializability across partitions
    1. Since different writers to different files can finish at varying times, thus committing data written to files in any order, Hudi cannot guarantee read serializability.
  3. Global index support (only HbaseIndex)
    1. Since a Global Index (e.g. HbaseIndex) requires a unique key across all partitions in the table, Hudi cannot support a Global Index for tables requiring parallel writing, due to constraint (1) above. Note, GlobalSimpleIndex works fine since it first partitions based on HoodieKey and then checks the record key per partition. 

Implementation

The good news is that Hudi’s MVCC based reader/writer isolation has already laid out a great foundation for parallel writing. There are a few features which currently prevent parallel writing. The following section describes what those features are and how we plan to alter them to support multiple writers. 

Inline rollback

The current Hudi writer job supports automatic inline rollback. The rollback feature is used to clean up any failed writes that may have happened in the immediate past runs of the job. The clean-up process could involve cleaning up invalid data files, index information, as well as any metadata information that is used to constitute the Hudi table. Hudi does this by checking whether a commit under the .hoodie directory is a completed commit or an inflight commit. If it is an inflight commit, it performs the necessary rollback before progressing to ingest the next batch of data. When multiple different clients attempt to write to the table, this can result in jobs rolling back each other’s inflight data, leading to chaos and corruption. To support rollback with parallel multi-writers, we need a way to avoid this. Let’s entertain a drastic option: removing inline rollbacks. There are a few things that need to be done if this option were to be considered: 

  1. Ensure all valid data is always guarded by completed instants on the timeline. Any failed writes to data or metadata become garbage data which are innocuous and don’t affect correctness.
  2. This kind of garbage data would otherwise lie around the DFS forever, taking up unnecessary space and contributing to file count. We need a "cleaner" process to clean up this data at a later instant of time (which is what the inline rollbacks were doing in the first place); see the sketch below.
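
For illustration, here is a minimal sketch of what such a lazy cleaner pass might look like, assuming each data file can be mapped back to the instant time that wrote it and that the set of completed instants can be read off the timeline. All class and method names below are hypothetical, not existing Hudi APIs.

Code Block
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Sketch only: identify data files whose creating instant never completed.
// Such files are invisible to readers (reads are guarded by completed instants on
// the timeline), so they can be deleted lazily instead of via inline rollback.
public class LazyFailedWriteCleaner {

  /** Hypothetical handle to a data file plus the instant time that wrote it. */
  public static class DataFile {
    public final String path;
    public final String instantTime;

    public DataFile(String path, String instantTime) {
      this.path = path;
      this.instantTime = instantTime;
    }
  }

  /**
   * Returns files that are safe to delete: their instant is neither completed
   * nor still actively in-flight (e.g. its heartbeat, see below, has expired).
   */
  public static List<DataFile> findGarbageFiles(List<DataFile> allFiles,
                                                Set<String> completedInstants,
                                                Set<String> activeInflightInstants) {
    return allFiles.stream()
        .filter(f -> !completedInstants.contains(f.instantTime))
        .filter(f -> !activeInflightInstants.contains(f.instantTime))
        .collect(Collectors.toList());
  }
}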

...

Additionally, writer jobs can hang for several reasons, such as long GC pauses or other code bugs. In this situation, due to the lack of heartbeats, a cleaner may end up considering this a failed write and clean up its data and metadata. Hence, we also need to implement similar checks on the writer before committing, to ensure such writers abort themselves. 
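
As a rough sketch of the heartbeat idea (the file layout, names and expiry value below are illustrative assumptions, not the final design): a writer periodically touches a per-instant heartbeat file; the cleaner treats a write as failed only once its heartbeat has expired, and the writer re-checks its own heartbeat before committing so that a reclaimed writer aborts instead of committing.

Code Block
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

// Sketch only: one heartbeat file per inflight instant under .hoodie/.heartbeat/<instant>.
public class HeartbeatSketch {
  private static final long EXPIRY_MS = 2 * 60 * 1000; // illustrative expiry

  private final FileSystem fs;
  private final Path heartbeatDir;

  public HeartbeatSketch(Configuration conf, String basePath) throws IOException {
    this.fs = FileSystem.get(conf);
    this.heartbeatDir = new Path(basePath, ".hoodie/.heartbeat");
  }

  /** Writer side: refresh the heartbeat by re-creating (touching) the file. */
  public void touch(String instantTime) throws IOException {
    fs.create(new Path(heartbeatDir, instantTime), true).close();
  }

  /** Cleaner side: treat a write as failed only if its heartbeat has expired. */
  public boolean isExpired(String instantTime) throws IOException {
    Path hb = new Path(heartbeatDir, instantTime);
    if (!fs.exists(hb)) {
      return true;
    }
    long lastBeat = fs.getFileStatus(hb).getModificationTime();
    return System.currentTimeMillis() - lastBeat > EXPIRY_MS;
  }

  /** Writer side, before committing: abort if the cleaner may have reclaimed us. */
  public void validateStillAlive(String instantTime) throws IOException {
    if (isExpired(instantTime)) {
      throw new IllegalStateException("Heartbeat expired for " + instantTime + ", aborting commit");
    }
  }
}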

Ingestion Retry

We CANNOT use the same commit time to retry a failed batch of ingestion any more. Without inline rollbacks, we are unable to remove failed writes from a previous unsuccessful job; hence, every time a job tries to ingest data, it has to use a new commit time. 

Bootstrap Hudi table

Users cannot run bootstrap in parallel. They need to finish the bootstrap of a non-Hudi table into Hudi first, and then start single or multiple writers. Rollbacks of failed bootstraps are also inline at the moment. To entertain the option proposed above, the inline rollback for bootstraps will also need to be removed, following the same clean-up model as inline rollback for regular writers. 

Updating Consolidated Metadata

As of RFC-15 (HUDI-1292), there will only be a single writer to the consolidated metadata, but the model works well as long as writes to the consolidated metadata happen under unique instant times. We need to ensure that only a single writer updates the metadata table at any point of time; applying the same changes twice is OK. The reconciliation of the metadata will be done by the metadata reader, which will see unique instant times and the corresponding metadata.

Incremental reads

With multiple writers, there is no guarantee of ordering of data, since different writers can finish writing at different instants of time. Here, we have the following options for incremental reads:

Option 1

Allow out of order reading (not serializable)

Pros

  • This allows fresh data to be incrementally read
  • A job working on the partition may also want to incrementally consume that data down the road, as soon as possible

...

  • Violates read serializability currently supported by Hudi single writer model
  • Requires a special timeline API that users can use to get the last commit time to store as a checkpoint, to avoid data loss from previously running writer jobs
  • May require similar changes for post commit hook which allows commit times to be published and downstream jobs to move checkpoints
  • Since we cannot move the checkpoint until all previous writers have finished, the incremental read will reprocess records until the checkpoint moves forward

Option 2

Fence till all previous commits have succeeded before allowing incremental reads

Pros 

  • Guarantees read serializability
  • Works as-is, with no special timeline API, since checkpoints will not move forward until all previous commits have succeeded. This is the model supported for READ_OPTIMIZED view incremental reads on MERGE_ON_READ tables, which suffer from the same issue.

...

  • Incremental reads are blocked for all writers and suffer from straggler-writer problems. Consider a situation where a backfill job started at time t1. This job could take many hours to finish, say at tn. In the meantime, many writers of fresh data have finished over times t2, t3, ..., tn. With this option, we would have to wait until tn to get incremental reads of all that new data.

Option 3

Provide incremental reads in order of writer finish time rather than commit time. 

This has the same pros and cons as Option 1, with an added API to move checkpoints based on a timeline different from the commit timeline. Essentially, you can imagine an overlay timeline called CompletedCommitOrderedTimeline() which provides time based on completed jobs, allowing checkpoints to move forward while preserving the commit timeline for Hudi operations. 
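
A minimal sketch of such an overlay timeline, assuming the completion time can be derived from when the completed commit file was written; the record shape and method names here are illustrative, not existing Hudi APIs.

Code Block
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch only: orders commits by when they *finished*, not by their commit time,
// so incremental-read checkpoints can move forward in completion order.
public class CompletedCommitOrderedTimeline {

  /** Hypothetical view of a completed commit. */
  public static class CompletedCommit {
    public final String commitTime;      // instant time on the Hudi timeline
    public final long completionMillis;  // e.g. modification time of the completed commit file

    public CompletedCommit(String commitTime, long completionMillis) {
      this.commitTime = commitTime;
      this.completionMillis = completionMillis;
    }
  }

  private final List<CompletedCommit> completed;

  public CompletedCommitOrderedTimeline(List<CompletedCommit> completed) {
    this.completed = new ArrayList<>(completed);
    this.completed.sort(Comparator.comparingLong(c -> c.completionMillis));
  }

  /** Commits that finished after the given checkpoint, in completion order. */
  public List<CompletedCommit> findCompletedAfter(long checkpointCompletionMillis) {
    List<CompletedCommit> result = new ArrayList<>();
    for (CompletedCommit c : completed) {
      if (c.completionMillis > checkpointCompletionMillis) {
        result.add(c);
      }
    }
    return result;
  }
}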


In this implementation, we will allow out of order reading and leave the serialization of incremental reads to the users. As a follow up of this RFC, we will implement Option 3.

Scheduling Table Management Services

Scheduling Table Management Services such as cleaning, archiving, compaction, clustering are all done with a single writer model. The assumption is that a single controller can do these operations to ensure we can guarantee ACID between readers and writers, as well as run all operations asynchronously using the MVCC pattern. 

...

In a way, you can think of a Table Management Service as just another concurrent writer to the table. We do acknowledge that users may require a priority ordering when choosing how to resolve a conflict between a less important "clustering" activity and an important "update" activity. The framework will allow for implementing such a priority ordering, but the details of how it will be implemented are out of the scope of this RFC and may be followed up in a different RFC.

Unique instant times

Option 1

We need a test-and-set kind of mechanism that hands out unique instant times to parallel writers.
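
One way to picture such a mechanism (the file naming below is illustrative): a writer proposes an instant time and atomically creates the matching requested-instant file; if the create fails because another writer got there first, it bumps the time and retries. Note this leans on an atomic create-if-not-exists, which ties directly into the con listed below.

Code Block
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

// Sketch only: reserve a unique instant time by atomically creating the
// corresponding ".requested" file on the timeline; losers of the race retry.
public class InstantTimeReservation {

  public static String reserveInstantTime(FileSystem fs, Path timelineDir) throws IOException {
    SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");
    long candidateMillis = System.currentTimeMillis();
    while (true) {
      String instantTime = format.format(new Date(candidateMillis));
      Path requested = new Path(timelineDir, instantTime + ".commit.requested");
      try {
        // overwrite = false: create fails if the file exists, giving test-and-set
        // semantics on stores with atomic create (not all cloud stores, per the con below).
        fs.create(requested, false).close();
        return instantTime;
      } catch (IOException alreadyReserved) {
        // Another writer holds this instant time; try the next second.
        candidateMillis += 1000;
      }
    }
  }
}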

...

  • Requires atomic operations that are not available on all cloud stores

Option 2

Increase the granularity of commit times to milliseconds

...

  • We can see clashes if 2 jobs start within the same millisecond (although the probability is very low, and this approach is used by OLTP systems like Cassandra (C*) to resolve the latest record)
  • Requires an upgrade/migration of the timeline version, which can make it slightly complicated from an operational standpoint
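
For reference, a small sketch contrasting the current seconds-level instant time format with a millisecond-level one, assuming the existing yyyyMMddHHmmss format would simply gain an SSS suffix (which is what drives the timeline upgrade mentioned above).

Code Block
import java.text.SimpleDateFormat;
import java.util.Date;

// Sketch only: seconds-level vs millisecond-level commit time generation.
public class CommitTimeGranularity {

  public static String secondsGranularity(long epochMillis) {
    // e.g. 20210216123455
    return new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(epochMillis));
  }

  public static String millisGranularity(long epochMillis) {
    // e.g. 20210216123455123
    return new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date(epochMillis));
  }
}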


In this implementation, we will continue to use the SECONDS level granularity for generating hoodie commit times. In a follow up RFC, we will describe and implement Option 2.

Concurrency Control

Option 1

Optimistic Concurrency using Locking with External Server with Conflict Resolution (Multiple writers can succeed)

Step 1: Read the latest committed data from DFS or external server

...

Step 4: Before committing, check if the latest committed data on DFS or the external server is the same as in Step 1. If YES, then continue to update the commit time atomically, reflect the new commit time for readers, and release the LOCK. If NOT, compare the NEW_FILES_BEING_COMMITTED vs the FILES_MUTATED_BY_COMMITS since the last committed data read in Step 1. If there are overlapping files, abort the write. If there are no overlapping files, finish the commit on DFS and update max(current_commit_time, latest_commit_time_from_external_server) if applicable. NOTE: For the first phase of implementation, we will support Hive Metastore as the external server, which uses Zookeeper to provide LOCKING functionality.
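
A simplified sketch of the conflict check in Step 4, run while holding the LOCK. The method and parameter names are illustrative; the actual commits-since-read view would come from the timeline or the external server.

Code Block
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch only: optimistic concurrency check performed while holding the lock.
public class OptimisticConflictCheck {

  /**
   * @param filesBeingCommitted file ids this writer is about to commit
   * @param commitsSinceRead    commit time -> file ids mutated, for commits that
   *                            completed after this writer read the timeline in Step 1
   * @return true if the commit can proceed, false if it must be aborted
   */
  public static boolean canCommit(Set<String> filesBeingCommitted,
                                  Map<String, List<String>> commitsSinceRead) {
    Set<String> filesMutatedByOthers = new HashSet<>();
    for (List<String> files : commitsSinceRead.values()) {
      filesMutatedByOthers.addAll(files);
    }
    // Abort on any overlap; otherwise the writers touched disjoint files and can all succeed.
    for (String fileId : filesBeingCommitted) {
      if (filesMutatedByOthers.contains(fileId)) {
        return false;
      }
    }
    return true;
  }
}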

Option 2

Optimistic Concurrency using atomic renames with Conflict Resolution

Step 1: Read the latest committed data from the hoodie timeline

...

  1. DFS supports atomic renames
  2. DFS supports atomic file create
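
Under those two assumptions, the publish step of this option could look roughly like the sketch below (file naming is illustrative); the conflict check itself would be the same as in Option 1.

Code Block
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

// Sketch only: publish commit metadata by writing it to a temporary file and then
// atomically renaming it to the completed-commit file on the timeline.
public class AtomicRenameCommit {

  public static boolean publishCommit(FileSystem fs, Path timelineDir,
                                      String instantTime, byte[] commitMetadata) throws IOException {
    Path tmp = new Path(timelineDir, instantTime + ".commit.tmp");
    Path completed = new Path(timelineDir, instantTime + ".commit");

    // Write metadata to a temporary file first; readers do not see this as a commit yet.
    try (FSDataOutputStream out = fs.create(tmp, false)) {
      out.write(commitMetadata);
    }

    // Atomic rename (assumption 1 above): at most one writer wins; on HDFS the
    // rename fails if the completed commit file already exists.
    boolean committed = fs.rename(tmp, completed);
    if (!committed) {
      // Lost the race (or a conflict was detected earlier): discard our attempt.
      fs.delete(tmp, false);
    }
    return committed;
  }
}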

Option 3

Both of the above options have some caveats: 

  1. Requires an external server deployment to acquire a stop-the-world LOCK for the duration of the commit. 
  2. Works only for COPY_ON_WRITE tables
  3. Only works on HDFS, which supports atomic renames, and does not work on S3, for example. 

There are two types of consistency models that ensure correctness with concurrency:

  1. Strong Consistency (usually a 2-phase-commit protocol is used).
  2. Eventual Consistency (usually uses a conflict resolution methodology, such as timestamps).


The eventual consistency model presents many options. One option is to explore a Conflict-Free Replicated Data Type (CRDT) style model. In a CRDT model, data can be updated independently and concurrently without the need for any coordination, but with a predefined set of rules that are applied during conflict resolution at a later stage. A good example is the Last Writer Wins (LWW) merge function that allows for conflict resolution. 
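
To make the LWW idea concrete, here is a toy merge function over two concurrent versions of the same record, each carrying the writer's timestamp. The record shape is purely illustrative, not a Hudi API.

Code Block
// Sketch only: Last Writer Wins merge for two concurrent versions of the same key.
public class LastWriterWinsMerge {

  /** Hypothetical record version: a key, an opaque payload and the writer's timestamp. */
  public static class RecordVersion {
    public final String key;
    public final String payload;
    public final long writeTimestampMillis;

    public RecordVersion(String key, String payload, long writeTimestampMillis) {
      this.key = key;
      this.payload = payload;
      this.writeTimestampMillis = writeTimestampMillis;
    }
  }

  /** Deterministically resolves a conflict: the version with the later timestamp wins. */
  public static RecordVersion merge(RecordVersion a, RecordVersion b) {
    if (!a.key.equals(b.key)) {
      throw new IllegalArgumentException("Can only merge versions of the same key");
    }
    return a.writeTimestampMillis >= b.writeTimestampMillis ? a : b;
  }
}

Because the merge is deterministic and order-independent, every writer and reader applying it converges to the same result without any coordination at write time, which is what removes the need for a stop-the-world lock in this model.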

...

In this RFC, we propose to implement Option 1. We will add support for Option 2 as well as Option 3 in a follow-up RFC.

Dependency/Follow up

With multiple writers, there is no guarantee on the ordered completion of commits (for conflict-free multiple writers). Incremental reads depend on monotonically increasing timestamps to ensure no insert/update is missed; this guarantee is violated with multiple writers. To be able to use multiple writers, we will introduce a new notion on the hoodie timeline for each commit, called END_COMMIT_TIME, along with the START_COMMIT_TIME that is already used for performing commits. Incremental reads will depend on END_COMMIT_TIME to tail data, and will also publish it to downstream systems so they can checkpoint on it. 
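
For illustration, a sketch of how an incremental read could tail commits by END_COMMIT_TIME and advance its checkpoint accordingly; the instant shape and method names are assumptions for this sketch, not a finalized API.

Code Block
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch only: incremental read driven by END_COMMIT_TIME so out-of-order
// completion of concurrent writers cannot cause missed commits.
public class EndTimeIncrementalRead {

  /** Hypothetical completed instant carrying both start and end commit times. */
  public static class CommitInstant {
    public final String startCommitTime;
    public final String endCommitTime;

    public CommitInstant(String startCommitTime, String endCommitTime) {
      this.startCommitTime = startCommitTime;
      this.endCommitTime = endCommitTime;
    }
  }

  /**
   * @param completedCommits all completed commits on the timeline
   * @param checkpoint       last END_COMMIT_TIME consumed by the downstream job
   * @return commits to consume next, ordered by END_COMMIT_TIME
   */
  public static List<CommitInstant> commitsToConsume(List<CommitInstant> completedCommits,
                                                     String checkpoint) {
    List<CommitInstant> result = new ArrayList<>();
    for (CommitInstant c : completedCommits) {
      if (c.endCommitTime.compareTo(checkpoint) > 0) {
        result.add(c);
      }
    }
    result.sort(Comparator.comparing(c -> c.endCommitTime));
    return result;
  }
}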

An Example of Concurrency control with Priority for clustering/compactions

...

  1. Write a new REPLACE metadata to reverse the mapping of f1,f2,f3 -> g1 that was done before.
  2. NOTE that the entire mapping needs to be reversed since records can go from M file groups to N file groups
  3. Need to ensure that the previous version (before clustering) is not cleaned

...

Rollout/Adoption Plan

Once the proposed solution is implemented, users will be able to run jobs in parallel by simply launching multiple writers. Upgrading to the latest Hudi release providing this feature will automatically upgrade your timeline layout. 

Test Plan


Code Block
// Add compaction, clustering
// Check archival kicking in
// Schema Evolution ? (older commit vs new commit)
For TableTypes in (COW, MOR)
	For ConcurrencyMode in (SingleWriter, OCC (with non-overlapping & overlapping file ids))
		Test 1:  EAGER cleaning, without metadata (validate no regression)
		Test 2:  EAGER cleaning, with metadata (validate no regression)
		Test 3:  EAGER cleaning, with metadata, with async cleaning (validate no regression)
		Test 4:  LAZY cleaning, without metadata
		Test 5:  LAZY cleaning, with metadata
		Test 6:  LAZY cleaning, with metadata, with async cleaning
		Test 7:  Incremental Pull (out of order execution of commits for OCC)


Failures & Build rollback

TBD

Turning on Scheduling Operations

By default, the scheduling of table service operations will remain enabled for every job, for backwards compatibility with current users. With multiple writers, users need to ensure they turn off scheduling on all writers and then turn it on only for a single dedicated job.
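
As an illustration of the intended deployment (the property names below are hypothetical placeholders, not finalized configs), backfill or regular writers would run with table service scheduling disabled, while one dedicated job keeps it enabled.

Code Block
import java.util.Properties;

// Sketch only: the property names here are hypothetical placeholders for this RFC.
public class SchedulingConfigExample {

  /** Regular/backfill writers: ingest only, never schedule table services. */
  public static Properties concurrentWriterProps() {
    Properties props = new Properties();
    props.setProperty("hoodie.write.concurrency.mode", "optimistic_concurrency_control"); // hypothetical
    props.setProperty("hoodie.table.services.schedule.enabled", "false");                 // hypothetical
    return props;
  }

  /** Single dedicated job: the only writer allowed to schedule compaction/clustering/cleaning. */
  public static Properties dedicatedTableServiceProps() {
    Properties props = new Properties();
    props.setProperty("hoodie.write.concurrency.mode", "optimistic_concurrency_control"); // hypothetical
    props.setProperty("hoodie.table.services.schedule.enabled", "true");                  // hypothetical
    return props;
  }
}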

Test Plan

Unit tests and Test suite integration

...