
Proposers

  • @<proposer1 JIRA username>
  • @<proposer2 JIRA username>
  • ...

Approvers

  • Vinoth Chandar [APPROVED/REQUESTED_INFO/REJECTED]
  • Suneel Marthi [APPROVED/REQUESTED_INFO/REJECTED]
  • ...

Status

Current state: [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: here

JIRA: here

Released: <Hudi Version>

Abstract

Currently, among computing engines, Hudi is deeply integrated only with Spark. Apache Flink is a popular stream processing engine. Integrating Hudi with Flink is valuable work: it will enable Hudi to embrace more computing engines, and the pluggable design will also make its architecture more flexible and open.

Background

The current design of Hudi is highly dependent on Spark: four of its modules depend on Spark, as shown below:

Implementation

Motivation

Currently, Delta Streamer supports "streaming" data ingestion. The "streaming" here is actually a continuous mode: a loop of small batch-processing cycles. The advantage of this approach is that each small batch can be verified on its own, and the small batches are processed back to back at short intervals to approximate a stream. This design resembles the Spark RDD model, but in essence it still cannot be regarded as pure stream ingestion. As stream computing becomes more popular, users increasingly expect data to be processed with lower latency.

When we consider using Flink as Hudi's ingestion framework, the design concepts of the Spark RDD and Flink DataStream APIs are obviously different. Therefore, at certain points we cannot simply copy the existing Delta Streamer design.

Design

Next, we elaborate on the design of Flink-based streaming ingestion and writing to Hudi.

It needs to be clear that, in Hudi's model, a batch of records must be written to a table atomically, and this guarantee must also hold when the write path is implemented with Flink. This raises the question of how we define batches in Flink (considering write performance and the small-file problem for which HDFS has long been criticized, we still want to keep a batch-write mechanism).

In Flink, the well-known way to cut a stream into batches is the window, which operates per key. In a sense, checkpoints also divide the stream into batches, since they are triggered at a time interval, but this is a more implicit notion than windows because a checkpoint emphasizes a "point" in the stream and the "snapshot" corresponding to that point. So, given two mechanisms for cutting batches out of the stream, which one should we choose?

Here we choose the checkpoint; the reasons are given below.

A major feature of Hudi is that it provides database-like transaction semantics on HDFS, reflected in the APIs it exposes, such as commit and rollback. In order to support rollback, we need to save the starting offset corresponding to each batch. This maps naturally onto Flink's checkpoint mechanism, because Flink checkpoints exist precisely to roll back to a consistent point during restore. Another consideration is that in Hudi's Spark write path a "batch" is distributed across the whole job rather than scoped to a single partition, whereas a Flink window is scoped to a single key. So, on balance, we think the checkpoint is the more appropriate concept here.
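
As a rough illustration of how this binding between in-flight instants/offsets and Flink checkpoints could be expressed, the sketch below uses Flink's CheckpointedFunction to keep the in-flight instant in operator state that is snapshotted with every checkpoint. The class and field names (InflightInstantTracker, currentInstant) are illustrative placeholders, not part of this proposal.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

// Illustrative only: a pass-through function that keeps the instant of the
// in-flight batch in Flink operator state, so that a restore after failure
// can identify an instant that was started but never committed.
public class InflightInstantTracker<T> extends RichMapFunction<T, T>
    implements CheckpointedFunction {

  private transient ListState<String> instantState;
  private String currentInstant; // would be set whenever a new instant is started

  @Override
  public T map(T value) {
    return value; // records pass through unchanged in this sketch
  }

  @Override
  public void initializeState(FunctionInitializationContext context) throws Exception {
    instantState = context.getOperatorStateStore()
        .getListState(new ListStateDescriptor<>("inflight-instant", String.class));
    if (context.isRestored()) {
      for (String instant : instantState.get()) {
        currentInstant = instant; // this instant never committed and may need a rollback
      }
    }
  }

  @Override
  public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // The in-flight instant is tied to the checkpoint itself: whatever is
    // snapshotted here is what we would roll back if its commit never happens.
    instantState.clear();
    if (currentInstant != null) {
      instantState.add(currentInstant);
    }
  }
}

On restore, any instant found in this state was snapshotted but never committed, and could be rolled back through Hudi's rollback API before ingestion resumes.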

Looking back, the "atomic" write guarantee is provided by the commit operation. But to ensure performance, our write operations run distributed and in parallel. Therefore, in a distributed scenario, we need a coordination mechanism to perform the commit only after all writes have finished.

This can be achieved by counting the results from the write subtasks (e.g. with a parallelism of 12, the sink expects 12 results). If we implemented this with windows, subtasks that receive no input data would emit nothing to the sink. Relying on checkpoints instead, we can introduce a WriteProcessOperator that emits an empty (mock) result for such subtasks. In this way, the sink receives exactly parallelism-many results from the WriteProcessOperator per batch, regardless of data skew.
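
A minimal sketch of this counting-based commit coordination is shown below. BatchWriteResult, CountingCommitSink, and commitBatch are placeholder names for whatever the write path actually emits and however the commit is issued; they are not part of this proposal.

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Placeholder for the per-subtask result the write operator would emit for each batch.
class BatchWriteResult { }

// Illustrative commit sink: runs with parallelism 1 and commits only after every
// upstream write subtask has reported a result (possibly an empty one).
public class CountingCommitSink extends RichSinkFunction<BatchWriteResult> {

  private final int upstreamParallelism; // number of write subtasks feeding this sink
  private transient List<BatchWriteResult> buffer;

  public CountingCommitSink(int upstreamParallelism) {
    this.upstreamParallelism = upstreamParallelism;
  }

  @Override
  public void open(Configuration parameters) {
    buffer = new ArrayList<>();
  }

  @Override
  public void invoke(BatchWriteResult result, Context context) {
    buffer.add(result);
    // Every subtask sends exactly one result per batch, even if it wrote nothing,
    // so a full buffer means the distributed write for this batch has finished.
    if (buffer.size() == upstreamParallelism) {
      commitBatch(buffer); // placeholder for committing the instant via the Hudi write client
      buffer.clear();
    }
  }

  private void commitBatch(List<BatchWriteResult> results) {
    // Real version: hand the collected write statuses to the Hudi write client
    // and commit (or roll back) the corresponding instant.
  }
}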


The above illustrates some of the design considerations in the process of integrating Flink with Hudi. Next, let's describe our design. The DAG of the job can be represented by the following diagram:

[Figure: DAG of the Flink-based Hudi write pipeline]

From the above picture, we can see that the job is a pipeline without branches. Let's go through the purpose of each operator; a shape-only sketch of how this pipeline could be wired with the DataStream API follows the list:

  • source: connects to Kafka's message stream. Kafka records are transformed into HoodieRecord here;
  • instant generate operator (customized): generates a globally unique instant (each Hudi batch must correspond to an instant); its parallelism is 1. Before creating a new instant, it checks the state of the last one; if that instant exists and is not yet completed, the operator waits until it completes or times out;
  • keyBy: partitions the data with partitionPath as the key, to avoid concurrent writes to the same partition;
  • keyed process: carries the main logic of the write path, including index lookup and file writing. Subtasks that receive no data send an empty result to the sink;
  • sink: a global commit sink whose parallelism is also 1. This sink counts the results it has received and does not commit until the count equals the upstream parallelism.
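
As referenced above, here is a shape-only sketch of how this pipeline could be wired with the DataStream API. String records stand in for HoodieRecord, and the real operators (Kafka source, instant generator, keyed write process, commit sink) are reduced to trivial stand-ins and comments; all names here are illustrative.

import java.util.Arrays;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.util.Collector;

public class PipelineShapeSketch {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Checkpoints define the "batch" (and therefore commit) boundaries of the design.
    env.enableCheckpointing(60_000);

    env
        // Stand-in for the Kafka source that converts messages to HoodieRecord.
        .fromCollection(Arrays.asList("2020/01/01|rec1", "2020/01/02|rec2"))
        // In the proposal, the parallelism-1 instant-generating operator sits here.
        .keyBy(new KeySelector<String, String>() {
          @Override
          public String getKey(String element) {
            return element.split("\\|")[0]; // key by partitionPath
          }
        })
        .process(new KeyedProcessFunction<String, String, String>() {
          @Override
          public void processElement(String element, Context ctx, Collector<String> out) {
            // Real version: index lookup + file writing, emitting one (possibly
            // empty) write result per subtask per batch.
            out.collect("write-result:" + element);
          }
        })
        // Stand-in for the parallelism-1 commit sink that counts results and commits.
        .addSink(new PrintSinkFunction<>())
        .setParallelism(1);

    env.execute("hudi-flink-pipeline-shape");
  }
}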


As mentioned above, the Flink-based write implementation is very different from the existing Delta Streamer's Spark RDD-based implementation: it is real stream processing, not a loop of small batches. Therefore, in addition to defining the "batch" differently, we also face the problem of how to generate an instant for each batch. Delta Streamer's loop is sequential, so it can easily generate a unique instant per iteration; in real streaming, we must find a way to generate a globally unique instant for each checkpoint.

Here we can introduce an operator with a parallelism of 1 into the pipeline, which generates a new instant only after the previous one has completed (otherwise it waits, so that at most one instant is in flight at a time), ensuring that no consistency problems are caused by concurrency. Since the timing of offset saving, write operations, and commits is all tied to Flink's checkpoint mechanism, the timing of instant generation should be tied to it as well. For this, we need to extend Flink's operator and override its prepareSnapshotPreBarrier method. This method is executed first, then the checkpoint barrier is sent downstream, and finally the snapshotState method is executed. This ensures that by the time snapshotState runs in the downstream operators, the upstream instant has already been generated.
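
A minimal sketch of such an operator, under the design above, might look as follows. The class name is illustrative, and the actual instant bookkeeping against the Hudi write client is left as comments.

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Illustrative only: a parallelism-1 operator that starts a new Hudi instant
// just before the checkpoint barrier is forwarded downstream.
public class InstantGenerateOperatorSketch<T> extends AbstractStreamOperator<T>
    implements OneInputStreamOperator<T, T> {

  @Override
  public void processElement(StreamRecord<T> element) throws Exception {
    // Records pass through unchanged; this operator only manages instants.
    output.collect(element);
  }

  @Override
  public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
    // Runs before the barrier is emitted downstream, so by the time downstream
    // operators execute snapshotState, the new instant already exists.
    // Real version: check that the previous instant has completed (wait or time
    // out otherwise), then start a new instant through the Hudi write client.
  }
}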

[draw.io diagram: execution-plan-rfc13]

Rollout/Adoption Plan

None

Test Plan

TBD.