...
The following graph lists the newly introduced classes (marked in green) that the Hive connector uses to support DPP. The FileSystem connector has a similar class structure, so it is not repeated here.
- DynamicFileSplitEnumerator does partition pruning on a best-effort basis, based on the received DynamicFilteringEvent. If a split request arrives before the DynamicFilteringEvent, the split assigner assigns splits from all partitions. Once the DynamicFilteringEvent arrives, a new split assigner is created, and only the relevant splits are assigned for subsequent split requests; splits that were already assigned are never assigned again. As described in the "Scheduling deadlock" section, the dim-side operators are scheduled first, which in most cases ensures that the DynamicFilteringEvent reaches DynamicFileSplitEnumerator before the split requests from the fact-source operator. If the placeholder operator cannot be added and the fact source cannot be chained with its output operator (for example, when operator chaining is explicitly disabled), this best-effort strategy still avoids a scheduling deadlock, because split requests never wait for the DynamicFilteringEvent. The pseudocode looks like:
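import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SupportsHandleExecutionAttemptSourceEvent;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
import org.apache.flink.connector.file.src.enumerate.DynamicFileEnumerator;
import org.apache.flink.table.connector.source.DynamicFilteringData;
import org.apache.flink.table.connector.source.DynamicFilteringEvent;

import javax.annotation.Nullable;

import java.util.Collection;
import java.util.Optional;
import java.util.Set;

@Internal
public class DynamicFileSplitEnumerator<SplitT extends FileSourceSplit>
        implements SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>>,
                SupportsHandleExecutionAttemptSourceEvent {

    private final SplitEnumeratorContext<SplitT> context;
    private final DynamicFileEnumerator.Provider fileEnumeratorFactory;
    private final FileSplitAssigner.Provider splitAssignerFactory;
    // Created lazily on the first split request, re-created when a DynamicFilteringEvent arrives.
    private transient FileSplitAssigner splitAssigner;
    // Splits already handed to readers, so a re-created assigner never assigns them twice.
    private final Set<SplitT> assignedSplits;

    // Constructor, start(), addSplitsBack(), addReader(), snapshotState() and close() are omitted.

    @Override
    @SuppressWarnings("unchecked")
    public void handleSplitRequest(int subtask, @Nullable String hostname) {
        if (!context.registeredReaders().containsKey(subtask)) {
            // reader failed between sending the request and now. skip this request.
            return;
        }
        if (splitAssigner == null) {
            // No DynamicFilteringEvent yet: do not wait, assign splits from all partitions.
            createSplitAssigner(null);
        }
        final Optional<FileSourceSplit> nextSplit = getNextSplit(hostname);
        if (nextSplit.isPresent()) {
            final FileSourceSplit split = nextSplit.get();
            context.assignSplit((SplitT) split, subtask);
            assignedSplits.add((SplitT) split);
        } else {
            context.signalNoMoreSplits(subtask);
        }
    }

    private Optional<FileSourceSplit> getNextSplit(@Nullable String hostname) {
        while (true) {
            final Optional<FileSourceSplit> nextSplit = splitAssigner.getNext(hostname);
            // Stop when the assigner is exhausted or yields a split that has not been assigned yet;
            // splits assigned by a previous assigner are skipped.
            if (!nextSplit.isPresent() || !assignedSplits.contains(nextSplit.get())) {
                return nextSplit;
            }
        }
    }

    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        if (sourceEvent instanceof DynamicFilteringEvent) {
            createSplitAssigner(((DynamicFilteringEvent) sourceEvent).getData());
        }
    }

    // Execution-attempt-aware variant declared by SupportsHandleExecutionAttemptSourceEvent.
    @Override
    public void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent sourceEvent) {
        if (sourceEvent instanceof DynamicFilteringEvent) {
            // Re-create the assigner so that subsequent requests only get splits of relevant partitions.
            createSplitAssigner(((DynamicFilteringEvent) sourceEvent).getData());
        }
    }

    private void createSplitAssigner(@Nullable DynamicFilteringData dynamicFilteringData) {
        DynamicFileEnumerator fileEnumerator = fileEnumeratorFactory.create();
        if (dynamicFilteringData != null) {
            // Lets the enumerator skip partitions that cannot match any row from the dim side.
            fileEnumerator.setDynamicFilteringData(dynamicFilteringData);
        }
        // create splits; enumerateSplits(...) is a hypothetical helper wrapping
        // fileEnumerator.enumerateSplits(paths, minDesiredSplits) and its IOException
        final Collection<FileSourceSplit> splits = enumerateSplits(fileEnumerator);
        splitAssigner = splitAssignerFactory.create(splits);
    }
}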
- DynamicFileEnumerator filters all partitions based on the DynamicFilteringData. The pseudocode for HiveSourceDynamicFileEnumerator looks like:
...
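The best-effort behavior of DynamicFileSplitEnumerator described above (prune once the DynamicFilteringEvent has arrived, never block before it, never assign a split twice) can be tried out without Flink. The following Java program is a minimal simulation under simplifying assumptions, not the connector implementation: the class DppSplitAssignmentSketch and all of its methods are hypothetical, a split is a plain string `<partition>/<file>`, and the DynamicFilteringData is reduced to the set of partition values produced by the dim side. `enumerateSplits` stands in for DynamicFileEnumerator, while `handleSplitRequest` and `onDynamicFilteringEvent` stand in for the corresponding DynamicFileSplitEnumerator logic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical, Flink-free simulation of best-effort dynamic partition pruning.
// A split is a string "<partition>/<file>"; the filtering data is a set of partition values.
class DppSplitAssignmentSketch {
    private final List<String> allSplits;                       // everything the enumerator could produce
    private final Set<String> assignedSplits = new HashSet<>(); // never hand out a split twice
    private Deque<String> pending;                              // the current "split assigner"; null until first use

    DppSplitAssignmentSketch(List<String> allSplits) {
        this.allSplits = new ArrayList<>(allSplits);
    }

    // Stands in for DynamicFileEnumerator: keep only splits whose partition is in the filter.
    // A null filter means no filtering data has arrived, so all partitions are kept.
    static List<String> enumerateSplits(List<String> splits, Set<String> allowedPartitions) {
        if (allowedPartitions == null) {
            return new ArrayList<>(splits);
        }
        return splits.stream()
                .filter(s -> allowedPartitions.contains(s.substring(0, s.indexOf('/'))))
                .collect(Collectors.toList());
    }

    // Stands in for createSplitAssigner: replace the pending queue with a fresh enumeration.
    private void createSplitAssigner(Set<String> allowedPartitions) {
        pending = new ArrayDeque<>(enumerateSplits(allSplits, allowedPartitions));
    }

    // Stands in for handleSourceEvent receiving a DynamicFilteringEvent.
    void onDynamicFilteringEvent(Set<String> allowedPartitions) {
        createSplitAssigner(allowedPartitions);
    }

    // Stands in for handleSplitRequest; returns null for "no more splits".
    String handleSplitRequest() {
        if (pending == null) {
            createSplitAssigner(null); // request arrived before the event: do not wait, use all partitions
        }
        while (!pending.isEmpty()) {
            String next = pending.poll();
            if (assignedSplits.add(next)) { // add() returns false if the split was already assigned
                return next;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        DppSplitAssignmentSketch enumerator = new DppSplitAssignmentSketch(
                List.of("dt=02/f0", "dt=01/f0", "dt=01/f1", "dt=03/f0"));
        // Two requests arrive before the dim side has produced its filtering data.
        System.out.println(enumerator.handleSplitRequest()); // dt=02/f0
        System.out.println(enumerator.handleSplitRequest()); // dt=01/f0
        // The dim side only matched partitions dt=01 and dt=03.
        enumerator.onDynamicFilteringEvent(Set.of("dt=01", "dt=03"));
        List<String> rest = new ArrayList<>();
        for (String s = enumerator.handleSplitRequest(); s != null; s = enumerator.handleSplitRequest()) {
            rest.add(s);
        }
        System.out.println(rest); // [dt=01/f1, dt=03/f0]
    }
}
```

Running `main` prints `dt=02/f0`, `dt=01/f0` and then `[dt=01/f1, dt=03/f0]`. The split of partition dt=02 was assigned before the filtering data arrived, so it is read anyway: with this strategy a late DynamicFilteringEvent only reduces the pruning benefit, because the join itself still discards non-matching rows. After the assigner is rebuilt, `dt=01/f0` is skipped since it was already assigned.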