...

Currently, Hudi is deeply integrated with only one computing engine: Spark. Apache Flink is a popular stream-processing engine, and integrating Hudi with Flink is valuable work. It will enable Hudi to embrace more computing engines, and the resulting pluggable design will also make its architecture more flexible and open.

Background

The current design of Hudi depends heavily on Spark in the four modules shown below:

(Figure: the four Spark-dependent modules in the current Hudi design)

If we expect Hudi to be decoupled from the computing engine, then we have to do some refactoring to improve the current situation. At a very high level, there are roughly two options:

  1. Keep the existing implementation and re-implement all Spark-related capabilities based on Flink (this means we may add four more Flink-related modules);
  2. Refactor the current design so that the Spark-related parts become cohesive in a dedicated module;

We need to rethink the functional design related to Spark so that it can better follow the pluggable design.

Implementation

The implementation has two parts.

There are two key modules that need to be redesigned: hudi-client and hudi-utilities.

For hudi-client, we can split the module into two new modules: hudi-writer-common and hudi-spark. hudi-writer-common will contain the HoodieIndex and HoodieTable abstract classes along with the IOHandle classes, metrics, and exceptions. The index implementations themselves can then move to hudi-spark, and HoodieWriteClient and the table classes can also be placed in the hudi-spark module. After this refactoring, we can introduce a new hudi-flink module to package the Flink-specific implementation of the index.

(Figure: proposed split of hudi-client into hudi-writer-common, hudi-spark, and hudi-flink)

HoodieIndex, as a public interface, should be refactored into an engine-independent class. We can generalize the Spark-related types via Java generics, like this:

Code Block
languagejava
/**
 * Base class for different types of indexes to determine the mapping from uuid.
 *
 * @param <T> The specific type of the {@link HoodieRecordPayload}.
 * @param <DS> The data set or data stream related computation engine.
 * @param <WS> The type of {@link org.apache.hudi.WriteStatus} set.
 * @param <HKS> The type of {@link HoodieKey} set.
 * @param <HKVS> The type of {@link HoodieKey} and value pair set.
 */
public abstract class HoodieGeneralIndex<T extends HoodieRecordPayload, DS, WS, HKS, HKVS> implements Serializable {

  public static <T extends HoodieRecordPayload, DS, WS, HKS, HKVS> HoodieGeneralIndex<T, DS, WS, HKS, HKVS> createIndex(HoodieWriteConfig config) throws HoodieIndexException {
    //...
  }

  /**
   * Checks if the given [Keys] exist in the hoodie table and returns [Key, Option[partitionPath, fileID]]. If the
   * Option is empty, then the key is not found.
   */
  public abstract HKVS fetchRecordLocation(HKS hoodieKeys, HoodieTable<T> hoodieTable);

  /**
   * Looks up the index and tags each incoming record with a location of a file that contains the row (if it is actually
   * present).
   */
  public abstract DS tagLocation(DS records, HoodieTable<T> hoodieTable) throws HoodieIndexException;

  /**
   * Extracts the location of written records, and updates the index.
   * <p>
   * TODO(vc): We may need to propagate the record as well in a WriteStatus class
   */
  public abstract WS updateLocation(WS writeStatuses, HoodieTable<T> hoodieTable) throws HoodieIndexException;

}

For the basic index class in the Spark context, we can create a HoodieSparkIndex. Its definition is:

Code Block
languagejava
public class HoodieSparkIndex<T extends HoodieRecordPayload>
    extends HoodieGeneralIndex<
    T,
    JavaRDD<HoodieRecord<T>>,
    JavaRDD<WriteStatus>,
    JavaRDD<HoodieKey>,
    JavaPairRDD<HoodieKey, Option<Pair<String, String>>>> {

  @Override
  public JavaPairRDD<HoodieKey, Option<Pair<String, String>>> fetchRecordLocation(
      JavaRDD<HoodieKey> hoodieKeys, HoodieTable<T> hoodieTable) {
    //...
  }

  @Override
  public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD,
      HoodieTable<T> hoodieTable) throws HoodieIndexException {
    //...
  }

  @Override
  public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD,
      HoodieTable<T> hoodieTable) throws HoodieIndexException {
    //...
  }

  //...
}

For the basic index class in the Flink context, we can create a HoodieFlinkIndex. Its definition is:

Code Block
languagejava
public class HoodieFlinkIndex<T extends HoodieRecordPayload>
  extends HoodieGeneralIndex<
    T,
    DataStream<HoodieRecord<T>>,
    DataStream<WriteStatus>,
    DataStream<HoodieKey>,
    DataStream<Tuple2<HoodieKey, Option<Tuple2<String, String>>>>> {

  public HoodieFlinkIndex(HoodieWriteConfig config) {
    super(config);
  }

  @Override
  public DataStream<Tuple2<HoodieKey, Option<Tuple2<String, String>>>> fetchRecordLocation(
      DataStream<HoodieKey> hoodieKeys, HoodieTable<T> hoodieTable) {
    return null;
  }

  @Override
  public DataStream<HoodieRecord<T>> tagLocation(DataStream<HoodieRecord<T>> recordStream,
      HoodieTable<T> hoodieTable) throws HoodieIndexException {
    return null;
  }

  @Override
  public DataStream<WriteStatus> updateLocation(DataStream<WriteStatus> writeStatusStream,
      HoodieTable<T> hoodieTable) throws HoodieIndexException {
    return null;
  }

  //...
}

For HoodieTable, we can do the same refactoring via Java generics. For example, we can define a HoodieGenericTable like this:

Code Block
languagejava
/**
 * Abstract implementation of a HoodieTable.
 * @param <T> The specific type of the {@link HoodieRecordPayload}.
 * @param <EC> The specific context type of the computation engine.
 * @param <WSDS> The specific data set type of the {@link WriteStatus}.
 * @param <P> The specific partition type.
 */
public abstract class HoodieGenericTable<T extends HoodieRecordPayload, EC, WSDS, P> implements Serializable {
    //...
}

Then, we will also introduce HoodieSparkTable and HoodieFlinkTable just like HoodieSparkIndex and HoodieFlinkIndex.
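
As a rough sketch only (the concrete type bindings below are assumptions for illustration, not a final API), the engine-specific tables could bind the generic parameters of HoodieGenericTable like this:

Code Block
languagejava
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Illustrative only: binds the engine context, WriteStatus set, and partition type to Spark types.
public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
    extends HoodieGenericTable<T, JavaSparkContext, JavaRDD<WriteStatus>, Partitioner> {
  //...
}

// Illustrative only: binds the same parameters to Flink types; the choice of
// StreamExecutionEnvironment and KeySelector here is an assumption.
abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
    extends HoodieGenericTable<T, StreamExecutionEnvironment, DataStream<WriteStatus>, KeySelector<HoodieRecord<T>, String>> {
  //...
}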

For hudi-utilities, we currently use some Spark-specific data sources. So we can split the core DeltaStreamer logic into a hudi-deltastreamer-core (or hudi-utilities-core) module and have the Sources themselves live in separate engine-specific modules such as hudi-utilities-spark and hudi-utilities-flink:

(Figure: proposed split of hudi-utilities into a core module and engine-specific modules)

After step 1, we have decoupled Hudi from Spark. Now, we need to implement for Flink the same capabilities that exist for Spark, e.g. the index.

The index implementation becomes one of the parts of the Flink job DAG. Flink's stateful API provides state management, so we can store the index via Flink state. At a lower level of abstraction, in unbounded streaming, a window is a mechanism that splits the unbounded stream into bounded streams; we can use Flink windows to map onto the micro-batches (RDDs) in Spark.
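
As a minimal, hypothetical sketch of this idea (the class, state, and method names here are illustrative, and the function keys by record key purely for the index lookup), a keyed function could keep the known file location of each record key in Flink ValueState:

Code Block
languagejava
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;

// Illustrative sketch of a state-backed index, keyed by record key. The Flink state
// replaces the external index lookup: if a location is present, the key has been seen before.
public class StateBackedIndexFunction extends KeyedProcessFunction<String, HoodieRecord, HoodieRecord> {

  private transient ValueState<HoodieRecordLocation> locationState;

  @Override
  public void open(Configuration parameters) {
    locationState = getRuntimeContext().getState(
        new ValueStateDescriptor<>("record-location", HoodieRecordLocation.class));
  }

  @Override
  public void processElement(HoodieRecord record, Context ctx, Collector<HoodieRecord> out) throws Exception {
    HoodieRecordLocation location = locationState.value();
    if (location != null) {
      // tagLocation: the record key maps to an existing file group, so this is an update
      record.setCurrentLocation(location);
    }
    // updateLocation would store the new location back into locationState after a successful write
    out.collect(record);
  }
}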

Indexing is one of the steps in the write path; it exists in the context of computation and is closely tied to the computation engine. Therefore, the existing indexes also need corresponding implementations for the different computation engines. The whole class diagram is shown below:

(Figure: class diagram of the engine-specific index implementations)

Motivation

Currently, Delta Streamer supports "streaming" data ingestion. The "streaming" here is actually a continuous mode, i.e. a continuously repeated batch-processing cycle. The advantage of this approach is that each small batch can be verified, and the small batches are processed back to back at short intervals to form stream processing. This design is somewhat like the Spark RDD model, but in essence it still cannot be regarded as pure streaming ingestion. As streaming computing becomes more and more popular, people increasingly expect data to be processed with lower latency.

When we consider using Flink as Hudi's ingestion framework, the design concepts of the Spark RDD and Flink DataStream APIs are obviously different. Therefore, at some points, we cannot simply copy the existing Delta Streamer design.

Design

Next, we will elaborate on the design of Flink-based streaming ingestion and writing to Hudi.

It needs to be clear that, in Hudi's model, a batch of records must be written to a table atomically, and this must also be guaranteed when the write path is implemented with Flink. So this comes down to how we define batches in Flink (obviously, considering performance and the small-file problem for which HDFS has long been criticized, we still want a batch-write mechanism).

In Flink, for a single key, we can use a window as a batch to cut up the stream; this is well known. In a sense, checkpoints also divide the stream into batches, triggered by a time interval, but they are a more obscure concept than windows, because a checkpoint emphasizes a "point" and the "snapshot" corresponding to that point. So, faced with two mechanisms for cutting the stream into batches, how should we choose? This depends on other considerations.

A major feature of Hudi is that it provides typical database-like transaction semantics on HDFS, which is reflected in the APIs it exposes, such as commit/rollback. In order to support rollback, we need to save the starting offset corresponding to each batch. This maps naturally onto Flink's checkpoint mechanism, because Flink's checkpoints also exist for rollback during restore. Another point is that in Hudi's Spark write path, a batch is a distributed "batch" rather than a single-partition one. So, in a sense, we think the checkpoint concept is more appropriate here.

Looking back, the "atomic" write guarantee is provided by the commit operation, but to ensure performance our write operations are executed in parallel across the cluster. Therefore, in a distributed scenario, we need a coordination mechanism to commit after writing, and this can be achieved by relying on checkpoint snapshot execution and the checkpoint-completion notification. If we implemented this based on windows, we could still use the checkpoint capability, but it would make the whole process more complicated, and we would not be able to take advantage of the main strengths of windows (for example, aggregation).


The above illustrates some of the design considerations in integrating Flink with Hudi. Next, let's describe the design itself. The DAG of the job can be represented by the following diagram:

(Figure: DAG of the Flink-based write pipeline)

From the above picture, we can see that it is a pipeline without branches. Let's introduce the purpose of each operator (a wiring sketch follows the list):

  • source: connects to Kafka's message stream;
  • instant generate operator (customized): generates a globally unique instant (each Hudi batch needs to correspond to an instant); its parallelism is 1;
  • keyBy: partitions the data with partitionPath as the key to avoid concurrent writes to the same partition;
  • keyed process: carries the main logic of the write path, including index lookup and file writing;
  • sink: a global commit sink that implements CheckpointListener; its parallelism is also 1;
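
To make the shape of this pipeline concrete, a hypothetical wiring could look as follows. The operator and function classes (InstantGenerateOperator, HudiWriteFunction, CommitSink, JsonToHoodieRecordMapper) are placeholders, not existing classes (some are sketched further below), and the topic name and checkpoint interval are arbitrary:

Code Block
languagejava
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.hudi.common.model.HoodieRecord;

// Hypothetical wiring of the write pipeline described above.
public class HudiFlinkPipelineSketch {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000); // the commit cadence is driven by the checkpoint interval

    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", "localhost:9092");

    // source: Kafka message stream, converted into HoodieRecord by a placeholder mapper
    DataStream<HoodieRecord> records = env
        .addSource(new FlinkKafkaConsumer<>("hudi_source_topic", new SimpleStringSchema(), kafkaProps))
        .map(new JsonToHoodieRecordMapper());

    records
        // instant generate operator: parallelism 1, creates the instant for each checkpoint
        .transform("instant_generator", TypeInformation.of(HoodieRecord.class), new InstantGenerateOperator())
        .setParallelism(1)
        // keyBy partitionPath: avoid concurrent writes to the same partition
        .keyBy(HoodieRecord::getPartitionPath)
        // keyed process: index lookup + file writing, bound to snapshotState
        .process(new HudiWriteFunction())
        // sink: global commit on notifyCheckpointComplete, parallelism 1
        .addSink(new CommitSink())
        .setParallelism(1);

    env.execute("hudi-flink-writer");
  }
}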


Next, we briefly introduce the mechanism and general principles of checkpoints. 

(Figure: Flink checkpoint barrier flow from the CheckpointCoordinator through the tasks)

Flink's checkpoints are triggered periodically. The CheckpointCoordinator, located in the JobMaster, acts as the coordinator: it triggers all source tasks to drive the actual execution of a checkpoint. The source tasks then broadcast a barrier event to all downstream input channels. These barrier events act as markers in the data stream that drive the checkpoint (carrying the checkpoint number) downstream toward the sink. After a downstream task has collected the expected number of barriers, it also executes its snapshotState method. After broadcasting the barriers, a source task executes its own snapshotState method on another thread.

After a task has executed this method, it sends an ack message to the JobMaster. Next, we explain the synchronization and coordination mechanism built on top of checkpoints: when the JobMaster has received all ack messages, it confirms the completion of the current round of checkpointing and calls back the notifyCheckpointComplete method on all tasks that implement the CheckpointListener interface. See the image below:

(Figure: checkpoint acknowledgement and the notifyCheckpointComplete callback)

With this understanding of the Flink checkpoint mechanism, we can use the collaboration between snapshotState and notifyCheckpointComplete to implement writing and committing through Flink. We will implement a UDF of type KeyedProcessFunction and have it also implement the CheckpointedFunction interface, so that we can customize the snapshotState method and complete the write operation inside it. Each time the KeyedProcessFunction receives a record, its processElement method is invoked; in this method we complete the index lookup and the tagLocation process.
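
A skeletal version of the pieces described above could look like the following. This is only a sketch: the class names, the buffering approach, and the mechanism for handing WriteStatus to the sink are illustrative, and the imports assume a Flink version (roughly 1.9–1.11) where CheckpointListener lives under org.apache.flink.runtime.state:

Code Block
languagejava
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;

// Illustrative write function: tags and buffers records, then flushes the batch when a checkpoint is taken.
public class HudiWriteFunction extends KeyedProcessFunction<String, HoodieRecord, WriteStatus>
    implements CheckpointedFunction {

  private final List<HoodieRecord> buffer = new ArrayList<>();

  @Override
  public void processElement(HoodieRecord record, Context ctx, Collector<WriteStatus> out) {
    // index lookup / tagLocation happens here; the record is then buffered until the next checkpoint
    buffer.add(record);
  }

  @Override
  public void snapshotState(FunctionSnapshotContext context) {
    // write the buffered batch against the instant generated upstream for this checkpoint,
    // produce WriteStatus for the commit sink (emission mechanics omitted in this sketch), then clear the buffer
    buffer.clear();
  }

  @Override
  public void initializeState(FunctionInitializationContext context) {
    // restore any un-flushed state on recovery (omitted in this sketch)
  }
}

// Illustrative commit sink: collects WriteStatus and commits only after the checkpoint has fully completed.
class CommitSink extends RichSinkFunction<WriteStatus> implements CheckpointListener {

  private final List<WriteStatus> statuses = new ArrayList<>();

  @Override
  public void invoke(WriteStatus status, Context context) {
    statuses.add(status);
  }

  @Override
  public void notifyCheckpointComplete(long checkpointId) {
    // all writes for this checkpoint are durable: commit the instant, then clear the collected statuses
    statuses.clear();
  }
}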


As mentioned above, the Flink-based write implementation is very different from the existing Delta Streamer's Spark RDD-based implementation: it is real stream processing, not a loop of small batches. Therefore, in addition to the different way of defining a "batch", we also face the problem of how to generate an instant for each batch. In the Delta Streamer implementation the loop is sequential, so it can generate a unique instant, but in real streaming we must find a way to generate a globally unique instant for each checkpoint and pass it to the downstream. We explicitly split this into two steps:

  1. how to generate a globally unique instant; 
  2. how to pass it to the downstream;


Firstly, we solve the first problem: how to generate a globally unique instant. Here we can introduce an operator with a parallelism of 1 into the pipeline to generate the instant, so that there are no consistency problems caused by concurrency. Since the timing of our offset saving, write operations, and commits is tied to Flink's checkpoint mechanism, the timing of instant generation should be as well. For this, we need to extend Flink's operator and override its prepareSnapshotPreBarrier method. This method is executed first, then the barrier is sent downstream, and finally the snapshotState method is executed. This ensures that when snapshotState is executed downstream, the upstream instant has already been generated.
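
A bare-bones sketch of such an operator could look like this (the operator name and the way the instant is created and published are placeholders; only the hook into prepareSnapshotPreBarrier is the point being illustrated):

Code Block
languagejava
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.hudi.common.model.HoodieRecord;

// Illustrative operator with parallelism 1: creates a new instant right before the checkpoint
// barrier is forwarded downstream, so that downstream snapshotState calls can rely on it.
public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord>
    implements OneInputStreamOperator<HoodieRecord, HoodieRecord> {

  @Override
  public void processElement(StreamRecord<HoodieRecord> element) {
    // pass records through unchanged
    output.collect(element);
  }

  @Override
  public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
    // executed before the barrier is emitted downstream: start a new instant on the timeline
    // and publish it (e.g. via an extended TimelineService or a file under the metadata path)
  }
}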

Next, we solve the second problem: how to pass the instant to the downstream. We considered carrying the instant within the data, but this would rely on Flink-specific features such as BroadcastState (mainly connecting two streams: one is the data stream ingesting from Kafka, the other is the instant-generation stream that produces the instant time, so the downstream might receive records like <HoodieRecord, Tuple2<InstantTime, InstantTime>>). Considering the complexity, and scalability to other engines, we decided to use an external mechanism to deliver the instant. Here, we have conceived two options:

  • Extend TimelineService to provide an instant generation service;
  • With the help of external storage (such as HDFS);


For the first option, as we all know, Hudi currently provides a timeline web service for external timeline queries, but it does not yet provide an instant generation service. If it provided this, it could be seen as a simple "global clock" service, with the synchronization mechanism handled by the client itself.

(Figure: instant generation via an extended TimelineService)

If we choose this option, we need to start the service before starting the Flink/Spark job, and the write path will then depend strongly on the TimelineService, so we must ensure its reliability. Considering that Hudi is just a library, this option would make Hudi a bit heavy for now.

For the second option, the user specifies a storage path when starting the write job; the instant generate operator writes the instant to this path, and the downstream reads that path to obtain it. The disadvantage of this option is the introduction of external storage, although Hudi itself already depends on HDFS or S3.

(Figure: instant exchange via external storage)

If we store the instant under Hudi's metadata path, then in a sense it does not depend on any additional external storage; the Hudi write path and other configuration would be passed to all tasks in the open method.
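
As a hypothetical sketch of this variant, the instant generate operator could write the latest instant time to a small marker file under an agreed path, and the downstream tasks could read it back (for example at the start of snapshotState). The helper class and marker file name below are made up for illustration:

Code Block
languagejava
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative helper: exchanges the current instant time through a marker file on HDFS/S3.
public final class InstantExchange {

  private static final String MARKER_FILE = ".current_instant"; // hypothetical file name

  // called by the instant generate operator (parallelism 1), e.g. in prepareSnapshotPreBarrier
  public static void publish(String basePath, String instantTime, Configuration conf) throws Exception {
    Path marker = new Path(basePath, MARKER_FILE);
    FileSystem fs = marker.getFileSystem(conf);
    try (FSDataOutputStream out = fs.create(marker, true /* overwrite */)) {
      out.write(instantTime.getBytes(StandardCharsets.UTF_8));
    }
  }

  // called by the write tasks to learn which instant the current batch should be written against
  public static String read(String basePath, Configuration conf) throws Exception {
    Path marker = new Path(basePath, MARKER_FILE);
    FileSystem fs = marker.getFileSystem(conf);
    try (BufferedReader reader =
        new BufferedReader(new InputStreamReader(fs.open(marker), StandardCharsets.UTF_8))) {
      return reader.readLine();
    }
  }
}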

The above are some of the problems and thoughts we encountered while trying to implement the first version. Of course, the index storage here still uses HBase; when we consider an implementation based on BloomFilter, we may have new considerations.

...


(Figure: draw.io diagram "execution-plan-rfc13")

Rollout/Adoption Plan

None

...