Status

Discussion thread: https://lists.apache.org/thread/v0b8pfh0o7rwtlok2mfs5s6q9w5vw8h6
Vote thread
JIRA


Release: 1.16


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In the field of data analysis, the star schema [1] is the simplest and most widely used data mart pattern. In a star schema it is natural to map one or more dimensions to partition columns. Detecting and avoiding unnecessary data scans is the most important responsibility of the optimizer. Currently, Flink supports static partition pruning: the conditions in the WHERE clause are analyzed in the optimization phase to determine in advance which partitions can be safely skipped. In another common scenario, however, the partition information is not available in the optimization phase but only in the execution phase. This FLIP addresses that scenario with dynamic partition pruning, which reduces the I/O of partitioned table sources.

Consider a star schema consisting of one or more fact tables that reference any number of dimension tables through the partition key columns of the fact tables. In such join queries, the partitions of a fact table can be pruned using the result of filtering the dimension tables. The following is the simplest example showing how dynamic partition pruning works.

select * from store_returns, date_dim
where sr_returned_date_sk = d_date_sk
and d_year = 2000

Note: store_returns is a partitioned fact table, sr_returned_date_sk is its partition key, and date_dim is a dimension table.


Currently, the Flink source operator does not have any input edges, and for the new source (FLIP-27) the splits are generated in the SplitEnumerator. Under the current mechanism, dynamic partition pruning therefore happens in the SplitEnumerator (for example, in the filesystem connector or the Hive connector). If the source operator supports inputs in the future, the input data could also be filtered dynamically (for example, filtering Parquet row groups based on statistics, or even filtering records based on a bloom filter).

In this FLIP we introduce a mechanism that detects dynamic partition pruning patterns in the optimization phase and performs partition pruning at runtime by sending the dimension table results to the SplitEnumerator of the fact table via the existing coordinator mechanism.

In the above example, the date_dim rows whose d_year is 2000 are sent both to the join build side and to the coordinator. The SplitEnumerator of store_returns then filters the partitions based on this filtered date_dim data and sends only the required splits to the source scan of store_returns.
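The following plain-Java sketch makes the example concrete by computing what the store_returns SplitEnumerator effectively decides. It is only an illustration under simplifying assumptions: the sample rows are made up, `DppExample`, `DateDim`, and `prune` are hypothetical names rather than Flink APIs, and each partition is modeled as a bare `sr_returned_date_sk` value.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration of dynamic partition pruning for the store_returns / date_dim example. */
final class DppExample {

    /** A date_dim row, reduced to the two columns the query uses. */
    static final class DateDim {
        final int dDateSk;
        final int dYear;

        DateDim(int dDateSk, int dYear) {
            this.dDateSk = dDateSk;
            this.dYear = dYear;
        }
    }

    /** Keeps the fact partitions that can join with a date_dim row satisfying d_year = year. */
    static List<Integer> prune(List<Integer> factPartitions, List<DateDim> dim, int year) {
        // Dim side: apply the filter and keep only the join key (what the collector would send).
        Set<Integer> keys = new HashSet<>();
        for (DateDim row : dim) {
            if (row.dYear == year) {
                keys.add(row.dDateSk);
            }
        }
        // Fact side: a partition survives only if its partition value is one of the keys.
        List<Integer> kept = new ArrayList<>();
        for (Integer partition : factPartitions) {
            if (keys.contains(partition)) {
                kept.add(partition);
            }
        }
        return kept;
    }
}
```

For instance, with date_dim rows (1, 1999), (2, 2000), (3, 2000) and fact partitions 1 to 4, only partitions 2 and 3 are kept; the other partitions cannot produce join results, so skipping them does not change the query result.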

Dynamic partition pruning can improve performance by avoiding reading large amounts of irrelevant data, and it works for both batch queries and streaming queries with bounded sources. In the rest of this document we refer to dynamic partition pruning as DPP.

Public Interfaces

The whole workflow for DPP

The following graph shows the optimization and execution process of a query using the DPP mechanism, for table source connectors that support FLIP-27 (such as filesystem or Hive).

  1. The parser parses the given query and converts it into an AST (Abstract Syntax Tree) plan. The optimizer detects the DPP pattern and transforms the plan via planner rules, and the optimized physical plan is then converted to an ExecNode plan. The ExecNode plan is converted to a StreamGraph and then to a JobGraph.
  2. The client submits the generated JobGraph to the JobManager (JM).
  3. The JM first schedules the job vertices for the dim-source and the DynamicFilteringDataCollector. The filtered data from the dim-source operator is sent both to the DynamicFilteringDataCollector operator and to the shuffle input of the Join operator.
  4. DynamicFilteringDataCollector collects the input data and removes irrelevant columns and duplicate records. Once it has collected all input data (in its finish method), it sends the records, wrapped in a DynamicFilteringEvent, to its OperatorCoordinator. The coordinator delivers the event to the SourceCoordinator of the related fact-source operator, which in turn delivers the DynamicFilteringEvent to the DynamicFileSplitEnumerator.
  5. The DynamicFileSplitEnumerator uses the data from the dim-source to find the relevant partitions among all partitions, and creates the target splits for the fact-source.
  6. The JM schedules the job vertices for the fact-source and the Placeholder operator.
  7. The fact-source gets the splits from the DynamicFileSplitEnumerator, reads the data, and sends it to the shuffle input of the Join operator.
  8. The Join operator reads the input data and performs the join.

New public interfaces

For the filesystem and Hive connectors, the partition information contains only the specific partition values, while for lake storage it contains not only the partition values but also statistics that could help filter out more data. In the future we could even do dynamic filtering on individual records. We therefore generalize the concept from dynamic partition pruning to dynamic filtering, to leave room for future extensions.
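To illustrate why dynamic filtering is more general than partition pruning, here is a hypothetical sketch of how a lake-storage source might combine per-file min/max statistics with the received filter values. `FileStats` and `canSkip` are assumptions made for illustration only; nothing in this FLIP defines them.

```java
import java.util.Collection;

/** Hypothetical per-file column statistics, as a lake format might record them. */
final class FileStats {
    final long min;
    final long max;

    FileStats(long min, long max) {
        this.min = min;
        this.max = max;
    }

    /**
     * A file can be skipped when none of the dynamic filtering values falls inside its
     * [min, max] range, because then no row in the file can satisfy the join condition.
     */
    boolean canSkip(Collection<Long> filterValues) {
        for (long v : filterValues) {
            if (v >= min && v <= max) {
                return false;
            }
        }
        return true;
    }
}
```

Partition pruning is the special case in which every file of a partition has min equal to max (the partition value), so the range check degenerates into the equality check used by the filesystem and Hive connectors.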

SupportsDynamicFiltering is introduced to let the planner know whether a source connector supports dynamic filtering, and to let the source connector filter out irrelevant partitions or input data at runtime.


/**
 * Pushes dynamic filtering into a {@link ScanTableSource}; the table source can filter the
 * partitions or the input data at runtime to reduce scan I/O.
 */
@PublicEvolving
public interface SupportsDynamicFiltering {

    /**
     * Returns the filter fields this partitioned table source supports. This method tells the
     * planner which fields can be used as dynamic filtering fields; the planner picks some of
     * the returned fields based on the query and creates the dynamic filtering operator.
     */
    List<String> listAcceptedFilterFields();

    /**
     * Applies the candidate filter fields to the table source. The data corresponding to the
     * filter fields is provided at runtime and can be used to filter the partitions or the
     * input data.
     *
     * <p>NOTE: the candidate filter fields are always taken from the result of {@link
     * #listAcceptedFilterFields()}.
     */
    void applyDynamicFiltering(List<String> candidateFilterFields);
}
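A minimal sketch of how a partitioned source might implement the two methods, and how the planner side could choose the candidate fields. To keep it self-contained, the interface is re-declared locally without Flink annotations. `PartitionedSource` and `pickCandidates` are hypothetical names, and accepting only partition keys is an assumption that matches the filesystem/Hive case described above.

```java
import java.util.ArrayList;
import java.util.List;

/** Local copy of the proposed interface, so the sketch compiles without Flink. */
interface SupportsDynamicFiltering {
    List<String> listAcceptedFilterFields();

    void applyDynamicFiltering(List<String> candidateFilterFields);
}

/** Hypothetical partitioned source that only accepts its partition keys as filter fields. */
final class PartitionedSource implements SupportsDynamicFiltering {
    private final List<String> partitionKeys;
    private List<String> dynamicFilteringFields = new ArrayList<>();

    PartitionedSource(List<String> partitionKeys) {
        this.partitionKeys = partitionKeys;
    }

    @Override
    public List<String> listAcceptedFilterFields() {
        return new ArrayList<>(partitionKeys);
    }

    @Override
    public void applyDynamicFiltering(List<String> candidateFilterFields) {
        // Only the field names are known here; the values arrive later, at runtime.
        this.dynamicFilteringFields = new ArrayList<>(candidateFilterFields);
    }

    List<String> getDynamicFilteringFields() {
        return dynamicFilteringFields;
    }

    /** Hypothetical planner step: keep the fact-side join keys that the source accepts. */
    static List<String> pickCandidates(List<String> joinKeys, SupportsDynamicFiltering source) {
        List<String> accepted = source.listAcceptedFilterFields();
        List<String> candidates = new ArrayList<>();
        for (String key : joinKeys) {
            if (accepted.contains(key)) {
                candidates.add(key);
            }
        }
        return candidates;
    }
}
```

Because the candidates are always a subset of `listAcceptedFilterFields()`, the source never receives a field it cannot use, which is the contract stated in the NOTE above.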


Since the split enumerator runs in the JM, the filtered partition data from the dim-source operator must be sent to the split enumerator via the existing coordinator mechanism. We therefore introduce DynamicFilteringEvent to wrap the filtering data.

@PublicEvolving
public class DynamicFilteringEvent implements SourceEvent {
    private final DynamicFilteringData data;

    public DynamicFilteringEvent(DynamicFilteringData data) {
        this.data = data;
    }

    public DynamicFilteringData getData() {
        return data;
    }

    @Override
    public String toString() {
        return "DynamicFilteringEvent{" + "data=" + data + '}';
    }
}


DynamicFilteringData wraps the values without the schema; the order of the values in each row is consistent with the candidateFilterFields argument of SupportsDynamicFiltering#applyDynamicFiltering.

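@PublicEvolving
public class DynamicFilteringData implements Serializable {
    private final TypeInformation<RowData> typeInfo;
    private final RowType rowType;
    // Each element is one serialized row that contains only the dynamic filtering fields.
    private final List<byte[]> serializedData;
    // True if the collected data exceeded the size threshold and was discarded.
    private final boolean exceedThreshold;
    // Deserialized lazily on first access.
    private transient List<RowData> data;

    public DynamicFilteringData(
            TypeInformation<RowData> typeInfo,
            RowType rowType,
            List<byte[]> serializedData,
            boolean exceedThreshold) {
        this.typeInfo = typeInfo;
        this.rowType = rowType;
        this.serializedData = serializedData;
        this.exceedThreshold = exceedThreshold;
    }

    public RowType getRowType() {
        return rowType;
    }

    /** Returns the filtering rows, or empty if the threshold was exceeded (nothing can be pruned). */
    public Optional<List<RowData>> getData() {
        if (exceedThreshold) {
            return Optional.empty();
        }

        if (data == null) {
            // Fill a local list first so that a failed deserialization leaves no partial result.
            List<RowData> rows = new ArrayList<>();
            TypeSerializer<RowData> serializer = typeInfo.createSerializer(new ExecutionConfig());
            for (byte[] bytes : serializedData) {
                try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
                        DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais)) {
                    rows.add(serializer.deserialize(inView));
                } catch (Exception e) {
                    throw new TableException("Unable to deserialize the value.", e);
                }
            }
            data = rows;
        }
        return Optional.of(data);
    }

    /** Returns whether the given row (of the row type above) matches one of the received rows. */
    public boolean contains(RowData row) {
        // ...
    }
}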
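The body of `contains` is elided in the proposal. One straightforward approach, sketched here without Flink types, is to index the received rows in a hash set once and answer each lookup in constant time. `FilteringValues` is a hypothetical stand-in for DynamicFilteringData, rows are modeled as `List<Object>` in field order, and returning true when the threshold was exceeded is a design choice of this sketch, not something the FLIP specifies.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, Flink-free stand-in for DynamicFilteringData#contains. */
final class FilteringValues {
    private final boolean exceedThreshold;
    private final Set<List<Object>> rows = new HashSet<>();

    FilteringValues(List<List<Object>> receivedRows, boolean exceedThreshold) {
        this.exceedThreshold = exceedThreshold;
        // List#equals and List#hashCode are element-wise, so rows with equal values collide.
        rows.addAll(receivedRows);
    }

    /** Returns true if a row with these field values may be needed by the join. */
    boolean contains(Object... fieldValues) {
        // Without the data, nothing can be ruled out, so every row is treated as needed.
        return exceedThreshold || rows.contains(Arrays.asList(fieldValues));
    }
}
```

Note that plain Java equality is type-sensitive (`Integer` 2 is not equal to `Long` 2), which is why the real implementation has to build the probe rows with exactly the types in `getRowType()`.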


Currently, for the FileSystem and Hive connectors, splits are created by a FileEnumerator. However, the existing FileEnumerators cannot create splits based on partitions selected at runtime, so we introduce a new FileEnumerator named DynamicFileEnumerator that supports creating splits based on partitions.

@PublicEvolving
public interface DynamicFileEnumerator extends FileEnumerator {

    void setDynamicFilteringData(DynamicFilteringData data);

    /** Provider for DynamicFileEnumerator. */
    @FunctionalInterface
    interface Provider extends FileEnumerator.Provider {

        DynamicFileEnumerator create();
    }
}


We introduce a config option to enable or disable this feature; it is enabled by default.

@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
public static final ConfigOption<Boolean> TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED =
        key("table.optimizer.dynamic-filtering.enabled")
                .booleanType()
                .defaultValue(true)
                .withDescription(
                        "When it is true, the optimizer will try to push dynamic filtering into the scan table source;"
                                + " irrelevant partitions or input data will be filtered at runtime to reduce scan I/O.");


Whether a query successfully applied the DPP optimization can be checked in the EXPLAIN result: the optimized plan contains a DynamicFilteringDataCollector. The following snippet shows the EXPLAIN result for the above query.

== Abstract Syntax Tree ==
LogicalProject(sr_returned_date_sk=[$0], sr_return_time_sk=[$1], sr_item_sk=[$2], d_date_sk=[$3], d_date_id=[$4], d_year=[$5])
+- LogicalFilter(condition=[AND(=($0, $3), =($5, 2000))])
    +- LogicalJoin(condition=[true], joinType=[inner])
         :- LogicalTableScan(table=[[test-catalog, default, store_returns]])
         +- LogicalTableScan(table=[[test-catalog, default, date_dim]])

== Optimized Physical Plan ==
HashJoin(joinType=[InnerJoin], where=[=(sr_returned_date_sk, d_date_sk)], select=[sr_returned_date_sk, sr_return_time_sk, sr_item_sk, d_date_sk, d_date_id, d_year], build=[right])
:- Exchange(distribution=[hash[sr_returned_date_sk]])
:   +- DynamicFilteringTableSourceScan(table=[[test-catalog, default, store_returns]], fields=[sr_returned_date_sk, sr_return_time_sk, sr_item_sk])
:       +- DynamicFilteringDataCollector(fields=[d_date_sk])
:            +- Calc(select=[d_date_sk, d_date_id, d_year], where=[=(d_year, 2000)])
:                 +- TableSourceScan(table=[[test-catalog, default, date_dim]], fields=[d_date_sk, d_date_id, d_year])
+- Exchange(distribution=[hash[d_date_sk]])
    +- Calc(select=[d_date_sk, d_date_id, d_year], where=[=(d_year, 2000)])
         +- TableSourceScan(table=[[test-catalog, default, date_dim]], fields=[d_date_sk, d_date_id, d_year])

== Optimized Execution Plan ==
HashJoin(joinType=[InnerJoin], where=[=(sr_returned_date_sk, d_date_sk)], select=[sr_returned_date_sk, sr_return_time_sk, sr_item_sk, d_date_sk, d_date_id, d_year], build=[right])
:- Exchange(distribution=[hash[sr_returned_date_sk]])
:   +- TableSourceScan(table=[[test-catalog, default, store_returns]], fields=[sr_returned_date_sk, sr_return_time_sk, sr_item_sk])
:       +- DynamicFilteringDataCollector(fields=[d_date_sk])
:            +- Calc(select=[d_date_sk, d_date_id, d_year], where=[=(d_year, 2000)])(reuse_id=[1])
:                 +- TableSourceScan(table=[[test-catalog, default, date_dim]], fields=[d_date_sk, d_date_id, d_year])
+- Exchange(distribution=[hash[d_date_sk]])
    +- Reused(reference_id=[1])

Proposed Changes

Optimizer changes

TableScan as defined in Calcite has no inputs. Logically, however, the TableScan of the fact table has a data dependency on the plan of the dimension table. There are two approaches to describe this dependency.

  1. Introduce a new physical TableScan node with inputs. The planner rules should be introduced in the physical rewrite phase to avoid affecting other optimization phases, such as cost computation for the new TableScan during cost-based physical optimization.
    1. Pros: most of the existing work can be reused directly, such as sub-plan reuse and the transformation from physical nodes to ExecNodes.
    2. Cons: metadata handlers need to be implemented for the new physical TableScan.
  2. Introduce an attribute field in TableScan that describes the sub-plan of the dimension table. The planner rules should also be introduced in the physical rewrite phase, to avoid the transformation from logical nodes to physical nodes.
    1. Pros: the definition of TableScan in Calcite does not need to change.
    2. Cons: the transformation for the sub-plan has to be implemented again, and sub-plan reuse is hard (the dimension table would be read twice).

We prefer the first approach. We introduce a few planner rules in the physical rewrite phase to detect and transform the plan. The DPP pattern is defined as an inner join with an equality condition on a partition column of the fact table, where one input is a dimension source with a filter and the other input is the fact source of a partitioned table. After the transformation, a new physical node named DynamicFilteringDataCollector is introduced: its input is the filter node, and its output is the fact table source node, named DynamicFilteringTableSourceScan.
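The shape check behind the DPP pattern can be illustrated on a toy plan tree. The node classes below are hypothetical and far simpler than Calcite's RelNode, and the matcher only checks the shape described above (an inner equi-join whose fact-side key is a partition column and whose other input filters a dimension scan). It is a sketch of the detection logic, not the planner rule itself; for example, it does not check that the source implements SupportsDynamicFiltering.

```java
import java.util.List;

/** Hypothetical toy plan nodes, far simpler than Calcite's RelNode. */
abstract class Node {}

final class Scan extends Node {
    final String table;
    final List<String> partitionKeys;

    Scan(String table, List<String> partitionKeys) {
        this.table = table;
        this.partitionKeys = partitionKeys;
    }
}

final class Filter extends Node {
    final Node input;

    Filter(Node input) {
        this.input = input;
    }
}

/** An equi-join on leftKey = rightKey. */
final class Join extends Node {
    final boolean inner;
    final Node left;
    final String leftKey;
    final Node right;
    final String rightKey;

    Join(boolean inner, Node left, String leftKey, Node right, String rightKey) {
        this.inner = inner;
        this.left = left;
        this.leftKey = leftKey;
        this.right = right;
        this.rightKey = rightKey;
    }
}

final class DppMatcher {
    /** Fact side: a scan joined on one of its partition keys; dim side: a filter over a scan. */
    private static boolean isFactDimPair(Node fact, String factKey, Node dim) {
        return fact instanceof Scan
                && ((Scan) fact).partitionKeys.contains(factKey)
                && dim instanceof Filter
                && ((Filter) dim).input instanceof Scan;
    }

    /** Returns true if the join has the DPP shape, with the fact table on either side. */
    static boolean matches(Join join) {
        return join.inner
                && (isFactDimPair(join.left, join.leftKey, join.right)
                        || isFactDimPair(join.right, join.rightKey, join.left));
    }
}
```

In the example query, the join on `sr_returned_date_sk = d_date_sk` matches, while a join on a non-partition column such as `sr_item_sk`, or a non-inner join, does not.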

The following graph describes the plan before transformation and plan after transformation for the above query.



We do not need to introduce a new ExecNode for DynamicFilteringTableSourceScan, because the current mechanism already meets our requirements; we only need to add input edges to the Batch(Stream)ExecTableSourceScan node.

Scheduling deadlock

There is no data dependency between the fact-source operator and the DynamicFilteringDataCollector operator, so the order in which they are scheduled is non-deterministic. A deadlock occurs if the fact-source operator is scheduled and waits for the DynamicFilteringEvent while the DynamicFilteringDataCollector operator cannot be scheduled because resources are limited. There are three options to solve this problem:

  1. Introduce data/control edges for the source operator.
    1. Pros:
      1. The optimization results are deterministic.
      2. Users can see the data dependencies between the operators in the topology.
    2. Cons: the change is too large.
  2. The fact-source operator makes a best effort to wait for the DynamicFilteringEvent.
    1. Pros: the solution is simple and clean.
    2. Cons:
      1. The optimization results are non-deterministic.
      2. Users cannot see the data dependencies between the operators in the topology.
  3. Add a dependency operator (named Placeholder-Filter) whose inputs are the fact-source operator and the DynamicFilteringDataCollector operator. This ensures that the DynamicFilteringDataCollector operator is scheduled before the fact-source operator. The dependency edge can be omitted if the fact source is chained with the join operator via a multiple-input task and there is already an edge between the dim source and the fact source.
    1. Pros:
      1. The optimization results are deterministic.
      2. The change is minimal.
      3. Users can see the data dependencies between the operators in the topology.
    2. Cons: a little hacky, but it does not affect execution.

For now, we prefer the third option; in the long term, we prefer the first.
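Why the dependency edge removes the non-determinism can be shown with a plain topological sort (Kahn's algorithm). This is a simplified model assumed for illustration, not Flink's scheduler: a task becomes schedulable once all of its upstream tasks have been scheduled, ties are broken by declaration order, and the fact source is assumed to be chained with the placeholder into one task, so the collector's output edge becomes an input of the fact-source task.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical scheduling model: a task can start only after every task it consumes from. */
final class SchedulingOrder {
    /** Kahn's algorithm; ready tasks are taken in the order in which they were declared. */
    static List<String> order(LinkedHashMap<String, List<String>> inputsByTask) {
        Map<String, Integer> pendingInputs = new LinkedHashMap<>();
        Map<String, List<String>> consumers = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : inputsByTask.entrySet()) {
            pendingInputs.put(e.getKey(), e.getValue().size());
            for (String input : e.getValue()) {
                consumers.computeIfAbsent(input, k -> new ArrayList<>()).add(e.getKey());
            }
        }
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : pendingInputs.entrySet()) {
            if (e.getValue() == 0) {
                ready.add(e.getKey());
            }
        }
        List<String> result = new ArrayList<>();
        while (!ready.isEmpty()) {
            String task = ready.poll();
            result.add(task);
            for (String consumer : consumers.getOrDefault(task, new ArrayList<>())) {
                int left = pendingInputs.merge(consumer, -1, Integer::sum);
                if (left == 0) {
                    ready.add(consumer);
                }
            }
        }
        return result;
    }
}
```

Without the edge, the fact-source task has no inputs and may be scheduled first, which is the deadlock-prone case when slots are scarce. With the edge, the only valid order is dim-source, collector, then fact-source, regardless of declaration order.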

The following graph describes the StreamGraph generated from the above query. 



DynamicFilteringDataCollector operator implementation

The DynamicFilteringDataCollector operator collects the filtered dim source data, stores the relevant column data in a memory buffer, and removes duplicates. Once the memory threshold (32 MB by default) is exceeded, the collected data and any further input are discarded, and the DynamicFilteringData is marked with `exceedThreshold=true`, which means no partitions or input data can be pruned. This avoids exceeding Akka's message size limit and prevents so much data from being sent to the JM that it runs out of memory.

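public class DynamicFilteringDataCollectorOperator extends AbstractStreamOperator<Object>
        implements OneInputStreamOperator<RowData, Object> {

    private final RowType dynamicFilteringFieldType;
    private final List<Integer> dynamicFilteringFieldIndices;
    private final long threshold;
    private final OperatorEventGateway operatorEventGateway;

    private transient long currentSize;
    /**
     * Use a Set instead of a List to ignore duplicated records. byte[] has identity-based
     * equals/hashCode, so the set must compare array contents (a HashSet<byte[]> would not).
     */
    private transient Set<byte[]> buffer;
    private transient TypeInformation<RowData> typeInfo;
    private transient TypeSerializer<RowData> serializer;
    private transient RowData.FieldGetter[] fieldGetters;
    private transient boolean eventSent;

    public DynamicFilteringDataCollectorOperator(
            RowType dynamicFilteringFieldType,
            List<Integer> dynamicFilteringFieldIndices,
            long threshold,
            OperatorEventGateway operatorEventGateway) {
        this.dynamicFilteringFieldType = dynamicFilteringFieldType;
        this.dynamicFilteringFieldIndices = dynamicFilteringFieldIndices;
        this.threshold = threshold;
        this.operatorEventGateway = operatorEventGateway;
    }

    @Override
    public void open() throws Exception {
        super.open();
        this.typeInfo = InternalTypeInfo.of(dynamicFilteringFieldType);
        this.serializer = typeInfo.createSerializer(getExecutionConfig());
        this.buffer = new TreeSet<>(new BytePrimitiveArrayComparator(true)::compare);
        this.fieldGetters = new RowData.FieldGetter[dynamicFilteringFieldIndices.size()];
        for (int i = 0; i < fieldGetters.length; i++) {
            // output field i is read from input position dynamicFilteringFieldIndices[i]
            fieldGetters[i] =
                    RowData.createFieldGetter(
                            dynamicFilteringFieldType.getTypeAt(i),
                            dynamicFilteringFieldIndices.get(i));
        }
    }

    @Override
    public void processElement(StreamRecord<RowData> element) throws Exception {
        if (exceedThreshold()) {
            // the data is discarded anyway, so skip further work
            return;
        }

        // project the dynamic filtering fields out of the input row
        RowData value = element.getValue();
        GenericRowData rowData = new GenericRowData(fieldGetters.length);
        for (int i = 0; i < fieldGetters.length; i++) {
            rowData.setField(i, fieldGetters[i].getFieldOrNull(value));
        }

        // serialize the projected row
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
        serializer.serialize(rowData, wrapper);
        boolean duplicated = !buffer.add(baos.toByteArray());
        if (duplicated) {
            return;
        }
        currentSize += baos.size();
        if (exceedThreshold()) {
            // tell the coordinator early that nothing can be pruned, then release the memory
            sendEvent();
            buffer.clear();
        }
    }

    private boolean exceedThreshold() {
        return threshold > 0 && currentSize > threshold;
    }

    @Override
    public void finish() throws Exception {
        sendEvent();
        super.finish();
    }

    private void sendEvent() {
        if (eventSent) {
            return;
        }

        final DynamicFilteringData dynamicFilteringData;
        if (exceedThreshold()) {
            // too much data: send no values, so that nothing is pruned
            dynamicFilteringData =
                    new DynamicFilteringData(
                            typeInfo, dynamicFilteringFieldType, new ArrayList<>(), true);
        } else {
            dynamicFilteringData =
                    new DynamicFilteringData(
                            typeInfo, dynamicFilteringFieldType, new ArrayList<>(buffer), false);
        }

        DynamicFilteringEvent event = new DynamicFilteringEvent(dynamicFilteringData);
        operatorEventGateway.sendEventToCoordinator(new SourceEventWrapper(event));
        this.eventSent = true;
    }
}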
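Deduplication in the operator depends on comparing serialized rows by content: Java arrays inherit identity-based `equals`/`hashCode`, so a plain `HashSet<byte[]>` keeps every array. The following Flink-free sketch (class and method names are hypothetical) shows content-based deduplication with a `TreeSet` and a lexicographic comparator, together with the threshold behavior described above. The exact comparator does not matter for deduplication as long as it is a consistent total order on contents.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical, Flink-free model of the collector's buffer. */
final class FilteringBuffer {
    private final long threshold;
    // Compares arrays by content; a HashSet<byte[]> would compare them by identity.
    private final Set<byte[]> buffer = new TreeSet<>(FilteringBuffer::compareBytes);
    private long currentSize;

    FilteringBuffer(long threshold) {
        this.threshold = threshold;
    }

    /** Lexicographic order on unsigned byte values; on a common prefix, shorter comes first. */
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    boolean exceeded() {
        return threshold > 0 && currentSize > threshold;
    }

    /** Adds one serialized row; duplicates and rows arriving after the threshold are ignored. */
    void add(byte[] serializedRow) {
        if (exceeded()) {
            return;
        }
        if (buffer.add(serializedRow)) {
            currentSize += serializedRow.length;
            if (exceeded()) {
                buffer.clear(); // the values can no longer be used for pruning
            }
        }
    }

    int size() {
        return buffer.size();
    }
}
```

Only the sizes of distinct rows count toward the threshold, so a dimension table with many duplicate keys does not trigger the fallback prematurely.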

Connector changes

The following graph lists the newly introduced classes (marked in green) that the Hive connector uses to support DPP. The FileSystem connector has a similar class structure, so it is not repeated here.

  • DynamicFileSplitEnumerator makes a best effort to prune partitions based on the received DynamicFilteringEvent. If split requests arrive before the event, the split assigner assigns splits based on all partitions. Once the DynamicFilteringEvent arrives, a new split assigner is created, and only the relevant splits are assigned for subsequent split requests. As described in the "Scheduling deadlock" section, the dim-side operators are scheduled first, which in most cases ensures that the DynamicFilteringEvent reaches the DynamicFileSplitEnumerator before any split request from the fact-source operator. If the placeholder operator cannot be added and the fact-source cannot be chained with its output operator (for example, because operator chaining is explicitly disabled), this best-effort strategy avoids a scheduling deadlock. The pseudocode looks like this:
@Internal
public class DynamicFileSplitEnumerator<SplitT extends FileSourceSplit>
        implements SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>>,
                SupportsHandleExecutionAttemptSourceEvent {

    private static final Logger LOG = LoggerFactory.getLogger(DynamicFileSplitEnumerator.class);

    private final SplitEnumeratorContext<SplitT> context;

    private final DynamicFileEnumerator.Provider fileEnumeratorFactory;

    private final FileSplitAssigner.Provider splitAssignerFactory;
    private transient FileSplitAssigner splitAssigner;
    // IDs of the splits already handed out; assumes split IDs are stable across enumerations.
    private final Set<String> assignedSplits = new HashSet<>();

    // The constructor, start(), addSplitsBack(), addReader(), snapshotState() and close()
    // are omitted for brevity.

    @Override
    public void handleSplitRequest(int subtask, @Nullable String hostname) {
        if (!context.registeredReaders().containsKey(subtask)) {
            // reader failed between sending the request and now. skip this request.
            return;
        }
        if (splitAssigner == null) {
            // no DynamicFilteringEvent yet: fall back to all partitions
            createSplitAssigner(null);
        }

        final Optional<FileSourceSplit> nextSplit = getNextSplit(hostname);
        if (nextSplit.isPresent()) {
            final FileSourceSplit split = nextSplit.get();
            context.assignSplit((SplitT) split, subtask);
            assignedSplits.add(split.splitId());
        } else {
            context.signalNoMoreSplits(subtask);
        }
    }

    private Optional<FileSourceSplit> getNextSplit(@Nullable String hostname) {
        while (true) {
            final Optional<FileSourceSplit> nextSplit = splitAssigner.getNext(hostname);
            // skip splits that a previous split assigner has already handed out
            if (!nextSplit.isPresent() || !assignedSplits.contains(nextSplit.get().splitId())) {
                return nextSplit;
            }
        }
    }

    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        if (sourceEvent instanceof DynamicFilteringEvent) {
            LOG.info("Received DynamicFilteringEvent from subtask {}", subtaskId);
            createSplitAssigner(((DynamicFilteringEvent) sourceEvent).getData());
        }
    }

    @Override
    public void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent sourceEvent) {
        // the attempt number does not matter for dynamic filtering
        handleSourceEvent(subtaskId, sourceEvent);
    }

    private void createSplitAssigner(@Nullable DynamicFilteringData dynamicFilteringData) {
        DynamicFileEnumerator fileEnumerator = fileEnumeratorFactory.create();
        if (dynamicFilteringData != null) {
            fileEnumerator.setDynamicFilteringData(dynamicFilteringData);
        }
        final Collection<FileSourceSplit> splits;
        try {
            // partition-based enumerators such as HiveSourceDynamicFileEnumerator ignore the paths
            splits = fileEnumerator.enumerateSplits(new Path[1], context.currentParallelism());
        } catch (IOException e) {
            throw new FlinkRuntimeException("Could not enumerate file splits", e);
        }
        splitAssigner = splitAssignerFactory.create(splits);
    }
}


  • DynamicFileEnumerator filters all partitions based on the DynamicFilteringData. The pseudocode for HiveSourceDynamicFileEnumerator looks like this:
public class HiveSourceDynamicFileEnumerator implements DynamicFileEnumerator {
    private static final Logger LOG =
            LoggerFactory.getLogger(HiveSourceDynamicFileEnumerator.class);

    private final List<String> dynamicPartitionKeys;
    // For a non-partitioned Hive table, allPartitions contains one partition with empty partition values.
    private final List<HiveTablePartition> allPartitions;
    private final int threadNum;
    private final JobConf jobConf;
    // Partitions left after dynamic filtering; null until filtering data has been received.
    private transient List<HiveTablePartition> finalPartitions;

    @Override
    public void setDynamicFilteringData(DynamicFilteringData data) {
        try {
            Optional<List<RowData>> receivedDataOpt = data.getData();
            if (!receivedDataOpt.isPresent()) {
                // the DynamicFilteringData is too large, so nothing can be pruned
                finalPartitions = allPartitions;
                return;
            }
            RowType rowType = data.getRowType();
            List<HiveTablePartition> retained = new ArrayList<>();
            for (HiveTablePartition partition : allPartitions) {
                RowData rowData = createRowData(rowType, partition);
                if (data.contains(rowData)) {
                    retained.add(partition);
                }
            }
            finalPartitions = retained;
        } catch (Exception e) {
            // pruning is only an optimization: on any failure, read all partitions
            LOG.warn("Failed to apply dynamic filtering, falling back to all partitions.", e);
            finalPartitions = allPartitions;
        }
    }

    private RowData createRowData(RowType rowType, HiveTablePartition partition) {
        // convert the partition to RowData based on the given row type
    }

    @Override
    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
            throws IOException {
        // the paths are ignored: splits are created from the selected Hive partitions
        List<HiveTablePartition> partitions =
                finalPartitions == null ? allPartitions : finalPartitions;
        return new ArrayList<>(createInputSplits(minDesiredSplits, partitions, threadNum, jobConf));
    }
}
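The best-effort behavior of DynamicFileSplitEnumerator (assign from all partitions until the event arrives, then switch to a pruned assigner and skip splits that were already handed out) can be simulated without Flink. `BestEffortAssigner` and its methods are hypothetical names, and splits are identified by string IDs under the assumption that IDs stay stable when the splits are enumerated again.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

/** Hypothetical, Flink-free model of the enumerator's split assignment. */
final class BestEffortAssigner {
    private final List<String> allSplits;
    private final Set<String> assigned = new HashSet<>();
    private Deque<String> remaining;

    BestEffortAssigner(List<String> allSplits) {
        this.allSplits = allSplits;
    }

    /** Handles one split request; before any filtering data arrives, every split is a candidate. */
    Optional<String> next() {
        if (remaining == null) {
            remaining = new ArrayDeque<>(allSplits);
        }
        while (!remaining.isEmpty()) {
            String split = remaining.poll();
            // skip splits that an earlier assigner already handed out
            if (assigned.add(split)) {
                return Optional.of(split);
            }
        }
        return Optional.empty();
    }

    /** Handles the filtering event: rebuild the queue from the relevant splits only. */
    void onFilteringData(Predicate<String> isRelevant) {
        remaining = new ArrayDeque<>();
        for (String split : allSplits) {
            if (isRelevant.test(split)) {
                remaining.add(split);
            }
        }
    }
}
```

A split handed out before the event may turn out to be irrelevant; that wastes some I/O but stays correct, because the join discards rows without a match in the dimension table.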

Compatibility, Deprecation, and Migration Plan

This is a new feature, so no compatibility, deprecation, or migration plan is needed.

Test Plan

  1. Unit tests (UT) for the planner changes
  2. Integration tests (IT) for the end-to-end changes

Rejected Alternatives

Please refer to the various options above.



POC: https://github.com/godfreyhe/flink/tree/FLIP-248


[1] https://en.wikipedia.org/wiki/Star_schema