...

The following diagram shows the newly introduced classes (marked in green) that let the Hive connector support DPP. The FileSystem connector has a similar class structure, so it is not repeated here.

[Figure: class diagram of the newly introduced classes for the Hive connector]

  • DynamicFileSplitEnumerator does best-effort partition pruning based on the received DynamicFilteringEvent. If a split request arrives before the DynamicFilteringEvent, the split assigner assigns a split based on all partitions. Once the DynamicFilteringEvent arrives, a new split assigner is created, and only the relevant splits are assigned for subsequent split requests. As described in the "Scheduling deadlock" section, the dim-side operators are scheduled first, which in most cases ensures that the DynamicFilteringEvent reaches DynamicFileSplitEnumerator before any split request from the fact-source operator. If the placeholder operator can't be added and the fact-source can't be chained with its output operator (for example, when operator chaining is explicitly disabled), this best-effort strategy avoids the scheduling deadlock. The pseudocode looks like:
@Internal
public class DynamicFileSplitEnumerator<SplitT extends FileSourceSplit>
        implements SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>>,
                SupportsHandleExecutionAttemptSourceEvent {

    private final SplitEnumeratorContext<SplitT> context;

    private final DynamicFileEnumerator.Provider fileEnumeratorFactory;

    private final FileSplitAssigner.Provider splitAssignerFactory;
    private transient FileSplitAssigner splitAssigner;
    private final Set<SplitT> assignedSplits;

    @Override
    public void handleSplitRequest(int subtask, @Nullable String hostname) {
        if (!context.registeredReaders().containsKey(subtask)) {
            // reader failed between sending the request and now. skip this request.
            return;
        }
        if (splitAssigner == null) {
            // no DynamicFilteringEvent received yet: fall back to all partitions
            createSplitAssigner(null);
        }

        final Optional<FileSourceSplit> nextSplit = getNextSplit(hostname);
        if (nextSplit.isPresent()) {
            final FileSourceSplit split = nextSplit.get();
            context.assignSplit((SplitT) split, subtask);
            assignedSplits.add((SplitT) split);
        } else {
            context.signalNoMoreSplits(subtask);
        }
    }

    private Optional<FileSourceSplit> getNextSplit(String hostname) {
        do {
            final Optional<FileSourceSplit> nextSplit = splitAssigner.getNext(hostname);
            if (nextSplit.isPresent()) {
                // ignore the split if it has been assigned
                if (!assignedSplits.contains(nextSplit.get())) {
                    return nextSplit;
                }
            } else {
                return nextSplit;
            }
        } while (true);
    }

    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        if (sourceEvent instanceof DynamicFilteringEvent) {
            createSplitAssigner(((DynamicFilteringEvent) sourceEvent).getData());
        }
    }

    @Override
    public void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent sourceEvent) {
        if (sourceEvent instanceof DynamicFilteringEvent) {
            createSplitAssigner(((DynamicFilteringEvent) sourceEvent).getData());
        }
    }

    private void createSplitAssigner(@Nullable DynamicFilteringData dynamicFilteringData) {
        DynamicFileEnumerator fileEnumerator = fileEnumeratorFactory.create();
        if (dynamicFilteringData != null) {
            fileEnumerator.setDynamicFilteringData(dynamicFilteringData);
        }
        // create splits (elided in this pseudocode)
        splitAssigner = splitAssignerFactory.create(splits);
    }
}
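
The best-effort behavior described above can be illustrated with a small, self-contained toy model. This is only a sketch under simplifying assumptions, not the proposed implementation: the class BestEffortPruningSketch and its methods are hypothetical names, each partition is treated as exactly one split, and a plain queue stands in for FileSplitAssigner. It shows a split request that arrives before the filtering data being served from all partitions, and later requests being served only from the relevant partitions while skipping splits that were already assigned.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Toy model of the best-effort strategy; none of these names are Flink APIs. */
class BestEffortPruningSketch {

    private final List<String> allPartitions;
    // Splits already handed out, possibly by an earlier (unpruned) assigner.
    private final Set<String> assigned = new HashSet<>();
    // Stands in for the split assigner; null until the first request or event.
    private Deque<String> pending;

    BestEffortPruningSketch(List<String> allPartitions) {
        this.allPartitions = new ArrayList<>(allPartitions);
    }

    /** Mirrors handleSplitRequest: falls back to all partitions if no filter arrived yet. */
    Optional<String> requestSplit() {
        if (pending == null) {
            createAssigner(null);
        }
        while (!pending.isEmpty()) {
            String next = pending.poll();
            // Set.add returns false for a split that was assigned before, so it is skipped.
            if (assigned.add(next)) {
                return Optional.of(next);
            }
        }
        return Optional.empty(); // corresponds to signalNoMoreSplits
    }

    /** Mirrors handleSourceEvent: replaces the assigner with a pruned one. */
    void onDynamicFilteringData(Set<String> relevantPartitions) {
        createAssigner(relevantPartitions);
    }

    private void createAssigner(Set<String> relevantPartitions) {
        pending = new ArrayDeque<>();
        for (String partition : allPartitions) {
            // A null filter means "no pruning": keep every partition.
            if (relevantPartitions == null || relevantPartitions.contains(partition)) {
                pending.add(partition);
            }
        }
    }

    public static void main(String[] args) {
        BestEffortPruningSketch enumerator =
                new BestEffortPruningSketch(List.of("p=1", "p=2", "p=3", "p=4"));
        // Request arrives before the filtering data: served from all partitions.
        System.out.println(enumerator.requestSplit()); // Optional[p=1]
        // Filtering data arrives: only p=1 and p=3 are relevant.
        enumerator.onDynamicFilteringData(Set.of("p=1", "p=3"));
        System.out.println(enumerator.requestSplit()); // Optional[p=3] (p=1 was already assigned)
        System.out.println(enumerator.requestSplit()); // Optional.empty
    }
}
```

In this model the first reader may still receive a split that the filter would have pruned, which is the cost of the best-effort approach: pruning is skipped for early requests in exchange for never blocking them.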


  • DynamicFileEnumerator filters all partitions based on DynamicFilteringData. The pseudocode for HiveSourceDynamicFileEnumerator looks like:

...