...

The current design of Hudi is highly dependent on Spark: four of its modules depend on Spark, as shown below:

If we want Hudi to be decoupled from the computing engine, we have to refactor the current code. At a very high level, there are roughly two options:

...

The implementation of the index feature is one step in the Flink job DAG. Flink's stateful API provides state management, so we can store the index via Flink state. At a lower level of abstraction, in unbounded streaming a window is a mechanism that splits the unbounded stream into bounded streams; we can use Flink windows to map to the micro-batches (RDDs) in Spark.
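As a plain-Java illustration of the idea (a sketch only: the `Location` class and method names here are hypothetical, and a real Flink operator would keep this in `MapState`/`ValueState` rather than a `HashMap`):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class StateIndexSketch {

  // Hypothetical location value: which partition/file currently holds a record key.
  static class Location {
    final String partitionPath;
    final String fileId;
    Location(String partitionPath, String fileId) {
      this.partitionPath = partitionPath;
      this.fileId = fileId;
    }
  }

  // In a real Flink operator this would be MapState<String, Location>, scoped
  // to the key of the keyed stream; a HashMap stands in for it here.
  private final Map<String, Location> state = new HashMap<>();

  // tagLocation: look up the incoming record key against the stored index.
  public Optional<Location> tagLocation(String recordKey) {
    return Optional.ofNullable(state.get(recordKey));
  }

  // updateLocation: after a successful write, remember where the key landed.
  public void updateLocation(String recordKey, String partitionPath, String fileId) {
    state.put(recordKey, new Location(partitionPath, fileId));
  }

  public static void main(String[] args) {
    StateIndexSketch index = new StateIndexSketch();
    System.out.println(index.tagLocation("uuid-1").isPresent()); // false: not yet indexed
    index.updateLocation("uuid-1", "2020/01/01", "file-0");
    System.out.println(index.tagLocation("uuid-1").get().fileId); // file-0
  }
}
```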

Indexing, as a step in the write path, happens in the context of computation and is therefore closely tied to the computation engine.

HoodieIndex is a public interface and should be refactored into an engine-independent class by generalizing the Spark-related types, like this:


Code Block
language: java
/**
 * Base class for different types of indexes to determine the mapping from uuid.
 *
 * @param <T> The specific type of the {@link HoodieRecordPayload}.
 * @param <DS> The data set or data stream type of the underlying computation engine.
 * @param <WS> The type of {@link org.apache.hudi.WriteStatus} set.
 * @param <HKS> The type of {@link HoodieKey} set.
 * @param <HKVS> The type of {@link HoodieKey} and value pair set.
 */
public abstract class HoodieGeneralIndex<T extends HoodieRecordPayload, DS, WS, HKS, HKVS> implements Serializable {

  public static <T extends HoodieRecordPayload, DS, WS, HKS, HKVS> HoodieGeneralIndex<T, DS, WS, HKS, HKVS> createIndex(HoodieWriteConfig config) throws HoodieIndexException {
    //...
  }

  /**
   * Checks if the given keys exist in the hoodie table and returns [Key, Option[partitionPath, fileID]]. If the
   * option is empty, the key was not found.
   */
  public abstract HKVS fetchRecordLocation(HKS hoodieKeys, HoodieTable<T> hoodieTable);

  /**
   * Looks up the index and tags each incoming record with a location of a file that contains the row (if it is actually
   * present).
   */
  public abstract DS tagLocation(DS records, HoodieTable<T> hoodieTable) throws HoodieIndexException;

  /**
   * Extracts the location of written records, and updates the index.
   * <p>
   * TODO(vc): We may need to propagate the record as well in a WriteStatus class
   */
  public abstract WS updateLocation(WS writeStatuses, HoodieTable<T> hoodieTable) throws HoodieIndexException;

}
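To illustrate how the generic parameters would bind per engine, here is a self-contained sketch (all class names here are hypothetical, and java.util.List stands in for an engine collection such as Spark's JavaRDD or Flink's DataStream):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the generalization pattern: the abstract index fixes no engine type;
// each engine binds DS/WS to its own data-set abstraction.
abstract class GeneralIndexSketch<DS, WS> {
  abstract DS tagLocation(DS records);
  abstract WS updateLocation(WS writeStatuses);
}

// Hypothetical engine binding: List stands in for JavaRDD / DataStream.
class ListIndexSketch extends GeneralIndexSketch<List<String>, List<String>> {
  @Override
  List<String> tagLocation(List<String> records) {
    List<String> tagged = new ArrayList<>();
    for (String r : records) {
      tagged.add(r + "@file-0"); // pretend every record is found in file-0
    }
    return tagged;
  }

  @Override
  List<String> updateLocation(List<String> writeStatuses) {
    return writeStatuses; // no-op for the sketch
  }
}

public class Demo {
  public static void main(String[] args) {
    GeneralIndexSketch<List<String>, List<String>> index = new ListIndexSketch();
    System.out.println(index.tagLocation(List.of("uuid-1", "uuid-2")));
  }
}
```

A Spark or Flink subclass would bind the same parameters to its own types instead of List, leaving all callers of the abstract class engine-agnostic.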


For the basic index class in a Spark context, we can create a HoodieSparkIndex; its definition is:



Rollout/Adoption Plan

None

...