

Proposers

Approvers

Status

Current State

  • UNDER DISCUSSION (tick)
  • IN PROGRESS
  • ABANDONED
  • COMPLETED
  • INACTIVE

Discussion thread: here

JIRA: here

Released: <Hudi Version>

Abstract

The Hudi library enables creating, managing and modifying tables on the DFS using an engine-agnostic client library. This gives clients a lot of flexibility to manage tables by embedding this library in their user code and running it as they need, on the schedule that suits them. Since there is no "external server" component to managing these tables, it also carries significantly less operational burden than maintaining and scaling such server-based solutions. Hudi currently supports ACID guarantees between a writer and readers, but allows only a single client writer to mutate the table on the DFS. This RFC proposes to introduce a multi-writer feature to Hudi, allowing non-overlapping file level writes to be done in parallel from multiple different clients.

Background

Hudi currently supports a single writer model, which allows only a single client writer to mutate the table on the DFS. Using a distributed processing engine like Spark, Hudi offers horizontal scaling to manage different scales of workloads. With horizontal scaling, users can maintain similar runtimes and ingestion latencies into Hudi tables even during spiky traffic.

Over time, we have noticed a lot of activity and requests from users for adding multi-writer capabilities to the Hudi library. There are many examples where this is helpful. From users, we have learnt that although scaling Hudi jobs is one way to handle spiky traffic, there are some fundamental business challenges which necessitate multiple writers to a Hudi table. A classic example cited is backfill. Many times, due to either changes in business requirements or data correction, pipelines need to backfill historical data into tables. As one can imagine, backfilling historical data can be orders of magnitude larger than steady state ingestion throughput. To keep data freshness and job runtime the same, one way is to add more processing power to the same job. But there are cases where this option is not viable. First, such auto-tuning capabilities are often not available to clients, so processing backfill data in the same job risks the latency of incoming new traffic to the table. Second, users are flexible about having different reliability guarantees for fresh, new data showing up on the table vs backfilling old data. Users would rather have high reliability and guaranteed latency for steady state new traffic and lower guarantees for backfilled data. Additionally, in many organizations these are two completely different teams, one responsible for ingesting new data and another for performing backfills/corrections of old data.

Concurrency vs Parallelism


Concurrency

Wikipedia -> “In computer science, concurrency is the ability of different parts or units of a program, algorithm, or problem to be executed out-of-order or in partial order, without affecting the final outcome. This allows for parallel execution of the concurrent units, which can significantly improve overall speed of the execution in multi-processor and multi-core systems. In more technical terms, concurrency refers to the decomposability property of a program, algorithm, or problem into order-independent or partially-ordered components or units.”

In Hudi's context, if the resource is a file, multiple different writers attempting to change the same file should be able to do so without affecting the final outcome.

Parallelism

Wikipedia -> “Parallel computing is a type of computation where many calculations or the execution of processes are carried out simultaneously. Large problems can often be divided into smaller ones, which can then be solved at the same time. There are several different forms of parallel computing: bit-level, instruction-level, data, and task parallelism. Parallelism has long been employed in high-performance computing, but has gained broader interest due to the physical constraints preventing frequency scaling.”

In Hudi's context, if the resource is a file, multiple different writers attempting to mutate the table can change different files or partitions, as long as they do not overlap on the same resource.

Read more here to understand concurrency control

Hudi uses MVCC to ensure snapshot isolation and provide ACID semantics between a single writer and multiple readers. 

In this RFC, we propose to support a feature to allow concurrent writing to a Hudi table. This will enable users to start multiple writer jobs writing in parallel to non-overlapping files. If a concurrent write to the same file is issued from 2 writers, the first one to commit will succeed. The following guarantees provided by the Hudi single writer model will NOT be guaranteed in parallel writing mode:

  1. Unique records across multiple writers during inserts
    1. If multiple writers are writing to different files in parallel, Hudi cannot guarantee uniqueness of keys across partitions, unlike the single writer model. Ensuring unique keys is left up to the users.
  2. Read serializability across partitions
    1. Since different writers to different files can finish at varying times, thus committing data written to files in any order, Hudi cannot guarantee read serializability.
  3. Global index support (only HBaseIndex)
    1. Since a global index (e.g. HBaseIndex) requires a unique key across all partitions in the table, Hudi cannot support it for tables requiring parallel writing due to constraint (1) above. Note, GlobalSimpleIndex works fine since it first partitions based on HoodieKey and then checks the record key per partition.

Implementation

The good news is that Hudi's MVCC based reader/writer isolation has already laid a great foundation for parallel writing. There are a few features which currently prevent parallel writing. The following section describes what those features are and how we plan to alter them to support multi-writer.

Inline rollback

The current Hudi writer job supports automatic inline rollback. The rollback feature is used to clean up any failed writes that may have happened in the immediate past runs of the job. The clean up process could involve cleaning up invalid data files, index information as well as any metadata information that is used to constitute the Hudi table. Hudi does this by checking whether the commit under the .hoodie directory is a completed commit or an inflight commit. If it is an inflight commit, it will perform the necessary rollback before progressing to ingest the next batch of data. When there are multiple different clients attempting to write to the table, this can result in jobs rolling back each other's inflight data, leading to chaos and corruption. To support rollback with parallel multi-writer, we need a way to avoid this. Let's entertain a drastic option: removing inline rollbacks. A few things need to be done if this option were to be considered.

  1. Ensure all valid data is always guarded by completed instants on the timeline. Any failed writes to data or metadata become garbage data which is innocuous and does not affect correctness.
  2. This kind of garbage data would lie around on the DFS forever, taking up unnecessary space and contributing to file count; some process needs to clean up this data at a later instant of time (which is what the inline rollbacks were doing in the first place).

To clean up this garbage data, the problem boils down to detecting failed writers and having the ability to clean up these failed writes.

Here we propose to implement a heartbeat mechanism for every writer. Throughout its lifetime, the writer emits a heartbeat to the hoodie timeline to inform any other process (here, the cleaner job) of its continued execution. A cleaner/rollback process looks at all outstanding writer requests on the timeline and checks if there are any failed writers (writers which haven't provided a heartbeat within the allocated time range). If yes, it cleans up those failed writes, freeing up space on the DFS occupied by garbage data and removing inflight files from the timeline. This model helps keep the timeline clean and doesn't let failed writes pollute the timeline over time.

Additionally, writer jobs can hang due to several reasons, such as long GC pauses or other code bugs. In this situation, due to lack of heartbeats, a cleaner may end up considering this as a failed write and clean up its data and metadata. Hence, we also need to implement similar checks on the writer before committing, to ensure we abort such writers.
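
Below is a minimal sketch, in Java, of how such a heartbeat could work from both the writer and cleaner sides. The class, the .hoodie/.heartbeats location and the expiry handling are illustrative assumptions for this RFC, not an actual Hudi implementation.

```java
import java.io.IOException;
import java.nio.file.*;
import java.time.Instant;
import java.util.concurrent.*;

// Illustrative heartbeat sketch: the writer refreshes a per-instant heartbeat file,
// the cleaner treats a stale heartbeat as a failed write, and the writer aborts its
// commit if its own heartbeat has already expired.
public class WriterHeartbeat implements AutoCloseable {
  private final Path heartbeatFile;   // e.g. <table>/.hoodie/.heartbeats/<instantTime> (assumed layout)
  private final long expiryMs;        // heartbeats older than this are considered dead
  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

  public WriterHeartbeat(Path tablePath, String instantTime, long intervalMs, long expiryMs) throws IOException {
    this.heartbeatFile = tablePath.resolve(".hoodie/.heartbeats/" + instantTime);
    this.expiryMs = expiryMs;
    Files.createDirectories(heartbeatFile.getParent());
    // Writer side: keep refreshing the heartbeat file for the lifetime of the write.
    executor.scheduleAtFixedRate(this::beat, 0, intervalMs, TimeUnit.MILLISECONDS);
  }

  private void beat() {
    try {
      Files.write(heartbeatFile, Instant.now().toString().getBytes());
    } catch (IOException e) {
      // A missed heartbeat is not fatal by itself; the pre-commit check below guards correctness.
    }
  }

  // Cleaner side: a writer whose heartbeat has not been refreshed within expiryMs is treated as
  // failed, and its inflight data/metadata can be rolled back.
  public static boolean isExpired(Path heartbeatFile, long expiryMs) throws IOException {
    long lastBeat = Files.getLastModifiedTime(heartbeatFile).toMillis();
    return System.currentTimeMillis() - lastBeat > expiryMs;
  }

  // Writer side, before committing: abort if our own heartbeat expired (e.g. after a long GC pause),
  // since a cleaner may already have rolled this write back.
  public void abortIfExpired() throws IOException {
    if (isExpired(heartbeatFile, expiryMs)) {
      throw new IllegalStateException("Heartbeat expired; aborting commit to avoid conflicting with cleaner");
    }
  }

  @Override
  public void close() {
    executor.shutdownNow();
  }
}
```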

Ingestion Retry

We CANNOT use the same commit time to retry a failed batch of ingestion any more. Without inline rollbacks, we are unable to remove failed writes from a previous unsuccessful job; hence, every time a job tries to ingest data, it has to use a new commit time.

Bootstrap Hudi table

Users cannot run bootstrap in parallel. They need to finish the bootstrap of a non-hudi table into Hudi first, and then start single or multi writers. Rollbacks of failed bootstraps are also inline at the moment. To entertain the option proposed above, the inline rollback for bootstraps will also need to be removed and follow a clean up model similar to the one described for regular writers.

Updating Consolidated Metadata

As of RFC-15 (HUDI-1292), there will only be a single writer to the consolidated metadata, but the model works well as long as writers use unique instant times when writing data concurrently to the consolidated metadata. The reconciliation of the metadata will be done by the metadata reader, which will see unique instant times and corresponding metadata.

Incremental reads

With multiple writers, there is no guarantee of ordering of data since different writers can finish writing at different instants of time. Here, we have three options for incremental reads

Option 1

Allow out of order reading (not serializable)

Pros

  • This allows fresh data to be incrementally read
  • The job writing to a partition may itself want to incrementally consume that data as soon as possible down the road

Cons

  • Violates the read serializability currently supported by the Hudi single writer model
  • Requires a special timeline API that users can use to get the last commit time to store as a checkpoint, to avoid data loss from previously running writer jobs
  • May require similar changes for the post commit hook, which allows commit times to be published and downstream jobs to move checkpoints
  • Since we cannot move the checkpoint until all previous writers have finished, the incremental read will reprocess records until the checkpoint moves forward

Option 2

Fence till all previous commits have succeeded before allowing incremental reads

Pros 

  • Guarantees read serializability
  • Works as is, with no special timeline API, since checkpoints will not move forward until all previous commits have succeeded. This is the model supported for READ_OPTIMIZED view incremental reads on MERGE_ON_READ tables, which suffer from the same issue.

Cons

  • Incremental reads are blocked for all writers and suffer from straggler writer problems. Consider a situation where a backfill job started at time t1. This job could take many hours to finish, say at tn. In the meantime, many writers of fresh data have finished at times t2, t3, ..., tn-1. With this option, we would have to wait until tn to get incremental reads of any of that new data.

Option 3

Provide incremental reads in order of writer finish time rather than commit time. 

This has the same Pros and Cons as Option 1, with an added API to move checkpoints based on a timeline different from the commit timeline. Essentially, you can imagine an overlay timeline, call it CompletedCommitOrderedTimeline(), which provides times based on completed jobs, allowing checkpoints to move forward while preserving the commit timeline for Hudi operations.
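
A minimal sketch of what such an overlay timeline could look like is shown below. The CommitInstant type and its fields are illustrative assumptions; Hudi's actual timeline classes differ.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative sketch of Option 3: order completed commits by completion time rather than
// commit (start) time, so incremental readers can move their checkpoint as writers finish.
public class CompletedCommitOrderedTimeline {

  public static class CommitInstant {
    public final String startCommitTime;  // instant time the writer started with
    public final long completionTimeMs;   // wall-clock time the commit actually completed
    public CommitInstant(String startCommitTime, long completionTimeMs) {
      this.startCommitTime = startCommitTime;
      this.completionTimeMs = completionTimeMs;
    }
  }

  // Completed commits, ordered by the time they finished.
  public static List<CommitInstant> orderByCompletion(List<CommitInstant> completedCommits) {
    return completedCommits.stream()
        .sorted(Comparator.comparingLong(c -> c.completionTimeMs))
        .collect(Collectors.toList());
  }

  // Checkpoint for an incremental reader: the completion time of the latest commit it has consumed.
  public static long nextCheckpoint(List<CommitInstant> orderedCommits, long lastCheckpointMs) {
    return orderedCommits.stream()
        .mapToLong(c -> c.completionTimeMs)
        .filter(t -> t > lastCheckpointMs)
        .max()
        .orElse(lastCheckpointMs);
  }
}
```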

Scheduling Table Management Services

Scheduling of Table Management Services such as cleaning, archiving, compaction and clustering is currently done with a single writer model. The assumption is that a single controller performs these operations, which lets us guarantee ACID between readers and writers, as well as run all operations asynchronously using the MVCC pattern.

With multiple writers, this assumption is violated. For any such service to be scheduled, the scheduler needs a "static" view of the table so it can perform the necessary logic to schedule an operation. We propose to acquire a lock when scheduling such operations. While this lock is held, NO other writer can change the state of the table (i.e., successfully commit), providing a "static" view of the table to the scheduling operation. Once the scheduling is done, the lock will be released and other concurrent writers are free to acquire the lock and perform conflict resolution.
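
The sketch below illustrates this lock-guarded scheduling flow. The TableService interface and the injected Lock are illustrative stand-ins; the concrete lock implementation is discussed under Concurrency Control below.

```java
import java.util.concurrent.locks.Lock;

// Illustrative sketch: table service scheduling performed under a table-level lock,
// so the scheduler sees a "static" view of the table while it builds its plan.
public class TableServiceScheduler {

  public interface TableService {
    // Build a plan (e.g. a compaction or clustering plan) from a static view of the table.
    void schedule(String instantTime);
  }

  private final Lock tableLock;

  public TableServiceScheduler(Lock tableLock) {
    this.tableLock = tableLock;
  }

  public void scheduleWithLock(TableService service, String instantTime) {
    tableLock.lock();                     // no writer can commit while the lock is held
    try {
      service.schedule(instantTime);      // scheduler operates on a "static" view of the table
    } finally {
      tableLock.unlock();                 // writers may now acquire the lock and resolve conflicts
    }
  }
}
```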

In some sense, you can think of Table Management Services as another concurrent writer to the table. We do acknowledge that users may require a priority ordering when choosing to resolve a conflict between a less important "cleaning" activity vs an important "update" activity. The framework will allow for implementing such priority ordering, but the details of how it will be implemented are out of the scope of this RFC and may be followed up in a different RFC.

Unique instant times

Option 1

Provide a test-and-set kind of mechanism that hands out unique instant times to parallel writers.

Pros

  • No loopholes and is guaranteed to provide unique commit times in all cases

Cons

  • Requires atomic operations that are not available on all cloud stores

Option 2

Increase the granularity of commit times to milliseconds

Pros 

  • Works for all kinds of cloud stores

Cons

  • We can see clashes if 2 jobs start within the same millisecond (although the probability is very low, and this approach is used by OLTP systems like C* to resolve the latest record)
  • Requires an upgrade/migration of the timeline version, which can make it slightly complicated from an operational standpoint
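
For illustration, a millisecond-granularity instant time generator could look like the sketch below, assuming an extension of Hudi's current second-granularity yyyyMMddHHmmss instant format; the actual timeline format migration is out of scope here.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Illustrative sketch of Option 2: instant times carry milliseconds, lowering (but not
// eliminating) the chance of two writers picking the same commit time.
public class MillisInstantTimeGenerator {
  private static final DateTimeFormatter MILLIS_FORMAT =
      DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS").withZone(ZoneOffset.UTC);

  // Two writers starting within the same millisecond can still clash (the con noted above),
  // but the probability is far lower than with second granularity.
  public static String newInstantTime() {
    return MILLIS_FORMAT.format(Instant.now());
  }
}
```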

Concurrency Control

Option 1

Optimistic Concurrency using Locking with External Server with Conflict Resolution (Multiple writers can succeed)

Step 1: Read the latest committed data from DFS or external server

Step 2: Perform write

Step 3: Acquire a table level LOCK before committing new data

Step 4: Before committing, check if the latest committed data to DFS or external server is the same as in Step 1. If YES, then continue to update the commit time atomically and reflect the new commit time for readers, release the LOCK. If NOT, compare the NEW_FILES_BEING_COMMITTED vs FILES_MUTATED_BY_COMMITS since the last committed data read in Step 1. If there are overlapping files, abort the write. If there are no overlapping files, finish the commit on DFS and update max(current_commit_time, latest_commit_time_from_external_server) if applicable. 

NOTE: For the first phase of implementation, we will support Hive Metastore as the external server which uses Zookeeper to provide LOCKING functionality. 
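
The following sketch illustrates Steps 1-4 of Option 1 in Java. The TimelineView abstraction and the table-level Lock are illustrative stand-ins for the hoodie timeline and the external lock server (e.g. Hive Metastore backed by Zookeeper).

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;

// Illustrative sketch of optimistic concurrency with file-level conflict resolution at commit time.
public class OptimisticCommitSketch {

  public interface TimelineView {
    String latestCommittedInstant();
    // Files mutated by all commits completed after the given instant.
    Set<String> filesMutatedSince(String instant);
  }

  public static boolean tryCommit(TimelineView timeline,
                                  Lock tableLock,
                                  String instantReadAtStart,        // Step 1: latest committed instant seen before writing
                                  List<String> filesBeingCommitted, // Step 2: files written by this writer
                                  Runnable completeCommit) {
    tableLock.lock();                                               // Step 3: table-level LOCK before committing
    try {
      String latestNow = timeline.latestCommittedInstant();
      if (!latestNow.equals(instantReadAtStart)) {
        // Step 4: other commits landed since we started; check for file-level overlap.
        Set<String> conflicting = new HashSet<>(timeline.filesMutatedSince(instantReadAtStart));
        conflicting.retainAll(filesBeingCommitted);
        if (!conflicting.isEmpty()) {
          return false;                                             // overlapping files -> abort this write
        }
      }
      completeCommit.run();                                         // no conflicts -> finish the commit atomically
      return true;
    } finally {
      tableLock.unlock();
    }
  }
}
```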

Option 2

Optimistic Concurrency using atomic renames with Conflict Resolution

Step 1: Read the latest committed data from the hoodie timeline

Step 2: Perform write

Step 3: Acquire a lock under .hoodie/locks before committing new data. Lock IDs will be in monotonically increasing order. If there are no locks present under .hoodie/locks, the first LockID = 1. We need unique LockIDs for each transaction since we will be storing the commit time in the lock file for later purposes. Since a reader will read that file, we cannot modify the file once created. If there is already an acquired lock present, wait and retry until timeout. Otherwise, create the file lock1.acquired (if it does not exist) and write the commit time that acquired the lock into the file.

Step 4: a) Ensure the lock is acquired (the file name is lock1.acquired) b) Compare the commit time in lock1.acquired with the current commit to ensure the current job is the lock holder.

Before committing, check if the latest committed data on the timeline is the same as in Step 1. If YES, then continue to update the commit time atomically and reflect the new commit time for readers. If NOT, compare the NEW_FILES_BEING_COMMITTED vs FILES_MUTATED_BY_COMMITS since the last committed data read in Step 1. If there are overlapping files, abort the write and unlock by creating lock1.released. If there are no overlapping files, finish the commit on DFS and create lock1.released.


With all the above approaches, there is a possibility of the job failing mid-way and not being able to release the lock. We need a feature to allow these locks to expire. Here, we will rely on the heartbeat to determine when to expire the lock. Every time a new job wants to acquire the lock, it will do 2 things: a) read the latest heartbeat and ensure the lock-holding job is still alive, b) check if the lock is available (filename is lock<id>.released). If there has not been a heartbeat within the specified timeframe, the following actions will be performed to release the lock

  1. Release the lock by creating lock<id>.released

After this, the job will proceed to contend for and acquire the lock as usual. Note that every time any job tries to commit, it has to ensure 2 things: a) it goes from inflight → commit, and b) it still holds the lock it had acquired.
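
The sketch below illustrates the lock-file protocol of Option 2, assuming a DFS with atomic file create and atomic rename (per the Assumptions below). The paths and helper names are illustrative.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;

// Illustrative sketch of the .hoodie/locks file protocol: acquire by atomically creating
// lock<id>.acquired, verify ownership before committing, release via lock<id>.released.
public class DfsLockFile {

  // Acquire lock<id> by atomically creating lock<id>.acquired with the owning commit time inside.
  // CREATE_NEW fails if another writer already created the file, so only one writer wins.
  public static Path acquire(Path locksDir, long lockId, String commitTime) throws IOException {
    Path acquired = locksDir.resolve("lock" + lockId + ".acquired");
    Files.write(acquired, commitTime.getBytes(StandardCharsets.UTF_8), StandardOpenOption.CREATE_NEW);
    return acquired;
  }

  // Before committing, verify we still hold the lock: the .acquired file must exist and must
  // contain our commit time (the file is never modified once created, so readers can trust it).
  public static boolean isHeldBy(Path acquired, String commitTime) throws IOException {
    return Files.exists(acquired)
        && commitTime.equals(new String(Files.readAllBytes(acquired), StandardCharsets.UTF_8));
  }

  // Release by atomically renaming lock<id>.acquired to lock<id>.released. A cleaner that finds
  // an expired heartbeat for the lock holder can perform this same step.
  public static void release(Path acquired) throws IOException {
    Path released = acquired.resolveSibling(
        acquired.getFileName().toString().replace(".acquired", ".released"));
    Files.move(acquired, released, StandardCopyOption.ATOMIC_MOVE);
  }
}
```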

Assumptions

  1. DFS supports atomic renames
  2. DFS supports atomic file create

Option 3

Both the above options have some caveats 

  1. They work only for COPY_ON_WRITE tables
  2. They only work on HDFS, which supports atomic renames, and do not work on S3, for example.

There are 2 types of consistency models that ensure correctness with concurrency → Strong Consistency (usually implemented with a 2-phase commit protocol) and Eventual Consistency. Eventual Consistency presents many options. One option is to explore a Conflict-Free Replicated Data Type (CRDT) style model. In a CRDT model, data can be updated independently & concurrently without the need for any co-ordination, but with a predefined set of rules that are applied during conflict resolution at a later stage. A good example is the Last Writer Wins (LWW) merge function that allows for conflict resolution.
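
For illustration, a Last Writer Wins merge function could look like the following sketch; the Record type and the use of a writer timestamp are illustrative assumptions.

```java
// Illustrative sketch of the Last Writer Wins (LWW) merge mentioned above: concurrent updates
// to the same key are resolved deterministically at read/merge time, without coordination.
public class LastWriterWins {

  public static class Record {
    public final String key;
    public final String value;
    public final long writeTimestampMs;   // e.g. the committing writer's instant/commit time (assumed)
    public Record(String key, String value, long writeTimestampMs) {
      this.key = key;
      this.value = value;
      this.writeTimestampMs = writeTimestampMs;
    }
  }

  // The record with the later timestamp wins; ties go to the first argument.
  public static Record merge(Record a, Record b) {
    return a.writeTimestampMs >= b.writeTimestampMs ? a : b;
  }
}
```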

More details coming soon...


In this RFC, we propose to implement Option 1. We will add support for Option 2 as well as Option 3 in the future.

An Example of Concurrency control with Priority for clustering/compactions

  1. Clustering is scheduled (files f1,f2,f3 -> g1)
  2. Clustering moves to inflight
  3. Ingestion writes are scheduled
  4. Ingestion writes move to inflight
  5. Ingestion has updates for f1
  6. Clustering finishes after taking a lock and checking that no other commit has succeeded; it puts the REPLACE file with mapping f1,f2,f3 -> g1, or puts this information in the consolidated metadata
  7. Ingestion tries to finish and acquires a lock
  8. It finds that clustering has finished in the meantime and detects a file level conflict: overlapping f1
  9. If a higher priority was given to the new writer over clustering, one possible implementation is as follows:
    1. Write a new REPLACE metadata entry to reverse the mapping of f1,f2,f3 -> g1 that was done before.
    2. NOTE that the entire mapping needs to be reversed since records can go from M file groups to N file groups
    3. Need to ensure that the previous version (before clustering) is not cleaned
  10. During the merge of consolidated metadata or the REPLACE timeline, this scenario of "revert" needs to be handled.
  11. Side effects
    1. Redundant clustering operation: the previous one's work is not used and another one needs to be scheduled
    2. Queries will ping-pong back and forth between different numbers of files and layouts

Rollout/Adoption Plan

Once the proposed solution is implemented, users will be able to run jobs in parallel by simply launching multiple writers. Upgrading to the latest Hudi release providing this feature will automatically upgrade your timeline layout. 

Failures & Build rollback

TBD

Turning on Scheduling Operations

By default, the scheduling of these operations will remain enabled for any job, for backwards compatibility with current users. When running multiple writers, users need to ensure they turn off scheduling on the regular writer jobs and turn it on only for the dedicated job that schedules these operations.
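
As a hedged illustration, the configuration split could look like the sketch below; the hoodie.write.scheduling.enabled key is a hypothetical placeholder, not a real or final option name.

```java
import java.util.Properties;

// Illustrative only: regular multi-writer jobs run with table service scheduling disabled,
// while one dedicated job keeps it enabled. The config key is a hypothetical placeholder.
public class MultiWriterConfigExample {

  public static Properties regularWriterProps() {
    Properties props = new Properties();
    props.setProperty("hoodie.write.scheduling.enabled", "false");  // hypothetical key: no scheduling here
    return props;
  }

  public static Properties dedicatedSchedulerProps() {
    Properties props = new Properties();
    props.setProperty("hoodie.write.scheduling.enabled", "true");   // hypothetical key: only this job schedules services
    return props;
  }
}
```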

Test Plan

Unit tests and Test suite integration

Dependency/Follow up

With multiple writers, there is no guarantee on ordered completion of commits (for conflict-free multiple writers). Incremental reads depend on monotonically increasing timestamps to ensure no insert/update is missed; this guarantee will be violated with multiple writers. To be able to use multiple writers, we will introduce a new notion on the hoodie timeline, END_COMMIT_TIME, alongside the START_COMMIT_TIME that is already used for performing commits. Incremental reads will depend on END_COMMIT_TIME to tail data and also publish it to downstream systems so they can checkpoint it.





