...

The following figure shows the plan before and after the transformation for the above query.


[Figure: query plan before and after the transformation]


We do not need to introduce a new ExecNode for DynamicFilteringTableSourceScan, because the current mechanism already meets our requirements; we just need to add input edges to the Batch(Stream)ExecTableSourceScan node.

Build StreamGraph

Currently, the planner recursively traverses from sink to source and then creates transformations from source to sink, so the planner only knows the sink transformations. DynamicFilteringDataCollector is not a real sink from the planner's point of view, and its transformation cannot be found from the sink transformations, because a source transformation has no input. Therefore, the transformations of DynamicFilteringDataCollector should be registered with the planner separately, and the StreamGraph should be built from all registered transformations.
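The following is a minimal, self-contained sketch of this bookkeeping. It is not the planner's actual code: the class and method names are illustrative assumptions, and the type parameter T stands in for the planner's transformation type.

import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating the idea: keep the DynamicFilteringDataCollector
// transformations in a separate list, because they cannot be reached by walking
// backwards from the sink transformations, and hand both lists to the StreamGraph
// generator.
public class TransformationCollector<T> {

    private final List<T> sinkTransformations = new ArrayList<>();
    // Transformations that no sink points to, e.g. the DynamicFilteringDataCollector side.
    private final List<T> separatelyRegistered = new ArrayList<>();

    public void addSinkTransformation(T transformation) {
        sinkTransformations.add(transformation);
    }

    public void registerSeparately(T transformation) {
        separatelyRegistered.add(transformation);
    }

    /** All roots from which the StreamGraph should be generated. */
    public List<T> allTransformations() {
        List<T> all = new ArrayList<>(sinkTransformations);
        all.addAll(separatelyRegistered);
        return all;
    }
}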


Scheduling deadlock

There is no data dependency between the fact-source operator and the DynamicFilteringDataCollector operator, so the scheduling order between them is non-deterministic. A deadlock occurs if the fact-source operator is scheduled and waits for the DynamicFilteringEvent, but the DynamicFilteringDataCollector operator cannot be scheduled because resources are limited. To solve this problem, there are three options:

  1. Introduce data/control edges for the source operator.
    1. Pros:
      1. The optimization results are deterministic.
      2. Users can visually see the data dependency between the operators in the topology.
    2. Cons: The change is too large.
  2. The fact-source operator tries its best to wait for the DynamicFilteringEvent.
    1. Pros: The solution is simple and clean.
    2. Cons:
      1. The optimization results are non-deterministic.
      2. Users cannot visually see the data dependency between the operators in the topology.
  3. Add a dependency operator whose inputs are the fact-source operator and the DynamicFilteringDataCollector operator; this ensures that the DynamicFilteringDataCollector operator is scheduled before the fact-source operator (a minimal sketch is given below). The dependency edge can be omitted if the fact source is chained with the join operator via a multiple-input task, since there is already an edge between the dim source and the fact source.
    1. Pros:
      1. The optimization results are deterministic.
      2. The change is minimal.
      3. Users can see the data dependency between the operators in the topology.
    2. Cons: It is a bit of a hack, but it does not affect execution.

Currently, we prefer the last approach. In the long term, we prefer the first option.
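To illustrate the third option, below is a minimal sketch of such a dependency operator. This is not the actual implementation; the class name and the exact wiring are assumptions made for illustration, using Flink's TwoInputStreamOperator interface.

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * Sketch only (hypothetical name, not the actual implementation). Input 1 is connected to
 * the DynamicFilteringDataCollector, input 2 to the fact source. The operator forwards
 * fact-source records unchanged; its sole purpose is that the edge on input 1 makes the
 * scheduler deploy the DynamicFilteringDataCollector before the fact source.
 */
public class DependencyPlaceholderOperator<T> extends AbstractStreamOperator<T>
        implements TwoInputStreamOperator<Object, T, T> {

    @Override
    public void processElement1(StreamRecord<Object> element) {
        // Nothing to do: the collector has already sent its data to the coordinator.
        // This input exists only to create the scheduling dependency.
    }

    @Override
    public void processElement2(StreamRecord<T> element) {
        // Pass fact-source records through untouched.
        output.collect(element);
    }
}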



Connector changes

The following figure lists the newly introduced classes (marked in green) for the Hive connector to support DPP. The FileSystem connector has a similar class structure and is not repeated here.

[Figure: newly introduced classes for the Hive connector]

  1. DynamicFileSplitEnumerator handles the SourceEvent from the SourceCoordinator. If the event is a DynamicFilteringEvent, it extracts the PartitionData and creates a DynamicFileEnumerator to get the relevant partitions based on that PartitionData. All split requests are buffered until the DynamicFilteringEvent has been handled, after which the awaiting requests are processed. The pseudocode looks like:
public class DynamicFileSplitEnumerator<SplitT extends FileSourceSplit>
        implements SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>> {

    private final DynamicFileEnumerator.Provider fileEnumeratorFactory;
    private final FileSplitAssigner.Provider splitAssignerFactory;
    private transient FileSplitAssigner splitAssigner;
    /** Buffers the split requests that arrive before the PartitionData is received. */
    private final LinkedHashMap<Integer, String> readersAwaitingSplit;
    /** A flag which indicates whether the PartitionData has been received. */
    private transient boolean partitionDataReceived;

    @Override
    public void handleSplitRequest(int subtask, @Nullable String hostname) {
        if (!partitionDataReceived) {
            readersAwaitingSplit.put(subtask, hostname);
            return;
        }
        // handle the request normally
    }

    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        if (sourceEvent instanceof DynamicFilteringEvent) {
            DynamicFileEnumerator fileEnumerator = fileEnumeratorFactory.create();
            fileEnumerator.setDynamicFilteringData(((DynamicFilteringEvent) sourceEvent).getData());
            Collection<FileSourceSplit> splits =
                    fileEnumerator.enumerateSplits(new Path[1], context.currentParallelism());
            splitAssigner = splitAssignerFactory.create(splits);
            this.partitionDataReceived = true;
            assignAwaitingRequests();
        } else {
            LOG.error("Received unrecognized event: {}", sourceEvent);
        }
    }

    private void assignAwaitingRequests() {
        // ...
    }

    @Override
    public void addSplitsBack(List<SplitT> splits, int subtaskId) {
        if (splitAssigner != null) {
            List<FileSourceSplit> fileSplits = new ArrayList<>(splits);
            splitAssigner.addSplits(fileSplits);
        }
    }

    @Override
    public PendingSplitsCheckpoint<SplitT> snapshotState(long checkpointId) {
        if (splitAssigner != null) {
            return PendingSplitsCheckpoint.fromCollectionSnapshot(splitAssigner.remainingSplits());
        }
        // ...
    }
}


  2. DynamicFileEnumerator filters the full set of partitions based on the PartitionData. The pseudocode looks like:
public class HiveSourceDynamicFileEnumerator implements DynamicFileEnumerator {

    private final List<String> dynamicPartitionKeys;
    private final List<HiveTablePartition> allPartitions;
    private transient List<HiveTablePartition> finalPartitions;

    public void setDynamicFilteringData(DynamicFilteringData dynamicFilteringData) {
        finalPartitions = new ArrayList<>();
        for (HiveTablePartition partition : allPartitions) {
            Object[] values = dynamicPartitionKeys.stream()
                    .map(k -> partition.getPartitionSpec().get(k))
                    .toArray(String[]::new);
            if (dynamicFilteringData.contains(Row.of(values))) {
                finalPartitions.add(partition);
            }
        }
    }

    @Override
    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits) throws IOException {
        return new ArrayList<>(createInputSplits(minDesiredSplits, finalPartitions ...));
    }
}
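As a sanity check of the filtering logic above, here is a small self-contained example. It replaces DynamicFilteringData with a plain Set of partition-key tuples and HiveTablePartition with a Map, purely for illustration; only the matching logic mirrors the pseudocode.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustration only: partition pruning with plain collections instead of the
// DynamicFilteringData / HiveTablePartition classes used in the pseudocode above.
public class PartitionPruningExample {

    public static void main(String[] args) {
        List<String> dynamicPartitionKeys = List.of("ds");

        // All partitions of the fact table (partition spec per partition).
        List<Map<String, String>> allPartitions = List.of(
                Map.of("ds", "2022-01-01"),
                Map.of("ds", "2022-01-02"),
                Map.of("ds", "2022-01-03"));

        // Key tuples collected from the dim-table side after its filter was applied.
        Set<List<String>> filteringData = Set.of(List.of("2022-01-02"));

        List<Map<String, String>> finalPartitions = new ArrayList<>();
        for (Map<String, String> partition : allPartitions) {
            List<String> values = new ArrayList<>();
            for (String key : dynamicPartitionKeys) {
                values.add(partition.get(key));
            }
            if (filteringData.contains(values)) {
                finalPartitions.add(partition);
            }
        }

        // Prints [{ds=2022-01-02}]: only the matching partition would be read.
        System.out.println(finalPartitions);
    }
}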


Compatibility, Deprecation, and Migration Plan

...