Status
Current state: "Under Discussion"
...
Discussion thread | https://lists.apache.org/thread/v0b8pfh0o7rwtlok2mfs5s6q9w5vw8h6
---|---
Vote thread |
JIRA |
Release | 1.16
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
@PublicEvolving
public interface SupportsDynamicFiltering {

    /**
     * Returns the filter fields that this partition table source supports. This method tells the
     * planner which fields can be used as dynamic filtering fields. Based on the query, the
     * planner picks some of the returned fields and creates the dynamic filtering operator.
     */
    List<String> listAcceptedFilterFields();

    /**
     * Applies the candidate filter fields to the table source. The data corresponding to the
     * filter fields is provided at runtime and can be used to filter the partitions or the input
     * data.
     *
     * <p>NOTE: the candidate filter fields are always taken from the result of {@link
     * #listAcceptedFilterFields()}.
     */
    void applyDynamicFiltering(List<String> candidateFilterFields);
}
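As a rough illustration of how a connector might implement this interface, the sketch below uses a hypothetical PartitionedTableSource that accepts dynamic filtering only on its partition keys. The interface is redeclared locally (without the @PublicEvolving annotation) so that the snippet compiles on its own. The class name, its constructor, the getDynamicFilterFields() accessor, and the validation logic are assumptions made for illustration and are not part of the proposal.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Local copy of the proposed interface so that the sketch is self-contained.
interface SupportsDynamicFiltering {
    List<String> listAcceptedFilterFields();

    void applyDynamicFiltering(List<String> candidateFilterFields);
}

// Hypothetical source: accepts dynamic filtering only on its partition keys.
class PartitionedTableSource implements SupportsDynamicFiltering {
    private final List<String> partitionKeys;
    private List<String> dynamicFilterFields = Collections.emptyList();

    PartitionedTableSource(List<String> partitionKeys) {
        this.partitionKeys = new ArrayList<>(partitionKeys);
    }

    @Override
    public List<String> listAcceptedFilterFields() {
        // Every partition key is a candidate for dynamic filtering.
        return Collections.unmodifiableList(partitionKeys);
    }

    @Override
    public void applyDynamicFiltering(List<String> candidateFilterFields) {
        // The planner is expected to pass only fields returned by listAcceptedFilterFields().
        for (String field : candidateFilterFields) {
            if (!partitionKeys.contains(field)) {
                throw new IllegalArgumentException("Not an accepted filter field: " + field);
            }
        }
        this.dynamicFilterFields = new ArrayList<>(candidateFilterFields);
    }

    // Assumed accessor: the runtime source would use these fields to interpret the filter data.
    List<String> getDynamicFilterFields() {
        return dynamicFilterFields;
    }
}
```

In this sketch the planner side would first call listAcceptedFilterFields(), intersect the result with the join keys of the query, and then pass the chosen fields to applyDynamicFiltering(). Rejecting unknown fields is a defensive choice of the sketch; the proposal only states that the candidates always come from the accepted fields.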
...
== Optimized Physical Plan ==
HashJoin(joinType=[InnerJoin], where=[=(sr_returned_date_sk, d_date_sk)], select=[sr_returned_date_sk, sr_return_time_sk, sr_item_sk, d_date_sk, d_date_id, d_year], build=[right])
:- Exchange(distribution=[hash[sr_returned_date_sk]])
:  +- DynamicFilteringTableSourceScan(table=[[test-catalog, default, store_returns]], fields=[sr_returned_date_sk, sr_return_time_sk, sr_item_sk])
:     +- DynamicFilteringDataCollector(fields=[d_date_sk])
:        +- Calc(select=[d_date_sk, d_date_id, d_year], where=[=(d_year, 2000)])
:           +- TableSourceScan(table=[[test-catalog, default, date_dim]], fields=[d_date_sk, d_date_id, d_year])
+- Exchange(distribution=[hash[d_date_sk]])
   +- Calc(select=[d_date_sk, d_date_id, d_year], where=[=(d_year, 2000)])
      +- TableSourceScan(table=[[test-catalog, default, date_dim]], fields=[d_date_sk, d_date_id, d_year])
...
The following graph lists the newly introduced classes (marked in green) that the Hive connector needs to support DPP. The FileSystem connector has a similar class structure, so it is not repeated here.
- DynamicFileSplitEnumerator does its best to prune partitions based on the received DynamicFilteringEvent. If a split request arrives before the DynamicFilteringEvent, the split assigner assigns a split based on all partitions. Once the DynamicFilteringEvent arrives, a new split assigner is created, and only the relevant splits are assigned for subsequent split requests. As described in the "Scheduling deadlock" section, the dim-side operators are scheduled first, which in most cases ensures that the DynamicFilteringEvent reaches DynamicFileSplitEnumerator before any split request from the fact-source operator. If the placeholder operator can't be added and the fact-source can't be chained with its output operator (for example, because operator chaining is explicitly disabled), this best-effort strategy avoids a scheduling deadlock. The pseudocode looks like:
@Internal
public class DynamicFileSplitEnumerator<SplitT extends FileSourceSplit>
        implements SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>>,
                SupportsHandleExecutionAttemptSourceEvent {

    private final SplitEnumeratorContext<SplitT> context;
    private final DynamicFileEnumerator.Provider fileEnumeratorFactory;
    private final FileSplitAssigner.Provider splitAssignerFactory;
    // created lazily: on the first split request, or when a DynamicFilteringEvent arrives
    private transient FileSplitAssigner splitAssigner;
    // splits handed out so far, so that a recreated assigner never assigns a split twice
    private final Set<SplitT> assignedSplits;

    @Override
    public void handleSplitRequest(int subtask, @Nullable String hostname) {
        if (!context.registeredReaders().containsKey(subtask)) {
            // reader failed between sending the request and now. skip this request.
            return;
        }
        if (splitAssigner == null) {
            // no DynamicFilteringEvent received yet: fall back to all partitions
            createSplitAssigner(null);
        }
        final Optional<FileSourceSplit> nextSplit = getNextSplit(hostname);
        if (nextSplit.isPresent()) {
            final FileSourceSplit split = nextSplit.get();
            context.assignSplit((SplitT) split, subtask);
            assignedSplits.add((SplitT) split);
        } else {
            context.signalNoMoreSplits(subtask);
        }
    }

    private Optional<FileSourceSplit> getNextSplit(String hostname) {
        do {
            final Optional<FileSourceSplit> nextSplit = splitAssigner.getNext(hostname);
            if (nextSplit.isPresent()) {
                // ignore the split if it has been assigned
                if (!assignedSplits.contains(nextSplit.get())) {
                    return nextSplit;
                }
            } else {
                return nextSplit;
            }
        } while (true);
    }

    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        if (sourceEvent instanceof DynamicFilteringEvent) {
            createSplitAssigner(((DynamicFilteringEvent) sourceEvent).getData());
        }
    }

    @Override
    public void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent sourceEvent) {
        if (sourceEvent instanceof DynamicFilteringEvent) {
            LOG.info("Received DynamicFilteringEvent: {}", subtaskId);
            createSplitAssigner(((DynamicFilteringEvent) sourceEvent).getData());
        }
    }

    private void createSplitAssigner(@Nullable DynamicFilteringData dynamicFilteringData) {
        DynamicFileEnumerator fileEnumerator = fileEnumeratorFactory.create();
        if (dynamicFilteringData != null) {
            fileEnumerator.setDynamicFilteringData(dynamicFilteringData);
        }
        // create splits: `splits` stands for the splits enumerated by fileEnumerator
        // (the enumeration call itself is omitted in this pseudocode)
        splitAssigner = splitAssignerFactory.create(splits);
    }
}
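The best-effort behaviour can also be looked at in isolation with a small, Flink-free simulation. The BestEffortAssigner class below is hypothetical: splits are plain strings, partitions are map keys, and onDynamicFilteringData() stands in for handleSourceEvent(). It is only a sketch showing that requests served before the event draw from all partitions, that later requests draw from the pruned set, and that splits already assigned are skipped.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for the enumerator; not a Flink class.
class BestEffortAssigner {
    // partition name -> splits belonging to that partition
    private final Map<String, List<String>> splitsByPartition;
    private final Set<String> assignedSplits = new HashSet<>();
    // plays the role of the split assigner; null until first needed
    private Deque<String> pending;

    BestEffortAssigner(Map<String, List<String>> splitsByPartition) {
        this.splitsByPartition = new LinkedHashMap<>(splitsByPartition);
    }

    // Rebuilds the pending queue; a null argument means "no pruning".
    private void createSplitAssigner(Set<String> relevantPartitions) {
        pending = new ArrayDeque<>();
        for (Map.Entry<String, List<String>> entry : splitsByPartition.entrySet()) {
            if (relevantPartitions == null || relevantPartitions.contains(entry.getKey())) {
                pending.addAll(entry.getValue());
            }
        }
    }

    // Stand-in for receiving a DynamicFilteringEvent.
    void onDynamicFilteringData(Set<String> relevantPartitions) {
        createSplitAssigner(relevantPartitions);
    }

    // Returns the next unassigned split, or empty when nothing is left.
    Optional<String> handleSplitRequest() {
        if (pending == null) {
            createSplitAssigner(null);
        }
        while (!pending.isEmpty()) {
            String split = pending.poll();
            // Set.add returns false for splits that were already assigned.
            if (assignedSplits.add(split)) {
                return Optional.of(split);
            }
        }
        return Optional.empty();
    }
}
```

With partitions p1, p2 and p3, a request that arrives before the filtering data is answered from p1 even if p1 later turns out to be irrelevant. Such a split is read unnecessarily, but the result stays correct because the join still filters the rows; only the pruning benefit for that split is lost.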
- DynamicFileEnumerator filters all partitions based on the DynamicFilteringData. The pseudocode for HiveSourceDynamicFileEnumerator looks like:
...