Status

Current state: "Under Discussion"


...

Discussion thread: https://lists.apache.org/thread/v0b8pfh0o7rwtlok2mfs5s6q9w5vw8h6
Vote thread:
JIRA: FLINK-28706
Release: 1.16


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

@PublicEvolving
public interface SupportsDynamicFiltering {

  /**
   * Returns the filter fields this partition table source supports. This method can tell the
   * planner which fields can be used as dynamic filtering fields; the planner will pick some
   * fields from the returned fields based on the query, and create the dynamic filtering
   * operator.
   */
  List<String> listAcceptedFilterFields();

  /**
   * Applies the candidate filter fields into the table source. The data corresponding to the
   * filter fields will be provided at runtime, which can be used to filter the partitions or
   * the input data.
   *
   * <p>NOTE: the candidate filter fields are always from the result of {@link
   * #listAcceptedFilterFields()}.
   */
  void applyDynamicFiltering(List<String> candidateFilterFields);
}
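
To make the contract more concrete, below is a minimal sketch of how a partitioned table source might use this interface. The class name MyPartitionedTableSource, its partitionKeys field, and the way the accepted filter fields are stored are illustrative assumptions, not part of this FLIP.

// Illustrative sketch only; all names besides the interface methods are assumptions.
public abstract class MyPartitionedTableSource
        implements ScanTableSource, SupportsDynamicFiltering {

    /** Partition keys of the table, e.g. ["sr_returned_date_sk"]. */
    private final List<String> partitionKeys;

    /** Filter fields accepted from the planner, used when creating splits at runtime. */
    private List<String> acceptedFilterFields;

    protected MyPartitionedTableSource(List<String> partitionKeys) {
        this.partitionKeys = partitionKeys;
    }

    @Override
    public List<String> listAcceptedFilterFields() {
        // Only partition fields can be pruned, so offer exactly those to the planner.
        return partitionKeys;
    }

    @Override
    public void applyDynamicFiltering(List<String> candidateFilterFields) {
        // The planner guarantees these fields come from listAcceptedFilterFields().
        // The source remembers them so its split enumerator can prune partitions
        // once the dynamic filtering data arrives at runtime.
        this.acceptedFilterFields = candidateFilterFields;
    }
}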

...

== Optimized Physical Plan ==
HashJoin(joinType=[InnerJoin], where=[=(sr_returned_date_sk, d_date_sk)], select=[sr_returned_date_sk, sr_return_time_sk, sr_item_sk, d_date_sk, d_date_id, d_year], build=[right])
:- Exchange(distribution=[hash[sr_returned_date_sk]])
:   +- DynamicFilteringTableSourceScan(table=[[test-catalog, default, store_returns]], fields=[sr_returned_date_sk, sr_return_time_sk, sr_item_sk])
:       +- DynamicFilteringDataCollector(fields=[d_date_sk])
:            +- Calc(select=[d_date_sk, d_date_id, d_year], where=[=(d_year, 2000)])
:                 +- TableSourceScan(table=[[test-catalog, default, date_dim]], fields=[d_date_sk, d_date_id, d_year])
+- Exchange(distribution=[hash[d_date_sk]])
    +- Calc(select=[d_date_sk, d_date_id, d_year], where=[=(d_year, 2000)])
         +- TableSourceScan(table=[[test-catalog, default, date_dim]], fields=[d_date_sk, d_date_id, d_year])
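
A plan like the one above could be inspected with explainSql in batch mode. The snippet below is a sketch that assumes the TPC-DS-style tables store_returns (partitioned by sr_returned_date_sk) and date_dim are already registered in test-catalog, and that the fact-table source supports dynamic filtering.

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
tEnv.useCatalog("test-catalog");

// If the fact-table source supports dynamic filtering, the printed plan should contain
// the DynamicFilteringTableSourceScan / DynamicFilteringDataCollector nodes shown above.
System.out.println(
        tEnv.explainSql(
                "SELECT * FROM store_returns JOIN date_dim "
                        + "ON sr_returned_date_sk = d_date_sk "
                        + "WHERE d_year = 2000"));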

...

The following diagram shows the newly introduced classes (marked in green) that allow the Hive connector to support DPP. The FileSystem connector has a similar class structure, so it is not repeated here.

(Class diagram: new classes introduced for the Hive connector to support DPP)

  • DynamicFileSplitEnumerator will try its best to do partition pruning based on the received DynamicFilteringEvent. If a split request arrives before the SourceEvent, the split assigner will assign splits based on all partitions. Once the DynamicFilteringEvent arrives, a new split assigner will be created and only the relevant splits will be assigned for the following split requests. As the "Scheduling deadlock" section explains, the dim-side operators will be scheduled first, which in most cases ensures that the DynamicFilteringEvent reaches the DynamicFileSplitEnumerator before the split requests from the fact-source operator. If the placeholder operator cannot be added and the fact source cannot be chained with its output operator (e.g. operator chaining is explicitly disabled), this best-effort strategy avoids a scheduling deadlock. The pseudocode looks like:
@Internal
public class DynamicFileSplitEnumerator<SplitT extends FileSourceSplit>
        implements SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>>,
                SupportsHandleExecutionAttemptSourceEvent {

    private final SplitEnumeratorContext<SplitT> context;

    private final DynamicFileEnumerator.Provider fileEnumeratorFactory;

    private final FileSplitAssigner.Provider splitAssignerFactory;
    private transient FileSplitAssigner splitAssigner;

    private final Set<SplitT> assignedSplits;

    @Override
    public void handleSplitRequest(int subtask, @Nullable String hostname) {
        if (!context.registeredReaders().containsKey(subtask)) {
            // reader failed between sending the request and now. skip this request.
            return;
        }

        if (splitAssigner == null) {
            // No DynamicFilteringEvent has arrived yet: fall back to an assigner
            // over all partitions so the request is not blocked.
            createSplitAssigner(null);
        }

        final Optional<FileSourceSplit> nextSplit = getNextSplit(hostname);
        if (nextSplit.isPresent()) {
            final FileSourceSplit split = nextSplit.get();
            context.assignSplit((SplitT) split, subtask);
            assignedSplits.add((SplitT) split);
        } else {
            context.signalNoMoreSplits(subtask);
        }
    }

    private Optional<FileSourceSplit> getNextSplit(String hostname) {
        do {
            final Optional<FileSourceSplit> nextSplit = splitAssigner.getNext(hostname);
            if (nextSplit.isPresent()) {
                // ignore the split if it has already been assigned
                if (!assignedSplits.contains(nextSplit.get())) {
                    return nextSplit;
                }
            } else {
                return nextSplit;
            }
        } while (true);
    }

    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        if (sourceEvent instanceof DynamicFilteringEvent) {
            LOG.info("Received DynamicFilteringEvent: {}", subtaskId);
            createSplitAssigner(((DynamicFilteringEvent) sourceEvent).getData());
        }
    }

    @Override
    public void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent sourceEvent) {
        if (sourceEvent instanceof DynamicFilteringEvent) {
            LOG.info("Received DynamicFilteringEvent: {}", subtaskId);
            createSplitAssigner(((DynamicFilteringEvent) sourceEvent).getData());
        }
    }

    private void createSplitAssigner(@Nullable DynamicFilteringData dynamicFilteringData) {
        DynamicFileEnumerator fileEnumerator = fileEnumeratorFactory.create();
        if (dynamicFilteringData != null) {
            fileEnumerator.setDynamicFilteringData(dynamicFilteringData);
        }
        // create splits from the (possibly pruned) partitions via the file enumerator
        splitAssigner = splitAssignerFactory.create(splits);
    }
}


  • DynamicFileEnumerator filters all partitions based on the DynamicFilteringData; the pseudocode for HiveSourceDynamicFileEnumerator looks like:

...