
Proposers

Approvers

Status

Current state: ["Under Discussion"]

Discussion thread: here

JIRA: here

Released: <Hudi Version>

Abstract

Currently, Hudi is deeply integrated with Spark as its main computing engine. Apache Flink is a popular stream processing engine, and integrating Hudi with Flink is valuable work. It would enable Hudi to embrace more computing engines, and the pluggable design would also make its architecture more flexible and open.

Background

The current design of Hudi depends heavily on Spark in the four modules shown below:

If we want Hudi to be decoupled from any specific computing engine, we have to do some refactoring to improve the current situation. At a very high level, there are roughly two options:

  1. Keep the existing implementation and re-implement all Spark-related capabilities based on Flink (this means we may add four more Flink-related modules);
  2. Refactor the current design so that the Spark-related parts become cohesive within a dedicated module;


We need to rethink the Spark-related functional design so that it better follows a pluggable design.

Implementation

The implementation consists of two parts: first, decoupling the existing code from Spark; second, implementing the Spark-backed functionality on Flink.

For the first part, there are two key modules that need to be redesigned: hudi-client and hudi-utilities.

About hudi-client, we can split the hudi-client module into two new modules: hudi-writer-common and hudi-spark. hudi-writer-common will contain the HoodieIndex and HoodieTable abstract classes along with the IO handle classes, metrics, and exceptions. The index implementations themselves can then move to hudi-spark, and HoodieWriteClient and the table classes can also move into the hudi-spark module. After this refactoring, we can introduce a new hudi-flink module to package the Flink-specific implementation of the index.
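To summarize the split described above, the resulting module layout could look roughly like this (the grouping of classes per module is indicative, not final):

  • hudi-writer-common: HoodieIndex/HoodieTable abstract classes, IO handle classes, metrics, exceptions
  • hudi-spark: Spark implementations of the index and table classes, plus HoodieWriteClient
  • hudi-flink: Flink-specific implementation of the index (and, later, the table classes)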


HoodieIndex, as a public interface, should be refactored into an engine-independent abstraction by generalizing the Spark-specific types, for example:

/**
 * Base class for different types of indexes to determine the mapping from uuid.
 *
 * @param <T> The specific type of the {@link HoodieRecordPayload}.
 * @param <DS> The data set or data stream related computation engine.
 * @param <WS> The type of {@link org.apache.hudi.WriteStatus} set.
 * @param <HKS> The type of {@link HoodieKey} set.
 * @param <HKVS> The type of {@link HoodieKey} and value pair set.
 */
public abstract class HoodieGeneralIndex<T extends HoodieRecordPayload, DS, WS, HKS, HKVS> implements Serializable {

  public static <T extends HoodieRecordPayload, DS, WS, HKS, HKVS> HoodieGeneralIndex<T, DS, WS, HKS, HKVS> createIndex(HoodieWriteConfig config) throws HoodieIndexException {
    //...
  }

  /**
   * Checks if the given [Keys] exist in the hoodie table and returns [Key, Option[partitionPath, fileID]].
   * If the option is empty, then the key is not found.
   */
  public abstract HKVS fetchRecordLocation(HKS hoodieKeys, HoodieTable<T> hoodieTable);

  /**
   * Looks up the index and tags each incoming record with a location of a file that contains the row (if it is actually
   * present).
   */
  public abstract DS tagLocation(DS records, HoodieTable<T> hoodieTable) throws HoodieIndexException;

  /**
   * Extracts the location of written records, and updates the index.
   * <p>
   * TODO(vc): We may need to propagate the record as well in a WriteStatus class
   */
  public abstract WS updateLocation(WS writeStatuses, HoodieTable<T> hoodieTable) throws HoodieIndexException;

}
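The static createIndex factory is left unspecified above. One possible sketch dispatches on the engine configured in HoodieWriteConfig; the getEngineType() accessor and the engine enum values below are hypothetical, and both engine-specific subclasses are assumed to expose a constructor taking the write config:

@SuppressWarnings("unchecked")
public static <T extends HoodieRecordPayload, DS, WS, HKS, HKVS> HoodieGeneralIndex<T, DS, WS, HKS, HKVS> createIndex(
    HoodieWriteConfig config) throws HoodieIndexException {
  // Hypothetical engine switch: each subclass fixes DS/WS/HKS/HKVS to its engine-specific
  // types, so the cast back to the generic signature is unchecked but safe per engine.
  switch (config.getEngineType()) {
    case SPARK:
      return (HoodieGeneralIndex<T, DS, WS, HKS, HKVS>) new HoodieSparkIndex<T>(config);
    case FLINK:
      return (HoodieGeneralIndex<T, DS, WS, HKS, HKVS>) new HoodieFlinkIndex<T>(config);
    default:
      throw new HoodieIndexException("Unsupported engine: " + config.getEngineType());
  }
}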


For the basic index class in the Spark context, we can create a HoodieSparkIndex, whose definition is:

public class HoodieSparkIndex<T extends HoodieRecordPayload>
    extends HoodieGeneralIndex<
    T,
    JavaRDD<HoodieRecord<T>>,
    JavaRDD<WriteStatus>,
    JavaRDD<HoodieKey>,
    JavaPairRDD<HoodieKey, Option<Pair<String, String>>>> {

  @Override
  public JavaPairRDD<HoodieKey, Option<Pair<String, String>>> fetchRecordLocation(
      JavaRDD<HoodieKey> hoodieKeys, HoodieTable<T> hoodieTable) {
    //...
  }

  @Override
  public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD,
      HoodieTable<T> hoodieTable) throws HoodieIndexException {
    //...
  }

  @Override
  public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD,
      HoodieTable<T> hoodieTable) throws HoodieIndexException {
    //...
  }

  //...
}


For the basic index class in the Flink context, we can create a HoodieFlinkIndex, whose definition is:

public class HoodieFlinkIndex<T extends HoodieRecordPayload>
  extends HoodieGeneralIndex<
    T,
    DataStream<HoodieRecord<T>>,
    DataStream<WriteStatus>,
    DataStream<HoodieKey>,
    DataStream<Tuple2<HoodieKey, Option<Tuple2<String, String>>>>> {

  public HoodieFlinkIndex(HoodieWriteConfig config) {
    super(config);
  }

  @Override
  public DataStream<Tuple2<HoodieKey, Option<Tuple2<String, String>>>> fetchRecordLocation(
      DataStream<HoodieKey> hoodieKeys, HoodieTable<T> hoodieTable) {
    return null;
  }

  @Override
  public DataStream<HoodieRecord<T>> tagLocation(DataStream<HoodieRecord<T>> recordStream,
      HoodieTable<T> hoodieTable) throws HoodieIndexException {
    return null;
  }

  @Override
  public DataStream<WriteStatus> updateLocation(DataStream<WriteStatus> writeStatusStream,
      HoodieTable<T> hoodieTable) throws HoodieIndexException {
    return null;
  }

  //...
}
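In both cases, the write path only interacts with the generic interface, so tagging incoming records looks the same regardless of the engine. A small illustrative sketch (MyPayload, sparkIndex, flinkIndex, recordRdd, and recordStream are placeholder names):

// Spark write path: the generic types resolve to RDDs.
JavaRDD<HoodieRecord<MyPayload>> taggedRdd = sparkIndex.tagLocation(recordRdd, hoodieTable);

// Flink write path: the generic types resolve to DataStreams.
DataStream<HoodieRecord<MyPayload>> taggedStream = flinkIndex.tagLocation(recordStream, hoodieTable);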


About HoodieTable, we can do the same refactoring via Java generics. For example, we can define a HoodieGenericTable like this:


/**
 * Abstract implementation of a HoodieTable.
 * @param <T> The specific type of the {@link HoodieRecordPayload}.
 * @param <EC> The specific context type of the computation engine.
 * @param <WSDS> The specific data set type of the {@link WriteStatus}.
 * @param <P> The specific partition type.
 */
public abstract class HoodieGenericTable<T extends HoodieRecordPayload, EC, WSDS, P> implements Serializable {
    //...
}

Then, we will also introduce HoodieSparkTable and HoodieFlinkTable just like HoodieSparkIndex and HoodieFlinkIndex.
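A rough sketch of how the two table subclasses could bind the generic parameters follows; the concrete choices for EC and P here (JavaSparkContext/StreamExecutionEnvironment as the engine context, Partitioner and a to-be-defined Flink partition abstraction) are assumptions rather than settled API:

public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
    extends HoodieGenericTable<T, JavaSparkContext, JavaRDD<WriteStatus>, Partitioner> {
  //...
}

public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
    extends HoodieGenericTable<T, StreamExecutionEnvironment, DataStream<WriteStatus>, Object /* partition abstraction TBD */> {
  //...
}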


About hudi-utilities, some Spark-specific data sources are used there. We can split the core DeltaStreamer logic into a hudi-deltastreamer-core (or hudi-utilities-core) module and have the Sources themselves live in separate engine-specific modules such as hudi-utilities-spark and hudi-utilities-flink.
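As a rough illustration of that split (the class and method names below are hypothetical, not the existing API), hudi-utilities-core could expose an engine-agnostic source abstraction whose data set type is bound by the engine-specific modules:

/**
 * Illustrative only: an engine-agnostic source living in hudi-utilities-core.
 *
 * @param <DS> The engine-specific data set type, e.g. JavaRDD<GenericRecord> in
 *             hudi-utilities-spark or DataStream<GenericRecord> in hudi-utilities-flink.
 */
public abstract class EngineAgnosticSource<DS> implements Serializable {

  protected final TypedProperties props;

  protected EngineAgnosticSource(TypedProperties props) {
    this.props = props;
  }

  /**
   * Fetches the next batch of records after the given checkpoint, bounded by sourceLimit,
   * and returns the batch together with the new checkpoint string.
   */
  public abstract Pair<Option<DS>, String> fetchNextBatch(Option<String> lastCheckpoint, long sourceLimit);
}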

After step 1, we have decoupled Hudi from Spark. Now, we need to provide Flink implementations of the functionality that has so far only been implemented on Spark, e.g. the index.

The index implementation becomes one of the operators in the Flink job DAG. Flink's stateful API provides state management, so we can store the index in Flink state. From a lower-level perspective, in unbounded streaming a window is the mechanism that splits an unbounded stream into bounded streams; we can use Flink windows as the counterpart of Spark's micro-batches (RDDs).
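A minimal sketch of a state-backed tagging step, assuming the stream is keyed by record key and the last known file location is kept in Flink keyed state (the class and state names are illustrative; writing locations back to state would happen in the corresponding updateLocation step after files are written):

public class StateBackedTagLocation<T extends HoodieRecordPayload>
    extends KeyedProcessFunction<String, HoodieRecord<T>, HoodieRecord<T>> {

  // Keyed state: last known file location for the current record key.
  private transient ValueState<HoodieRecordLocation> locationState;

  @Override
  public void open(Configuration parameters) {
    locationState = getRuntimeContext().getState(
        new ValueStateDescriptor<>("record-location", HoodieRecordLocation.class));
  }

  @Override
  public void processElement(HoodieRecord<T> record, Context ctx, Collector<HoodieRecord<T>> out)
      throws Exception {
    HoodieRecordLocation location = locationState.value();
    if (location != null) {
      // The key has been seen before: tag the record with its existing file location (an update).
      record.setCurrentLocation(location);
    }
    out.collect(record);
  }
}

// Usage sketch: records.keyBy(HoodieRecord::getRecordKey).process(new StateBackedTagLocation<>());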

Indexing is one of the steps in the write process; it runs in the context of the computation and is therefore closely tied to the computation engine. Consequently, the existing indexes also need corresponding implementations for the different computation engines. The whole class diagram is shown below:


Rollout/Adoption Plan

None

Test Plan

TBD.
