Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.


Table of Contents

Proposers

Approvers

Status

Current state


Current State

Status
titleUnder Discussion

(tick)

Status
colourYellow
titleIn Progress


Status
colourRed
titleABANDONED


Status
colourGreen
titleCompleted


Status
colourBlue
titleINactive


...

JIRA: here

Released: <Hudi Version>

Abstract

The Hudi library enables to create, manage and modify tables on the DFS using an engine agnostic client library. This allows clients a lot of flexibility to manage tables by embedding this library in their user code and running as they need, based on the schedule that suits them. Since there is no “external server” component to managing these tables, it also allows for significantly less operational burden that comes with maintaining and scaling such solutions. Hudi currently supports ACID guarantees between writer and readers. Currently, Hudi allows only a single client writer to mutate the table on the DFS. This RFC proposes to introduce parallel writer feature to Hudi, allowing non-overlapping file level writes to be done in parallel from multiple different clients. 

Background

Hudi currently supports a single writer model, that allows only a single client writer to mutate the table on the DFS. Using a distributed processing engine like Spark, Hudi allows horizontal scaling features to manage different scales of workloads. Using horizontal scaling, users can provide similar runtimes and latencies of ingestion into Hudi tables even during spiky traffic. 

Over time, we have noticed a lot of activity and requests from users for adding multi-writer capabilities to the Hudi library. There are many examples where this is helpful. From users, we have learnt that although scaling Hudi jobs is a way to handle spiky traffic, there are some fundamental business challenges which necessitate the need for multiple writers to a Hudi table. A classic example cited is backfill. Many times, due to either changes in business requirements or data correction, pipelines require to backfill historical data into tables. As one can imagine, backfilling historical data can be orders of magnitudes higher than steady state ingestion throughput. To keep freshness of data and runtime of jobs the same, one way is to add more processing power to the same job. But there are cases where this option is not viable. First, many times this kind of auto tuning capabilities are not available to clients, thus risking latency of incoming new traffic to the table by allowing backfill data to be processed by the same job. Second, users are flexible about having different reliability guarantees on fresh, new data showing up on the table vs backfilling old data. Users would rather have high reliability and guaranteed latency for steady state new traffic and lower guarantees for backfilled data. Additionally, many times in organizations, these are completely 2 different teams, one responsible for ingesting new data and another to perform backfills/corrections of old data.

Concurrency vs Parallelism


Concurrency

Wikipedia -> “In computer science, concurrency is the ability of different parts or units of a program, algorithm, or problem to be executed out-of-order or in partial order, without affecting the final outcome. This allows for parallel execution of the concurrent units, which can significantly improve overall speed of the execution in multi-processor and multi-core systems. In more technical terms, concurrency refers to the decomposability property of a program, algorithm, or problem into order-independent or partially-ordered components or units.”

In Hudi’s context, if resource = file, if there are multiple different writers attempting to change the same file, they should be able to do so without affecting the final outcome. 

Parallelism

Wikipedia -> “Parallel computing is a type of computation where many calculations or the execution of processes are carried out simultaneously. Large problems can often be divided into smaller ones, which can then be solved at the same time. There are several different forms of parallel computing: bit-level, instruction-level, data, and task parallelism. Parallelism has long been employed in high-performance computing, but has gained broader interest due to the physical constraints preventing frequency scaling.”

...

  1. Unique records across partitions during inserts
    1. If multiple writers are writing to different partitions in parallel, Hudi cannot guarantee uniqueness of keys across partitions, unlike the single writer model. Ensuring unique keys is left up to the users
  2. ReadSerializability across partitions
    1. Since different writers to different partitions can finish at varying times, thus committing data written to partitions in any order, Hudi cannot guarantee read serializability.
  3. Global index support (only HbaseIndex)
    1. Since Global Index (e.g. HbaseIndex) requires a unique key across all partitions in the table, Hudi cannot support Global Index for tables requiring parallel writing due to constraint (1) above. Note, GlobalSimpleIndex works fine since it first partitions based on HoodieKey and then checks the record key per partition. 

Implementation

The good news is that Hudi’s MVCC based reader/writer isolation has already laid out a great foundation for parallel writing. There are few features which prevent parallel writing. The following section describes what those features are and how we plan to alter them to support parallel writing 

Inline rollback

The current Hudi writer job supports automatic inline rollback. The rollback feature is used to clean up any failed writes that may have happened in the immediate past runs of the job. The clean up process could involve cleaning up invalid data files, index information as well as any metadata information that is used to constitute the Hudi table. Hudi does this by checking if the commit under .hoodie directory is a completed commit or an inflight commit. If it is an inflight commit, it will perform the necessary rollback before progressing to ingest the next batch of data. When there are multiple different clients attempting to write to the table, this can result in jobs rolling back each other’s inflight data leading to chaos and corruption. To ensure we support rollback with parallel writing, we need a way to avoid this. Let’s entertain a drastic option to remove inline rollbacks. There are few things that need to be done if this option were to be considered. 

...

Additionally, it can so happen that writer jobs can hang due to several reasons such as long GC pauses or other code bugs. In this situation, due to lack of heartbeats, a cleaner may end up considering this as a failed write and clean up data and metadata. Hence, we also need to implement similar checks on the writer before committing to ensure we abort such writers. 

Ingestion Retry

