Status

Current state: [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In practice, many Flink jobs need to read data from multiple sources in sequential order. Change Data Capture (CDC) and machine learning feature backfill are two concrete scenarios of this consumption pattern.

  • Change Data Capture (CDC): Users may have a snapshot stored in HDFS/S3 and the active changelog in either database binlog or Kafka.
  • Machine learning feature backfill: When a new feature is added to the model, that feature needs to be computed from the raw data from a few months ago until present. In most cases, the historical data and real-time data are stored in two different storage systems, e.g. HDFS and Kafka respectively.

In the past, users had to either run two separate Flink jobs or add hacks to a SourceFunction to address such use cases. However, this is not as easy as it sounds:

  • Switching among multiple sources is complicated with the current source implementations of the different connectors. It requires controlling the exact state of the upstream source before the switch and defining how the downstream source derives its initial state from that upstream state.
  • Automatic switching between the user-defined switchable sources that make up a hybrid source leads to complex implementations. In most cases, users add custom sources and expect Flink to switch between them automatically in the order in which they were added.
  • There is currently no effective mechanism for smooth migration between a historical and a real-time source, for example between the FileSystem source and the Kafka source. Smooth migration requires defining the rules and timing of the switch and which credentials are used for it, so that data integrity and consistency are preserved.

To support such scenarios smoothly, a Flink job needs to first read historical data from HDFS and then switch to Kafka for real-time records. Therefore, this FLIP proposes introducing a “Hybrid Source” API built on top of the new Source API (FLIP-27) to help users with such use cases.

The goals of this FLIP include:

  • Reuse the existing Source connectors built with FLIP-27 without any change.
  • Support an arbitrary combination of sources to form a hybrid source.

Basic Idea

A hybrid source is a source that contains a list of concrete sources. The hybrid source reads from each contained source in the defined order. It switches from source A to the next source B when source A finishes, so that from the user’s perspective all the sources act as a single source.

In most cases, the hybrid source only contains two sources, but it can contain more sources if needed.

The switch is done in the following way:

  • Source A finishes with an END_POSITION
  • A SourcePositionConverter takes the END_POSITION and initializes source B with a START_POSITION.

To make it work, we have to:

  • Get the END_POSITION of source A.
    • In FLIP-27, the boundedness is an intrinsic property of a Source instance. However, FLIP-27 does not expose the END_POSITION when a source finishes.
  • Initialize source B to its START_POSITION based on source A’s END_POSITION.
    • The START_POSITION of the source B is usually a different type from source A’s END_POSITION.
    • Conversion logic is therefore required here.

In light of the above observations, a SwitchableSplitEnumerator interface is introduced.

SwitchableSplitEnumerator
public interface SwitchableSplitEnumerator<SplitT extends SourceSplit, CheckpointT, StartStateT, EndStateT> extends SplitEnumerator<SplitT, CheckpointT> {

	/**
	 * Set the start state of the SwitchableSplitEnumerator.
	 *
	 * @param startState The start state for the switchable split enumerator.
	 */
	void setStartState(StartStateT startState);

	/**
	 * Get the end state of the SwitchableSplitEnumerator.
	 *
	 * @return The end state for the switchable split enumerator.
	 */
	EndStateT getEndState();
}
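
To illustrate the contract, the following is a minimal, self-contained sketch that does not depend on Flink. SwitchableState is a hypothetical stand-in that keeps only the two state methods of SwitchableSplitEnumerator, and TimestampBoundedEnumerator with its recordSeen method is invented for illustration. A real enumerator would also discover and assign splits.

```java
// Hypothetical stand-in for the two state methods of SwitchableSplitEnumerator.
// The proposed interface also extends Flink's SplitEnumerator, which is omitted here.
interface SwitchableState<StartStateT, EndStateT> {
    void setStartState(StartStateT startState);

    EndStateT getEndState();
}

// Illustrative enumerator state: it starts at a timestamp and reports the
// largest timestamp it has observed as its end state.
class TimestampBoundedEnumerator implements SwitchableState<Long, Long> {
    private long startTimestamp = Long.MIN_VALUE;
    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public void setStartState(Long startState) {
        this.startTimestamp = startState;
        this.maxTimestamp = startState;
    }

    // Hypothetical hook, called for every record timestamp the source observes.
    // Timestamps before the start state are ignored.
    void recordSeen(long timestamp) {
        if (timestamp >= startTimestamp) {
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }
    }

    @Override
    public Long getEndState() {
        return maxTimestamp;
    }
}
```

In this sketch both the start state and the end state are timestamps, so no type conversion is needed. Sources whose positions have different types would need a conversion function between them.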

The full API changes are listed in the next section.

Public Interface

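The public interface changes introduced by the hybrid source fall into two parts: the new HybridSource, SwitchableSource and SwitchableSplitEnumerator types, and the changes to the existing FileSource and KafkaSource.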

HybridSource
/**
 * A hybrid implementation of {@link Source} that switches among multiple switchable sources
 * and handles creation and restoration of the {@link HybridSplitEnumeratorBase}.
 * Users add their own switchable sources, which are read in the order they were added.
 *
 * @param <T>            The final element type to emit.
 * @param <SwitchStateT> The mutable type of switch state.
 */
public class HybridSource<T, SwitchStateT> implements Source<T, HybridSourceSplit, byte[]> {

	HybridSource(SwitchableSource<T, ?, ?, ?, SwitchStateT> initialSource) {...}

	/**
	 * Add a switchable source to the HybridSource, together with the state convert function for its switchable split enumerator.
	 *
	 * @param switchableSource user-defined switchable source to add.
	 * @param convertFunction  state convert function for the switchable split enumerator.
	 * @return the hybrid source instance, so that calls can be chained.
	 */
	public HybridSource<T, SwitchStateT> addSource(SwitchableSource<T, ?, ?, ?, SwitchStateT> switchableSource, Function convertFunction) {...}
}
SwitchableSource
/**
 * The interface for SwitchableSource. It acts like a factory class that helps construct
 * the {@link SwitchableSplitEnumerator} to switch the sources of {@link HybridSource}.
 *
 * @param <T>           The type of records produced by the switchable source.
 * @param <SplitT>      The type of splits handled by the switchable source.
 * @param <EnumChkT>    The type of the enumerator checkpoints.
 * @param <StartStateT> The type of start state for the switchable source.
 * @param <EndStateT>   The type of end state for the switchable source.
 */
public interface SwitchableSource<T, SplitT extends SourceSplit, EnumChkT, StartStateT, EndStateT> extends Source<T, SplitT, EnumChkT> {
 
	/**
	 * Creates a new SwitchableSplitEnumerator for this source, starting a new input.
	 *
	 * @param enumContext The {@link SplitEnumeratorContext context} for the split enumerator.
	 * @return A new SwitchableSplitEnumerator.
	 */
	SwitchableSplitEnumerator<SplitT, EnumChkT, StartStateT, EndStateT> createEnumerator(SplitEnumeratorContext<SplitT> enumContext);
 
	/**
	 * Restores a switchable enumerator from a checkpoint.
	 *
	 * @param enumContext The {@link SplitEnumeratorContext context} for the restored split enumerator.
	 * @param checkpoint  The checkpoint to restore the SplitEnumerator from.
	 * @return A SwitchableSplitEnumerator restored from the given checkpoint.
	 */
	SwitchableSplitEnumerator<SplitT, EnumChkT, StartStateT, EndStateT> restoreEnumerator(SplitEnumeratorContext<SplitT> enumContext, EnumChkT checkpoint) throws IOException;
}
SwitchableSplitEnumerator
/**
 * An interface for a switchable split enumerator, which is responsible for the following:
 * 1. discover the splits for the {@link SourceReader} to read.
 * 2. assign the splits to the source reader.
 * 3. set the start state of the split enumerator.
 * 4. get the end state of the split enumerator.
 *
 * @param <SplitT>      The type of splits handled by the switchable split enumerator.
 * @param <CheckpointT> The type of the enumerator checkpoints.
 * @param <StartStateT> The type of start state for the switchable split enumerator.
 * @param <EndStateT>   The type of end state for the switchable split enumerator.
 */
public interface SwitchableSplitEnumerator<SplitT extends SourceSplit, CheckpointT, StartStateT, EndStateT> extends SplitEnumerator<SplitT, CheckpointT> {

	/**
	 * Set the start state of the SwitchableSplitEnumerator.
	 *
	 * @param startState The start state for the switchable split enumerator.
	 */
	void setStartState(StartStateT startState);

	/**
	 * Get the end state of the SwitchableSplitEnumerator.
	 *
	 * @return The end state for the switchable split enumerator.
	 */
	EndStateT getEndState();
}

In addition, FileSource and KafkaSource will implement the SwitchableSource interface, so that they can be used in the HybridSource.

FileSource
/**
 * The SwitchableSource implementation of the file system.
 *
 * @param <T> The type of the events/records produced by this source.
 */
public final class FileSource<T, StartStateT> implements SwitchableSource<T, FileSourceSplit, PendingSplitsCheckpoint, StartStateT, Long> {...}


KafkaSource
/**
 * The SwitchableSource implementation of Kafka.
 *
 * @param <OUT> the output type of the switchable source.
 */
public class KafkaSource<OUT, EndStateT> implements SwitchableSource<OUT, KafkaPartitionSplit, KafkaSourceEnumState, Long, EndStateT> {...}

Proposed Changes

Architecture and design

The hybrid source works as follows.

  1. Create a HybridSource with an initial contained SwitchableSource.
  2. Add SwitchableSources, each with a position conversion function, to the HybridSource.
    1. The position conversion function converts the END_POSITION of the previous SwitchableSource to the START_POSITION of the SwitchableSource being added.
  3. The HybridSource maintains all the added SwitchableSources and there is only one active SwitchableSource at any point.
  4. At runtime, the HybridSource acts like a proxy of the contained active SwitchableSource for most of the time, except in the source switching phase.
    1. HybridSplitEnumerator delegates all the calls to the active SwitchableSplitEnumerator, except for the handling of SourceReaderSwitchedEvent.
    2. HybridSourceReader delegates all the calls to the active SourceReader, except it does not exit when the active SourceReader finishes.
  5. The switching phase proceeds as follows:
    1. When the active source reader finishes, the HybridSourceReader does not exit. Instead, it first switches the active source reader to the next source reader, then sends a SourceReaderSwitchedEvent to the HybridSplitEnumerator.
    2. After receiving a SourceReaderSwitchedEvent from every SourceReader, the HybridSplitEnumerator:
      1. Gets the END_POSITION from the finishing active SplitEnumerator.
      2. Converts the END_POSITION to the START_POSITION of the next SplitEnumerator using the corresponding position conversion function.
      3. Creates the next SplitEnumerator and sets its START_POSITION.
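
The enumerator side of the switching phase can be modelled roughly as below. This is a simplified sketch that does not depend on Flink: SwitchCoordinator, onReaderSwitched and nextStartPosition are hypothetical names, positions are assumed to be Long values, and reader events are reduced to a reader id. It only shows that the conversion happens once every reader has reported its switch.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;

// Simplified, Flink-free model of the enumerator-side switch.
// Every name in this block is an illustrative stand-in.
class SwitchCoordinator {
    private final int readerCount;
    private final Supplier<Long> activeEndPosition;
    private final Function<Long, Long> positionConverter;
    private final Set<Integer> switchedReaders = new HashSet<>();
    private Long nextStartPosition; // stays null until the switch happens

    SwitchCoordinator(int readerCount,
                      Supplier<Long> activeEndPosition,
                      Function<Long, Long> positionConverter) {
        this.readerCount = readerCount;
        this.activeEndPosition = activeEndPosition;
        this.positionConverter = positionConverter;
    }

    // Models the arrival of one SourceReaderSwitchedEvent.
    // Returns true once the switch has been performed.
    boolean onReaderSwitched(int readerId) {
        switchedReaders.add(readerId); // duplicate events from one reader count once
        if (switchedReaders.size() < readerCount) {
            return false; // still waiting for other readers
        }
        if (nextStartPosition == null) {
            // Get the END_POSITION of the finishing source and convert it
            // into the START_POSITION of the next source.
            long endPosition = activeEndPosition.get();
            nextStartPosition = positionConverter.apply(endPosition);
        }
        return true;
    }

    Long nextStartPosition() {
        return nextStartPosition;
    }
}
```

Waiting for all readers before reading the end position reflects the procedure above, where the END_POSITION is only taken from the finishing enumerator after every reader has switched.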

FileSource changes

The FileSource implements SwitchableSource and creates a corresponding FileSplitEnumerator, which implements SwitchableSplitEnumerator. The FileSplitEnumerator uses the maximum timestamp of the file records as the END_POSITION of the file system source.

KafkaSource changes

The KafkaSource implements SwitchableSource and creates a corresponding KafkaSplitEnumerator, which implements SwitchableSplitEnumerator. The KafkaSplitEnumerator uses the maximum timestamp of the previous source as the START_POSITION for the Kafka consumer to seek to.

For example, the HybridSource composed of HDFSSource and KafkaSource is as follows:
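
A minimal, self-contained sketch of such a composition is given below. It does not use the proposed Flink classes: MiniSource, MiniHybridSource, HybridExample and runAll are hypothetical stand-ins, and positions are assumed to be Long timestamps. It only illustrates that the end position of the file source is handed to the Kafka source as its start position through the conversion function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Illustrative stand-in for a switchable source: it consumes from a start
// position and returns its END_POSITION. Not the proposed Flink interface.
interface MiniSource {
    long run(long startPosition);
}

// Illustrative stand-in for a hybrid source holding sources in order.
class MiniHybridSource {
    private final List<MiniSource> sources = new ArrayList<>();
    private final List<Function<Long, Long>> converters = new ArrayList<>();

    MiniHybridSource(MiniSource initialSource) {
        sources.add(initialSource);
    }

    // The convert function maps the previous source's END_POSITION to the
    // START_POSITION of the source being added.
    MiniHybridSource addSource(MiniSource source, Function<Long, Long> convertFunction) {
        sources.add(source);
        converters.add(convertFunction);
        return this;
    }

    // Runs the sources in order and returns the start position given to each.
    List<Long> runAll(long initialStart) {
        List<Long> starts = new ArrayList<>();
        long start = initialStart;
        for (int i = 0; i < sources.size(); i++) {
            starts.add(start);
            long end = sources.get(i).run(start);
            if (i < converters.size()) {
                start = converters.get(i).apply(end);
            }
        }
        return starts;
    }
}

class HybridExample {
    // The file source's END_POSITION (maximum record timestamp) is passed
    // unchanged as the Kafka source's START_POSITION.
    static MiniHybridSource fileThenKafka(MiniSource fileSource, MiniSource kafkaSource) {
        return new MiniHybridSource(fileSource).addSource(kafkaSource, Function.identity());
    }
}
```

The identity conversion is used because, in the FileSource and KafkaSource changes described above, both positions are the same maximum timestamp. Sources with differently typed positions would need a real conversion.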

Implementation Plan

The implementation should proceed in the following tasks:

  1. Validate the proposed HybridSource, SwitchableSource and SwitchableSplitEnumerator interfaces by implementing them for popular connectors, including FileSource and KafkaSource.

Future Work

Future work on the HybridSource could include:

  1. Implement a hybrid SQL connector for the HybridSource, in which users create multiple source tables and specify which of them are added to the hybrid source so that it switches between the corresponding sources.

Compatibility, Deprecation, and Migration Plan

The hybrid source is built on the new Source interface (FLIP-27), and the switchable source interfaces extend it. Popular connector implementations such as FileSource and KafkaSource can therefore individually implement the switchable source interfaces and be added to a hybrid source, which then switches among them.

Test Plan

Unit tests and integration tests for each connector's SwitchableSource and SwitchableSplitEnumerator implementation, such as those of FileSource and KafkaSource, to validate smooth switching among multiple switchable sources.
