Status
...
...
...
| Jira |
---|
server | ASF JIRA |
---|
serverId | 5aa69414-a9e9-3523-82ec-879b028fb15b |
---|
key | FLINK-22668 |
---|
|
|
---|
|
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Code Block |
---|
language | java |
---|
title | SourceFactory |
---|
|
/**
* Factory for underlying sources of {@link HybridSource}.
*
* <p>This factory permits building of a source at graph construction time or deferred at switch
* time. Provides the ability to set a start position in any way a specific source allows.
* Future convenience could be built on top of it, for example a default implementation that
* recognizes optional interfaces to transfer position in a universal format.
*
* <p>Called when the current enumerator has finished. The previous source's final state can
* thus be used to construct the next source, as required for dynamic position transfer at time
* of switching.
*
* <p>If start position is known at job submission time, the source can be constructed in the
* entry point and simply wrapped into the factory, providing the benefit of validation during
* submission.
*/
@FunctionalInterface
public interface SourceFactory<
T, SourceT extends Source<T, ?, ?>, FromEnumT extends SplitEnumerator>
extends Serializable {
SourceT create(SourceSwitchContext<FromEnumT> context);
}
|
Code Block |
---|
language | java |
---|
title | SourceSwitchContext |
---|
|
/**
* Context provided to source factory.
*
* <p>To derive a start position at switch time, the source can be initialized from context of
* the previous enumerator. A specific enumerator implementation may carry state such as an end
* timestamp, that can be used to derive the start position of the next source.
*
* <p>Currently only the previous enumerator is exposed. The context interface allows for
* backward compatible extension, i.e. additional information about the previous source can be
* supplied in the future.
*/
public interface SourceSwitchContext<EnumT> {
EnumT getPreviousEnumerator();
}
|
Code Block |
---|
language | java |
---|
title | HybridSourceBuilder |
---|
|
/** Builder for HybridSource. */
public static class HybridSourceBuilder<T, EnumT extends SplitEnumerator>
implements Serializable {
private final List<SourceListEntry> sources;
public HybridSourceBuilder() {
sources = new ArrayList<>();
}
/** Add pre-configured source (without switch time modification). */
public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, ?, ?>>
HybridSourceBuilder<T, ToEnumT> addSource(NextSourceT source) {
return addSource(new PassthroughSourceFactory<>(source), source.getBoundedness());
}
/** Add source with deferred instantiation based on previous enumerator. */
public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, ?, ?>>
HybridSourceBuilder<T, ToEnumT> addSource(
SourceFactory<T, NextSourceT, EnumT> sourceFactory,
Boundedness boundedness) {
if (!sources.isEmpty()) {
Preconditions.checkArgument(
Boundedness.BOUNDED.equals(sources.get(sources.size() - 1).boundedness),
"All sources except the final source need to be bounded.");
}
ClosureCleaner.clean(
sourceFactory, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
sources.add(SourceListEntry.of(sourceFactory, boundedness));
return (HybridSourceBuilder) this;
}
/** Build the source. */
public HybridSource<T> build() {
return new HybridSource(sources);
}
} |
...