Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleSourceSwitchContext
/**
 * Context provided to source factory.
 *
 * <p>To derive a start position at switch time, the source can be initialized from context of
 * the previous enumerator. A specific enumerator implementation may carry state such as an end
 * timestamp, that can be used to derive the start position of the next source.
 *
 * <p>Currently only the previous enumerator is exposed. The context interface allows for
 * backward compatible extension, i.e. additional information about the previous source can be
 * supplied in the future.
 */
public interface SourceSwitchContext<EnumT> {
    EnumT getPreviousEnumerator();
}


Code Block
languagejava
titleHybridSourceBuilder
/** Builder for HybridSource. */
public static class HybridSourceBuilder<T, EnumT extends SplitEnumerator>
        implements Serializable {
    private final List<SourceListEntry> sources;

    public HybridSourceBuilder() {
        sources = new ArrayList<>();
    }

    /** Add pre-configured source (without switch time modification). */
    public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, ?, ?>>
            HybridSourceBuilder<T, ToEnumT> addSource(NextSourceT source) {
        return addSource(new PassthroughSourceFactory<>(source), source.getBoundedness());
    }

    /** Add source with deferred instantiation based on previous enumerator. */
    public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, ?, ?>>
            HybridSourceBuilder<T, ToEnumT> addSource(
                    SourceFactory<T, NextSourceT, EnumT> sourceFactory,
                    Boundedness boundedness) {
        if (!sources.isEmpty()) {
            Preconditions.checkArgument(
                    Boundedness.BOUNDED.equals(sources.get(sources.size() - 1).boundedness),
                    "All sources except the final source need to be bounded.");
        }
        ClosureCleaner.clean(
                sourceFactory, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
        sources.add(SourceListEntry.of(sourceFactory, boundedness));
        return (HybridSourceBuilder) this;
    }

    /** Build the source. */
    public HybridSource<T> build() {
        return new HybridSource(sources);
    }
}

...