Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

...

Page properties


Discussion thread

...

...

Jira
server: ASF JIRA
serverId: 5aa69414-a9e9-3523-82ec-879b028fb15b
key: FLINK-22668

...

Release: 1.14


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
language: java
title: SourceFactory
/**
 * Creates the underlying sources that a {@link HybridSource} reads from.
 *
 * <p>A factory allows a source to be built either eagerly, while the job graph is constructed,
 * or lazily, at the moment the hybrid source switches to it. In both cases the factory may set
 * the start position through whatever mechanism the concrete source offers. Convenience layers
 * can be added on top of this interface later, for example a default implementation that
 * recognizes optional interfaces to transfer a position in a universal format.
 *
 * <p>When the start position is already known at job submission time, construct the source in
 * the entry point and simply wrap it in a factory; the source then benefits from validation
 * during submission.
 *
 * @param <T> record type produced by the created source
 * @param <SourceT> concrete type of the source created by this factory
 * @param <FromEnumT> enumerator type of the preceding source, exposed through the switch context
 */
@FunctionalInterface
public interface SourceFactory<
                T,
                SourceT extends Source<T, ?, ?>,
                FromEnumT extends SplitEnumerator>
        extends Serializable {

    /**
     * Creates the next source.
     *
     * <p>Called when the current enumerator has finished, so the previous source's final state
     * can be used to construct the next source, as required for dynamic position transfer at
     * switch time.
     *
     * @param context switch context giving access to the previous enumerator
     * @return the source to read from next
     */
    SourceT create(SourceSwitchContext<FromEnumT> context);
}


Code Block
language: java
title: SourceSwitchContext
/**
 * Information handed to a source factory when the hybrid source switches to the next source.
 *
 * <p>The start position of the next source can be derived from the enumerator of the previous
 * source. A specific enumerator implementation may carry state, such as an end timestamp, that
 * is useful for that purpose.
 *
 * <p>At present only the previous enumerator is exposed. Being an interface, this context can be
 * extended in a backward compatible way, so more information about the previous source can be
 * supplied in the future.
 *
 * @param <EnumT> type of the previous source's enumerator
 */
public interface SourceSwitchContext<EnumT> {

    /** Returns the enumerator of the previous source. */
    EnumT getPreviousEnumerator();
}


Code Block
language: java
title: HybridSourceBuilder
/**
 * Builder for {@link HybridSource}.
 *
 * <p>Sources are kept in the order they are added. Every source except the final one must be
 * {@link Boundedness#BOUNDED}; this is checked against the previously added source each time a
 * new source is added.
 *
 * @param <T> record type produced by all underlying sources
 * @param <EnumT> enumerator type that the factory of the next added source receives from its
 *     predecessor (chosen by the caller via {@code addSource}; not checked against the actual
 *     source)
 */
public static class HybridSourceBuilder<T, EnumT extends SplitEnumerator>
        implements Serializable {
    private final List<SourceListEntry> sources;

    public HybridSourceBuilder() {
        sources = new ArrayList<>();
    }

    /**
     * Adds a pre-configured source, used as-is without switch-time modification.
     *
     * @param source the source to append; its own boundedness is recorded
     * @return this builder, re-typed so the next factory can receive {@code ToEnumT}
     * @throws NullPointerException if {@code source} is null
     */
    public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, ?, ?>>
            HybridSourceBuilder<T, ToEnumT> addSource(NextSourceT source) {
        Preconditions.checkNotNull(source, "source must not be null");
        return addSource(new PassthroughSourceFactory<>(source), source.getBoundedness());
    }

    /**
     * Adds a source whose instantiation is deferred until switch time, based on the previous
     * enumerator.
     *
     * @param sourceFactory factory creating the next source; closure-cleaned before it is stored
     * @param boundedness boundedness of the source the factory will create
     * @return this builder, re-typed so the next factory can receive {@code ToEnumT}
     * @throws NullPointerException if {@code sourceFactory} or {@code boundedness} is null
     * @throws IllegalArgumentException if the previously added source is not bounded
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, ?, ?>>
            HybridSourceBuilder<T, ToEnumT> addSource(
                    SourceFactory<T, NextSourceT, EnumT> sourceFactory,
                    Boundedness boundedness) {
        // Fail fast: a null factory would otherwise only surface at switch time, and a null
        // boundedness would make the *next* addSource fail with a misleading message.
        Preconditions.checkNotNull(sourceFactory, "sourceFactory must not be null");
        Preconditions.checkNotNull(boundedness, "boundedness must not be null");
        if (!sources.isEmpty()) {
            Preconditions.checkArgument(
                    Boundedness.BOUNDED.equals(sources.get(sources.size() - 1).boundedness),
                    "All sources except the final source need to be bounded.");
        }
        ClosureCleaner.clean(
                sourceFactory, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
        sources.add(SourceListEntry.of(sourceFactory, boundedness));
        // The same builder instance is returned with a new enumerator type parameter; this is
        // an intentional unchecked re-typing for the benefit of the next factory's signature.
        return (HybridSourceBuilder) this;
    }

    /**
     * Builds the source from the sources added so far.
     *
     * <p>The list of sources is copied, so adding further sources to this builder afterwards
     * does not modify a previously built {@link HybridSource}.
     *
     * @return a new hybrid source over the added sources
     */
    public HybridSource<T> build() {
        return new HybridSource<>(new ArrayList<>(sources));
    }
}

...