We CANNOT use the same commit time to retry a failed batch of ingestion any more. Without inline rollbacks, we are unable to remove failed writes from a previous unsuccessful job, hence every time a job tries to ingest data, it has to use a new commit time. 

Bootstrap Hudi table

Users cannot run bootstrap in parallel. They need to finish the bootstrap of a non-hudi table into Hudi first, and then start single or parallel writers. Rollback of failed bootstrap’s are also inline at the moment. To entertain the option proposed above, the inline rollback from bootstraps will also need to be removed and follow the similar clean up model as inline rollback for regular writers. 

Updating Consolidated Metadata

As of  RFC-15 (HUDI-1292) there will only be a single writer to the consolidated metadata but the model works well as long as we have unique instant times writing data concurrently to the consolidated metadata. The reconciliation of the metadata will be done by the metadata reader which will see unique instant times and corresponding metadata.

Incremental reads

With multiple writers, there is no guarantee of ordering of data since different writers can finish writing at different instants of time. Here, we have two options for incremental reads

Option 1

Allow out of order reading (not serializable)

Pros

  • This allows fresh data to be incrementally read
  • The job working on the partition might also down the road incrementally want to consume it ASAP

...

  • Violates read serializability currently supported by Hudi single writer model
  • Requires special a timeline API that users can use to get the last commit time to store as a checkpoint to avoid data loss of previous running writer jobs
  • May require similar changes for post commit hook which allows commit times to be published and downstream jobs to move checkpoints
  • Since we cannot move the checkpoint until all previous writers have finished, the incremental read will reprocess records until we move forward the checkpoint

Option 2

Fence till all previous commits have succeeded before allowing incremental reads

Pros 

  • Guarantees read serializability
  • Works as is, no special timeline API since checkpoints will not move forward until all commits previously have succeeding. This is the model supported for READ_OPTIMIZED VIEW incremental reads for MERGE_ON_READ tables which suffer from the same issue.

...

  • Incremental reads are blocked for all writers and suffer from straggler writer problems. Consider a situation where a backfill job started at time t1. This job could take many hours to finish, say at tn. In the meantime, many writers of fresh data have finished over time t2, t3, tn. With this option, we would have to wait tn-1 amount of time to get incremental reads of all new data.

Option 3

Provide incremental reads in order of writer finish time rather than commit time. 

This has the same Pros and Cons as Option 1 with added API to move checkpoints based on a timeline different from the commit timeline. Essentially, you can imagine an overlay timeline called CompletedCommitOrderedTimeline() which provides time based on completed jobs that allows checkpoints to move forward while preserving commit timeline for Hudi operations. 

Scheduling Operations

Scheduling operations such as cleaning, archiving, compaction, clustering are all done with a single writer model. The assumption is that a single controller can do these operations to ensure we can guarantee ACID between readers and writers, as well as run all operations asynchronously using the MVCC pattern. 

To be able to continue doing this, the proposal is to introduce 1 config that can be turned on in ONLY 1 of the writer jobs that will then allow for scheduling operations to take place, no other parallel writers will perform scheduling. To ensure that this config is set in ONLY 1 job is left upto the user of this feature. 

Unique instant times

Option 1

Need to support providing a test-and-set kind of mechanism to provide unique instant times to parallel writers.

...

  • Requires atomic operations that are not available on all cloud stores

Option 2

 Increasing granularity of commit times to milliseconds

...

  • We can see clashes if 2 jobs started at the same milliseconds (although the probability is very low and this option is used by OLTP systems like C* to resolve latest record)
  • Requires an upgrade/migration of the timeline version that can make it slightly complicated from an operation standpoint

Ensuring Parallel Writer Auditing

Case 1

Time

Job1

Job2

Job3

Action

T1

1.requested

-

-


T2

1.requested

2.requested


1) Check Pending incomplete operations (check the requested file operation type)

2) Store all seen scheduled/running instants (1 & 2) 

...

However, a critical assumption here is that in case there are writes to overlapping partitions or files, the users may see data loss or corruption.

Concurrency Control

Option 1

Optimistic Concurrency using Locking with External Server with Conflict Resolution (Multiple writers can succeed)

Step 1: Read the latest committed data from DFS or external server

...

NOTE: For the first phase of implementation, we will support Hive Metastore as the external server which uses Zookeeper to provide LOCKING functionality. 

Option 2

Optimistic Concurrency using atomic renames with Conflict Resolution

Step 1: Read the latest committed data from the hoodie timeline

...

  1. DFS supports atomic renames
  2. DFS supports atomic file create

Option 3

Both the above options have some caveats 

  1. Works only for COPYMERGE_ON_WRITE READ tables
  2. Only works on HDFS which supports atomic renames and does not work for S3 for example. 

...

In this RFC, we propose to implement Option 1. In the future, we will add support for Option 2 as well as Option 3 in the future.

Rollout/Adoption Plan

Once the proposed solution is implemented, users will be able to run jobs in parallel by simply launching multiple writers. Upgrading to the latest Hudi release providing this feature will automatically upgrade your timeline layout. 

Failures & Build rollback

TBD

Turning on Scheduling Operations

By default, the scheduling of operations will be enabled for any job for backwards compatibility for current users. The users need to ensure they turn off scheduling and then turn it on only for the dedicated job.

Test Plan

Unit tests and Test suite integration

...