...

select * from store_returns, date_dim
where sr_returned_date_sk = d_date_sk
and d_year = 2000


Notes:  store_returns is a partitioned fact table, sr_returned_date_sk is the partition key, and date_dim is a dim table.


Currently, the Flink source operator does not have any input edge, and the splits are generated in the SplitEnumerator for the new source (FLIP-27). Based on the current mechanism, dynamic partition pruning will happen in the SplitEnumerator (e.g. for the filesystem connector or the hive connector). If, in the future, the source operator supports inputs, the input data could also support dynamic filtering (such as filtering parquet row-groups based on statistics, or even filtering records based on a bloom filter).

In this FLIP we will introduce a mechanism for detecting dynamic partition pruning patterns in the optimization phase and performing partition pruning at runtime, by sending the dimension table results to the SplitEnumerator of the fact table via the existing coordinator mechanism.

In the above example, the result from date_dim where d_year is 2000 will be sent both to the join build side and to the Coordinator, and the SplitEnumerator of store_returns will filter the partitions based on the filtered date_dim data and send only the required splits to the source scan of store_returns.

The dynamic partition pruning mechanism can improve performance by avoiding reading large amounts of irrelevant data, and it works for both batch and streaming queries with bounded sources. In the following we refer to dynamic partition pruning as DPP.

Public Interfaces

The whole workflow for DPP

The following graph shows the optimization and execution process of a query using the DPP mechanism, for table source connectors that support FLIP-27 sources (such as filesystem or hive).

  1. The parser parses a given query and converts it to an AST (Abstract Syntax Tree) plan. The optimizer detects the DPP pattern and transforms the plan via planner rules, and the optimized physical plan is then converted to an ExecNode plan. The ExecNode plan is converted to a StreamGraph and then to a JobGraph.
  2. The client submits the generated JobGraph to the job manager.
  3. The JM schedules the job vertices for the dim-source and DynamicFilteringDataCollector first. The filtered data from the dim-source operator is sent both to the DynamicFilteringDataCollector operator and to the shuffle input of the Join operator.
  4. DynamicFilteringDataCollector collects the input data, removes the irrelevant columns and duplicated records, and then sends the records (wrapped in a DynamicFilteringEvent) to its OperatorCoordinator once all input data has been collected in its finish method. The coordinator delivers the event to the SourceCoordinators of the related fact-source operators, and each SourceCoordinator delivers the DynamicFilteringEvent to its DynamicFileSplitEnumerator.
  5. The DynamicFileSplitEnumerator finds the relevant partitions among all partitions using the data from the dim-source, and creates the target splits for the fact-source.
  6. The JM schedules the job vertices for the fact-source and the Placeholder operator.
  7. The fact-source gets the splits from the DynamicFileSplitEnumerator, reads the data, and sends it to the shuffle input of the Join operator.
  8. The join operator reads the input data and performs the join.

(Figure: optimization and execution workflow of a query with DPP)

New public interfaces

For the filesystem connector and the hive connector, the partition info contains only the specific partition values, while for lake storage the partition info contains not only the partition values but also some statistics which could help with further filtering. In the future we could even do dynamic filtering based on records. We therefore extend the concept from dynamic partition pruning to dynamic filtering for future extension.

SupportsDynamicFiltering is introduced to let the planner know whether a source connector supports dynamic filtering, and to let the source connector filter the irrelevant partitions or input data at runtime.


/**
 * Pushes a dynamic filter into a {@link ScanTableSource}; the table source can filter the
 * partitions or the input data at runtime to reduce scan I/O.
 */
@PublicEvolving
public interface SupportsDynamicFiltering {

    /**
     * Applies the candidate filter fields to the table source, and returns the accepted fields.
     * The data corresponding to the filter fields will be provided at runtime, and can be used to
     * filter the partitions and the input data.
     */
    List<String> applyDynamicFiltering(List<String> candidateFilterFields);
}
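
As a hedged illustration (not part of the FLIP), a partitioned table source could implement the interface by accepting only the candidate fields that are actual partition keys. MyPartitionedTableSource and partitionKeys below are hypothetical names; a real implementation would live on the connector's ScanTableSource.

import java.util.List;
import java.util.stream.Collectors;

// Hypothetical example, for illustration only.
public class MyPartitionedTableSource implements SupportsDynamicFiltering {

    // e.g. ["sr_returned_date_sk"] for the store_returns table above
    private final List<String> partitionKeys;
    // remembered for later use when the splits are generated
    private List<String> acceptedFilterFields;

    public MyPartitionedTableSource(List<String> partitionKeys) {
        this.partitionKeys = partitionKeys;
    }

    @Override
    public List<String> applyDynamicFiltering(List<String> candidateFilterFields) {
        // Accept only the fields that are partition keys; other fields cannot be used
        // to prune partitions in this source.
        this.acceptedFilterFields =
                candidateFilterFields.stream()
                        .filter(partitionKeys::contains)
                        .collect(Collectors.toList());
        return acceptedFilterFields;
    }
}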


Since the split enumerator runs in the JM, the filtered partition data from the dim-source operator should be sent to the split enumerator via the existing coordinator mechanism. So we introduce DynamicFilteringEvent to wrap the filtering data.

@PublicEvolving
public class DynamicFilteringEvent implements SourceEvent {
    private final DynamicFilteringData data;

    public DynamicFilteringEvent(DynamicFilteringData data) {
        this.data = data;
    }

    public DynamicFilteringData getData() {
        return data;
    }

    @Override
    public String toString() {
        return "DynamicFilteringEvent{" + "data=" + data + '}';
    }
}


DynamicFilteringData wraps the values without the schema; the value order is consistent with the filter fields applied via the SupportsDynamicFiltering#applyDynamicFiltering method.

@PublicEvolving
public class DynamicFilteringData implements Serializable {
    private final TypeInformation<RowData> typeInfo;
    private final RowType rowType;
    private final List<byte[]> serializedData;
    private final boolean exceedThreshold;
    private transient List<RowData> data;

    public RowType getRowType() {
        return rowType;
    }

    public Optional<List<RowData>> getData() {
        if (exceedThreshold) {
            return Optional.empty();
        }

        if (data == null) {
            data = new ArrayList<>();
            TypeSerializer<RowData> serializer = typeInfo.createSerializer(new ExecutionConfig());
            for (byte[] bytes : serializedData) {
                try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
                        DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais)) {
                    RowData partition = serializer.deserialize(inView);
                    data.add(partition);
                } catch (Exception e) {
                    throw new TableException("Unable to deserialize the value.", e);
                }
            }
        }
        return Optional.of(data);
    }

    public boolean contains(RowData row) {
        // ...
    }
}
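
For illustration only (this example is not part of the FLIP): if applyDynamicFiltering accepted a single field ["d_date_sk"], each row carried by DynamicFilteringData has exactly one field holding a d_date_sk value from the dim side, in the same order as the accepted fields. A minimal sketch with made-up values and a hypothetical class name:

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

import java.util.Arrays;
import java.util.List;

// Hypothetical illustration, not part of the proposed API.
public class FilteringRowsExample {
    public static void main(String[] args) {
        // Rows the dim side would contribute for d_year = 2000; the values are made up.
        List<RowData> filteringRows =
                Arrays.asList(GenericRowData.of(2451545L), GenericRowData.of(2451546L));
        // A DynamicFileEnumerator keeps only the partitions whose sr_returned_date_sk
        // value is contained in these rows.
        System.out.println(filteringRows.size() + " filtering rows for d_date_sk");
    }
}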


Currently, for the FileSystem connector and the Hive connector, the splits are created by a FileEnumerator. However, the existing FileEnumerators do not meet this requirement, so we introduce a new FileEnumerator named DynamicFileEnumerator to support creating splits based on partitions.

@PublicEvolving
public interface DynamicFileEnumerator extends FileEnumerator {

    void setDynamicFilteringData(DynamicFilteringData data);

    /** Provider for DynamicFileEnumerator. */
    @FunctionalInterface
    interface Provider extends FileEnumerator.Provider {

        DynamicFileEnumerator create();
    }
}
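
To make the contract concrete, here is a minimal, hypothetical DynamicFileEnumerator sketch (not the FLIP's implementation). It assumes the splits were already discovered and grouped by the value of a single BIGINT partition column, and it keeps only the partitions contained in the received DynamicFilteringData:

import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Hypothetical sketch under the assumptions stated above.
public class PartitionMapDynamicFileEnumerator implements DynamicFileEnumerator {

    // Pre-discovered splits, grouped by the value of the partition column.
    private final Map<Long, List<FileSourceSplit>> splitsByPartition;
    private DynamicFilteringData filteringData;

    public PartitionMapDynamicFileEnumerator(Map<Long, List<FileSourceSplit>> splitsByPartition) {
        this.splitsByPartition = splitsByPartition;
    }

    @Override
    public void setDynamicFilteringData(DynamicFilteringData data) {
        this.filteringData = data;
    }

    @Override
    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits) {
        List<FileSourceSplit> result = new ArrayList<>();
        splitsByPartition.forEach(
                (partitionValue, splits) -> {
                    // Keep the partition if no filtering data arrived (fail-safe: read all)
                    // or if its value is contained in the dim-side filtering rows.
                    if (filteringData == null
                            || filteringData.contains(GenericRowData.of(partitionValue))) {
                        result.addAll(splits);
                    }
                });
        return result;
    }
}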


We introduce a config option to enable or disable this feature; the default is true.

@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
public static final ConfigOption<Boolean> TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED =
        key("table.optimizer.dynamic-filtering.enabled")
                .booleanType()
                .defaultValue(true)
                .withDescription(
                        "When it is true, the optimizer will try to push dynamic filtering into scan table source,"
                                + " the irrelevant partitions or input data will be filtered to reduce scan I/O in runtime.");


We can check whether a query has successfully applied the DPP optimization via its EXPLAIN result. The following snippet shows the EXPLAIN result for the above query. (The optimized plan will contain a DynamicFilteringDataCollector.)

== Abstract Syntax Tree ==
LogicalProject(sr_returned_date_sk=[$0], sr_return_time_sk=[$1], sr_item_sk=[$2], d_date_sk=[$3],d_date_id=[$4], d_year=[$5])
+- LogicalFilter(condition=[AND(=($0, $3), =($5, 2000))])
    +- LogicalJoin(condition=[true], joinType=[inner])
         :- LogicalTableScan(table=[[test-catalog, default, store_returns]])
         +- LogicalTableScan(table=[[test-catalog, default, date_dim]])

== Optimized Physical Plan ==
HashJoin(joinType=[InnerJoin], where=[=(sr_returned_date_sk, d_date_sk)], select=[sr_returned_date_sk, sr_return_time_sk, sr_item_sk, d_date_sk, d_date_id, d_year], build=[right])
:- Exchange(distribution=[hash[sr_returned_date_sk]])
:   +- DynamicFilteringTableSourceScan(table=[[test-catalog, default, store_returns]], fields=[sr_returned_date_sk, sr_return_time_sk, sr_item_sk])
:       +- DynamicFilteringDataCollector(fields=[d_date_sk])
:            +- Calc(select=[d_date_sk, d_date_id, d_year], where=[=(d_year, 2000)])
:                 +- TableSourceScan(table=[[test-catalog, default, date_dim]], fields=[d_date_sk, d_date_id, d_year])
+- Exchange(distribution=[hash[d_date_sk]])
    +- Calc(select=[d_date_sk, d_date_id, d_year], where=[=(d_year, 2000)])
         +- TableSourceScan(table=[[test-catalog, default, date_dim]], fields=[d_date_sk, d_date_id, d_year])

== Optimized Execution Plan ==
HashJoin(joinType=[InnerJoin], where=[=(sr_returned_date_sk, d_date_sk)], select=[sr_returned_date_sk, sr_return_time_sk, sr_item_sk, d_date_sk, d_date_id, d_year], build=[right])
:- Exchange(distribution=[hash[sr_returned_date_sk]])
:  +- TableSourceScan(table=[[test-catalog, default, store_returns]], fields=[sr_returned_date_sk, sr_return_time_sk, sr_item_sk])
:       +- DynamicFilteringDataCollector(fields=[d_date_sk])
:            +- Calc(select=[d_date_sk, d_date_id, d_year], where=[=(d_year, 2000)])(reuse_id=[1])
:                 +- TableSourceScan(table=[[test-catalog, default, date_dim]], fields=[d_date_sk, d_date_id, d_year])
+- Exchange(distribution=[hash[d_date_sk]])
     +- Reused(reference_id=[1])
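
As a usage sketch only (not part of the FLIP), the following self-contained program shows how such a plan could be produced and inspected. The 'filesystem' connector, paths and formats are assumptions; any partitioned, FLIP-27 based source implementing SupportsDynamicFiltering would work as well.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Hypothetical example program; table names follow the example query above.
public class DppExplainExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        tEnv.executeSql(
                "CREATE TABLE store_returns ("
                        + "  sr_returned_date_sk BIGINT,"
                        + "  sr_return_time_sk BIGINT,"
                        + "  sr_item_sk BIGINT"
                        + ") PARTITIONED BY (sr_returned_date_sk) WITH ("
                        + "  'connector' = 'filesystem',"
                        + "  'path' = '/tmp/store_returns',"
                        + "  'format' = 'parquet')");
        tEnv.executeSql(
                "CREATE TABLE date_dim ("
                        + "  d_date_sk BIGINT,"
                        + "  d_date_id STRING,"
                        + "  d_year INT"
                        + ") WITH ("
                        + "  'connector' = 'filesystem',"
                        + "  'path' = '/tmp/date_dim',"
                        + "  'format' = 'parquet')");
        // If DPP was applied, the optimized plan contains a DynamicFilteringDataCollector node.
        System.out.println(
                tEnv.explainSql(
                        "SELECT * FROM store_returns, date_dim"
                                + " WHERE sr_returned_date_sk = d_date_sk AND d_year = 2000"));
    }
}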


Proposed Changes

Optimizer changes

...

We prefer the first approach. We can now introduce a few planner rules in the physical rewrite phase to detect and transform the plan. The DPP pattern can be defined as: an inner join with an equality condition on a partition column, where one input is a dim source with a filter and the other input is the fact source of a partitioned table. After the transformation, a new physical node named DynamicFilteringDataCollector will be introduced; its input is the filter node and its output is the fact table source node named DynamicFilteringTableSourceScan.

Currently, we do not introduce a threshold for the size of the partition data, because it depends on the statistics. If the statistics are not available, this feature is also not available. We can improve this once we introduce file size statistics, which are more accessible and more reliable.

The following graph describes the plan before and after the transformation for the above query.



We do not need to introduce a new ExecNode for DynamicFilteringTableSourceScan, because the current mechanism already meets our requirements; we just need to add input edges to the Batch(Stream)ExecTableSourceScan node.

...

Currently, the planner recursively traverses the plan from sink to source and creates the transformations from source to sink; the planner only knows the sink transformations. However, DynamicFilteringDataCollector is not a real sink for the planner, and its transformation cannot be reached from the sink transformations, because the source transformation has no input. So we should register the transformations of DynamicFilteringDataCollector with the planner separately, and build the StreamGraph with all transformations.

...

  1. DynamicFileSplitEnumerator handles the SourceEvent from the SourceCoordinator and, if it is a DynamicFilteringEvent, extracts the DynamicFilteringData and creates the DynamicFileEnumerator to get the relevant partitions based on that data. All split requests are buffered until the DynamicFilteringEvent has been handled, and then the awaiting requests are processed. The pseudocode looks like:
public class DynamicFileSplitEnumerator<SplitT extends FileSourceSplit>
        implements SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>> {

    private final DynamicFileEnumerator.Provider fileEnumeratorFactory;
    private final FileSplitAssigner.Provider splitAssignerFactory;
    private final SplitEnumeratorContext<SplitT> context;
    private transient FileSplitAssigner splitAssigner;
    /** Buffers the split requests received before the dynamic filtering data arrives. */
    private final LinkedHashMap<Integer, String> readersAwaitingSplit;
    /** A flag indicating whether the dynamic filtering data has been received. */
    private transient boolean partitionDataReceived;

    @Override
    public void handleSplitRequest(int subtask, @Nullable String hostname) {
        if (!partitionDataReceived) {
            readersAwaitingSplit.put(subtask, hostname);
            return;
        }
        // handle the request normally
    }

    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        if (sourceEvent instanceof DynamicFilteringEvent) {
            DynamicFileEnumerator fileEnumerator = fileEnumeratorFactory.create();
            fileEnumerator.setDynamicFilteringData(((DynamicFilteringEvent) sourceEvent).getData());
            Collection<FileSourceSplit> splits = fileEnumerator.enumerateSplits(new Path[1], context.currentParallelism());
            splitAssigner = splitAssignerFactory.create(splits);
            this.partitionDataReceived = true;
            assignAwaitingRequests();
        } else {
            LOG.error("Received unrecognized event: {}", sourceEvent);
        }
    }

    private void assignAwaitingRequests() {
        // ...
    }
    
    @Override
    public void addSplitsBack(List<SplitT> splits, int subtaskId) {
        if (splitAssigner != null) {
            List<FileSourceSplit> fileSplits = new ArrayList<>(splits);
            splitAssigner.addSplits(fileSplits);
        }
    }

    @Override
    public PendingSplitsCheckpoint<SplitT> snapshotState(long checkpointId) {
        if (splitAssigner != null) {
            return PendingSplitsCheckpoint.fromCollectionSnapshot(splitAssigner.remainingSplits());
        }
        // No splits have been generated yet, so snapshot an empty split collection.
        return PendingSplitsCheckpoint.fromCollectionSnapshot(Collections.emptyList());
    }
}

...

public class HiveSourceDynamicFileEnumerator implements DynamicFileEnumerator {
    private final List<String> dynamicPartitionKeys;
    private final List<HiveTablePartition> allPartitions;
    private transient List<HiveTablePartition> finalPartitions;

    @Override
    public void setDynamicFilteringData(DynamicFilteringData dynamicFilteringData) {
        finalPartitions = new ArrayList<>();
        for (HiveTablePartition partition : allPartitions) {
            Object[] values = dynamicPartitionKeys.stream()
                            .map(k -> partition.getPartitionSpec().get(k))
                            .toArray(String[]::new);
            if (dynamicFilteringData.contains(GenericRowData.of(values))) {
                finalPartitions.add(partition);
            }
        }
    }

    @Override
    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits) throws IOException {
        return new ArrayList<>(createInputSplits(minDesiredSplits, finalPartitions ...));
    }
}

...

There is no data dependency between the fact-source operator and the DynamicFilteringDataCollector operator, so the order in which they are scheduled is non-deterministic. A deadlock occurs if the fact-source operator is scheduled and waits for the DynamicFilteringEvent, but the DynamicFilteringDataCollector operator cannot be scheduled because resources are limited. To solve this problem, here are three options:

...

  1. The fact-source operator tries its best to wait for the DynamicFilteringEvent
    1. Pros: The solution is simple and clean.
    2. Cons:

...

  1. Add a dependency operator whose inputs are the fact-source operator and the DynamicFilteringDataCollector operator; this ensures the DynamicFilteringDataCollector operator is scheduled before the fact-source operator. The dependency edge could be omitted if the fact source is chained with the join operator via multiple-input tasks and there is already an edge between the dim source and the fact source.

...