/** Builder for HybridSource. */
public static class HybridSourceBuilder<T, EnumT extends SplitEnumerator>
implements Serializable {
private final List<SourceListEntry> sources;
public HybridSourceBuilder() {
sources = new ArrayList<>();
}
/** Add pre-configured source (without switch time modification). */
public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, ?, ?>>
HybridSourceBuilder<T, ToEnumT> addSource(NextSourceT source) {
return addSource(new PassthroughSourceFactory<>(source), source.getBoundedness());
}
/** Add source with deferred instantiation based on previous enumerator. */
public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, ?, ?>>
HybridSourceBuilder<T, ToEnumT> addSource(
SourceFactory<T, NextSourceT, EnumT> sourceFactory,
Boundedness boundedness) {
if (!sources.isEmpty()) {
Preconditions.checkArgument(
Boundedness.BOUNDED.equals(sources.get(sources.size() - 1).boundedness),
"All sources except the final source need to be bounded.");
}
ClosureCleaner.clean(
sourceFactory, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
sources.add(SourceListEntry.of(sourceFactory, boundedness));
return (HybridSourceBuilder) this;
}
/** Build the source. */
public HybridSource<T> build() {
return new HybridSource(sources);
}
} |