Status

...

Page properties


Discussion thread

...

...

Jira: FLINK-22668 (ASF JIRA)

...

Release: 1.14


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  • Reuse the existing Source connectors built with FLIP-27 without any change.
  • Support an arbitrary combination of sources to form a hybrid source.

Basic Idea

This FLIP proposes to support switching sources with either predetermined start positions or with position conversion at switch time. The former mode is very simple - sources are configured upfront with their start/end positions and wrapped into HybridSource. No special support is required on existing sources.

With position conversion at switch time, the end position of the current source is converted into the start position of the next source. This requires support in the split enumerator of the current source to provide the end position, support in the next source to set the start position (such as the start timestamp in KafkaSource), and a user-supplied function that converts the end position into the start position.

...

A hybrid source is a source that contains a list of concrete sources. The hybrid source reads from each contained source in the defined order. It switches from source A to the next source B when source A finishes, so that from the user's perspective all the sources act as a single source.

In most cases, the hybrid source only contains two sources, but it can contain more sources if needed.
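
The ordering behaviour described here can be pictured with a minimal, Flink-free sketch. ChainedReader is a hypothetical name, and plain Iterators stand in for bounded sources; the actual HybridSource works with splits and enumerators rather than iterators, so this only illustrates the "read A to the end, then continue with B" contract.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical stand-in: reads several underlying iterators in order as one. */
final class ChainedReader<T> implements Iterator<T> {
    private final Iterator<Iterator<T>> remaining;
    private Iterator<T> current;

    ChainedReader(List<Iterator<T>> sources) {
        this.remaining = sources.iterator();
        this.current = remaining.hasNext() ? remaining.next() : null;
    }

    @Override
    public boolean hasNext() {
        // Switch to the next underlying source whenever the current one is finished.
        while (current != null && !current.hasNext()) {
            current = remaining.hasNext() ? remaining.next() : null;
        }
        return current != null;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }

    /** Drains the whole chain into a list, for demonstration only. */
    static <T> List<T> readAll(List<Iterator<T>> sources) {
        List<T> out = new ArrayList<>();
        new ChainedReader<>(sources).forEachRemaining(out::add);
        return out;
    }

    public static void main(String[] args) {
        List<Iterator<String>> chain = Arrays.asList(
                Arrays.asList("file-1", "file-2").iterator(),
                Arrays.asList("kafka-1").iterator());
        System.out.println(readAll(chain));
    }
}
```

The consumer only ever sees one iterator, which mirrors how downstream operators see a single source regardless of how many sources are chained.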

  1. The HybridSource enumerator manages the process of switching between two sources.
  2. A user-provided implementation of SourceFactory is used to create the next Source when the previous Source finishes reading.
  3. The SourceFactory is expected to do the following:
    a. Get the end_position from the SplitEnumerator of the previously finished Source. (This may require modifying an existing source such as FileSource to expose the end position.)
    b. Translate that end_position to the start_position of the next source.
    c. Construct and set up the next Source.
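
The three factory steps can be sketched without any Flink dependency. Everything in this block is hypothetical: FileLikeEnumerator, StreamLikeSource and createNext are illustrative stand-ins, and the assumption that both positions are epoch milliseconds is made only to keep the example short.

```java
import java.util.function.Function;

/** Hypothetical sketch of steps a-c; none of these types are Flink classes. */
final class SwitchSketch {

    /** Stand-in for the finished enumerator of the previous (file-like) source. */
    static final class FileLikeEnumerator {
        private final long endTimestamp;

        FileLikeEnumerator(long endTimestamp) {
            this.endTimestamp = endTimestamp;
        }

        long getEndTimestamp() {
            return endTimestamp;
        }
    }

    /** Stand-in for a next source whose start position is a timestamp. */
    static final class StreamLikeSource {
        final long startTimestamp;

        StreamLikeSource(long startTimestamp) {
            this.startTimestamp = startTimestamp;
        }
    }

    /** Plays the role of the user-provided factory, called only at switch time. */
    static StreamLikeSource createNext(FileLikeEnumerator previousEnumerator) {
        // a. Get the end position from the enumerator of the finished source.
        long endPosition = previousEnumerator.getEndTimestamp();
        // b. Translate it into the start position of the next source.
        //    Assumption: both positions are epoch milliseconds, so no conversion is needed.
        long startPosition = endPosition;
        // c. Construct and set up the next source.
        return new StreamLikeSource(startPosition);
    }

    public static void main(String[] args) {
        Function<FileLikeEnumerator, StreamLikeSource> factory = SwitchSketch::createNext;
        StreamLikeSource next = factory.apply(new FileLikeEnumerator(1_000L));
        System.out.println(next.startTimestamp);
    }
}
```

Because the factory runs only after the previous enumerator has finished, the next source does not need to exist, or even be fully configured, when the job graph is built.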

The switch is done in the following way:

  • Source A finishes with an END_POSITION
  • A SourcePositionConverter takes the END_POSITION and initializes source B with a START_POSITION.

To make it work, we have to:

  • Get the END_POSITION of source A.
    • In FLIP-27, the boundedness is an intrinsic property of a Source instance. However, FLIP-27 does not expose the END_POSITION when a source finishes.
  • Initialize source B to its START_POSITION based on source A’s END_POSITION.
    • The START_POSITION of the source B is usually a different type from source A’s END_POSITION.
    • Conversion logic is therefore required here.
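
As an illustration of why the two positions usually differ in type, here is a small sketch of such conversion logic. The PositionConverter interface and the choice of position types are assumptions made for this example: source A is assumed to finish on a fully read daily partition (a LocalDate), and source B is assumed to start from an epoch-millisecond timestamp.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;

/** Hypothetical converter sketch; the names and position types are assumptions. */
final class PositionConversionSketch {

    /** Converts source A's END_POSITION type into source B's START_POSITION type. */
    @FunctionalInterface
    interface PositionConverter<EndT, StartT> {
        StartT convert(EndT endPosition);
    }

    /**
     * Assumes source A ends on a fully read daily partition and source B starts
     * from an epoch-millisecond timestamp: B starts at midnight UTC of the next day.
     */
    static final PositionConverter<LocalDate, Long> DAY_TO_EPOCH_MILLIS =
            lastDay -> lastDay.plusDays(1)
                    .atStartOfDay(ZoneOffset.UTC)
                    .toInstant()
                    .toEpochMilli();

    public static void main(String[] args) {
        long start = DAY_TO_EPOCH_MILLIS.convert(LocalDate.of(2021, 5, 1));
        System.out.println(start);
    }
}
```

Whether the next source should start exactly at, or just after, the previous end position depends on the semantics of the sources involved; the "next day" rule here is only one possible choice.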

...

The baseline implementation of the hybrid source switches between underlying sources according to the source chain configured through HybridSourceBuilder. Each source in the chain is represented by a SourceFactory, which permits building a source either at graph construction time or deferred at switch time, and which can set the start position in whatever way the specific source allows. Deferred instantiation is only required for dynamic position transfer at switch time: when the current enumerator has finished, the SourceFactory creates the next source from the previous enumerator, before the next enumerator is created. The end state of the previous split enumerator can thus be used to set the next source's start position.

Code Block
languagejava
titleHybridSource
/**
 * Hybrid source that switches underlying sources based on configured source chain.
 *
 * <p>A simple example with FileSource and KafkaSource with fixed Kafka start position:
 *
 * <pre>{@code
 * FileSource<String> fileSource =
 *   FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build();
 * KafkaSource<String> kafkaSource =
 *           KafkaSource.<String>builder()
 *                   .setBootstrapServers("localhost:9092")
 *                   .setGroupId("MyGroup")
 *                   .setTopics(Arrays.asList("quickstart-events"))
 *                   .setDeserializer(
 *                           KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
 *                   .setStartingOffsets(OffsetsInitializer.earliest())
 *                   .build();
 * HybridSource<String> hybridSource =
 *           HybridSource.builder(fileSource)
 *                   .addSource(kafkaSource)
 *                   .build();
 * }</pre>
 *
 * <p>A more complex example with Kafka start position derived from previous source:
 *
 * <pre>{@code
 * HybridSource<String> hybridSource =
 *     HybridSource.<String, StaticFileSplitEnumerator>builder(fileSource)
 *         .addSource(
 *             switchContext -> {
 *               StaticFileSplitEnumerator previousEnumerator =
 *                   switchContext.getPreviousEnumerator();
 *               // how to get timestamp depends on specific enumerator
 *               long timestamp = previousEnumerator.getEndTimestamp();
 *               OffsetsInitializer offsets =
 *                   OffsetsInitializer.timestamp(timestamp);
 *               KafkaSource<String> kafkaSource =
 *                   KafkaSource.<String>builder()
 *                       .setBootstrapServers("localhost:9092")
 *                       .setGroupId("MyGroup")
 *                       .setTopics(Arrays.asList("quickstart-events"))
 *                       .setDeserializer(
 *                           KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
 *                       .setStartingOffsets(offsets)
 *                       .build();
 *               return kafkaSource;
 *             },
 *             Boundedness.CONTINUOUS_UNBOUNDED)
 *         .build();
 * }</pre>
 */
@PublicEvolving
public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> {...}


Code Block
languagejava
titleSourceFactory
/**
 * Factory for underlying sources of {@link HybridSource}.
 *
 * <p>This factory permits building of a source at graph construction time or deferred at switch
 * time. Provides the ability to set a start position in any way a specific source allows.
 * Future convenience could be built on top of it, for example a default implementation that
 * recognizes optional interfaces to transfer position in a universal format.
 *
 * <p>Called when the current enumerator has finished. The previous source's final state can
 * thus be used to construct the next source, as required for dynamic position transfer at time
 * of switching.
 *
 * <p>If start position is known at job submission time, the source can be constructed in the
 * entry point and simply wrapped into the factory, providing the benefit of validation during
 * submission.
 */
@FunctionalInterface
public interface SourceFactory<
                T, SourceT extends Source<T, ?, ?>, FromEnumT extends SplitEnumerator>
        extends Serializable {
    SourceT create(SourceSwitchContext<FromEnumT> context);
}


Code Block
languagejava
titleSourceSwitchContext
/**
 * Context provided to source factory.
 *
 * <p>To derive a start position at switch time, the source can be initialized from context of
 * the previous enumerator. A specific enumerator implementation may carry state such as an end
 * timestamp, that can be used to derive the start position of the next source.
 *
 * <p>Currently only the previous enumerator is exposed. The context interface allows for
 * backward compatible extension, i.e. additional information about the previous source can be
 * supplied in the future.
 */
public interface SourceSwitchContext<EnumT> {
    EnumT getPreviousEnumerator();
}


Code Block
languagejava
titleHybridSourceBuilder
/** Builder for HybridSource. */
public static class HybridSourceBuilder<T, EnumT extends SplitEnumerator>
        implements Serializable {
    private final List<SourceListEntry> sources;

    public HybridSourceBuilder() {
        sources = new ArrayList<>();
    }

    /** Add pre-configured source (without switch time modification). */
    public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, ?, ?>>
            HybridSourceBuilder<T, ToEnumT> addSource(NextSourceT source) {
        return addSource(new PassthroughSourceFactory<>(source), source.getBoundedness());
    }

    /** Add source with deferred instantiation based on previous enumerator. */
    public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, ?, ?>>
            HybridSourceBuilder<T, ToEnumT> addSource(
                    SourceFactory<T, NextSourceT, EnumT> sourceFactory,
                    Boundedness boundedness) {
        if (!sources.isEmpty()) {
            Preconditions.checkArgument(
                    Boundedness.BOUNDED.equals(sources.get(sources.size() - 1).boundedness),
                    "All sources except the final source need to be bounded.");
        }
        ClosureCleaner.clean(
                sourceFactory, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
        sources.add(SourceListEntry.of(sourceFactory, boundedness));
        return (HybridSourceBuilder) this;
    }

    /** Build the source. */
    public HybridSource<T> build() {
        return new HybridSource(sources);
    }
}
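
The ordering constraint enforced in addSource can be shown in isolation. The sketch below is a simplified, hypothetical model (ChainBuilderSketch and its Kind enum are not Flink classes) that keeps only the rule that every source except the final one must be bounded, and it reports violations with IllegalArgumentException, as a checkArgument-style precondition would.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the builder's boundedness rule; not the Flink builder. */
final class ChainBuilderSketch {

    /** Local stand-in for the boundedness of a source. */
    enum Kind { BOUNDED, CONTINUOUS_UNBOUNDED }

    private final List<Kind> chain = new ArrayList<>();

    /** Appends a source, rejecting it if the previously added source is unbounded. */
    ChainBuilderSketch add(Kind kind) {
        if (!chain.isEmpty() && chain.get(chain.size() - 1) != Kind.BOUNDED) {
            throw new IllegalArgumentException(
                    "All sources except the final source need to be bounded.");
        }
        chain.add(kind);
        return this;
    }

    int size() {
        return chain.size();
    }

    public static void main(String[] args) {
        ChainBuilderSketch builder = new ChainBuilderSketch()
                .add(Kind.BOUNDED)
                .add(Kind.CONTINUOUS_UNBOUNDED);
        System.out.println(builder.size());
    }
}
```

Checking the previous entry at the time the next one is added means an unbounded source is accepted as long as nothing follows it, which matches the "bounded history, then unbounded stream" use case.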

...