Status

Current state: "Under Discussion"

Discussion thread

JIRA: here (<- link to )

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In the field of data analysis, the star schema[1] is the simplest and most widely used data mart pattern. In a star schema it is natural to map one or more dimensions to partition columns. Detecting and avoiding unnecessary data scans is one of the most important responsibilities of the optimizer. Currently, Flink supports static partition pruning: during the optimization phase, the conditions in the WHERE clause are analyzed to determine in advance which partitions can be safely skipped. In another common scenario, the partition information is not available in the optimization phase but only in the execution phase. That is the problem this FLIP tries to solve: dynamic partition pruning, which can reduce the IO of partitioned table sources.

Consider a star schema that consists of one or more fact tables referencing any number of dimension tables, where the join keys are partition columns of the fact tables. In such join queries, the partitions of the fact table can be pruned using the result of filtering the dimension tables. The following is the simplest example in which dynamic partition pruning can work.

select * from store_returns, date_dim
where sr_returned_date_sk = d_date_sk
and d_year = 2000


In this FLIP we introduce a mechanism for detecting dynamic partition pruning patterns in the optimization phase and for performing partition pruning at runtime by sending the dimension table results to the SplitEnumerator of the fact table via the existing coordinator mechanism.

In the example above, the rows of date_dim whose d_year is 2000 are sent both to the join build side and to the Coordinator. The SplitEnumerator of store_returns filters the partitions based on the filtered date_dim data and sends only the splits that are actually required to the source scan of store_returns.
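The flow in this example can be illustrated with a small, self-contained simulation. It is only a sketch under stated assumptions: the class DppExampleSketch, its in-memory map, and the sample key values are hypothetical and stand in for the real tables and the coordinator mechanism.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory simulation of dynamic partition pruning (not Flink code). */
final class DppExampleSketch {

    /** Step 1: filter date_dim on d_year and collect the matching d_date_sk values. */
    static Set<Integer> collectDateKeys(Map<Integer, Integer> dateSkToYear, int year) {
        Set<Integer> keys = new LinkedHashSet<>();
        for (Map.Entry<Integer, Integer> entry : dateSkToYear.entrySet()) {
            if (entry.getValue() == year) {
                keys.add(entry.getKey());
            }
        }
        return keys;
    }

    /** Step 2: keep only the store_returns partitions whose key was collected. */
    static List<Integer> prunePartitions(List<Integer> allPartitions, Set<Integer> keys) {
        List<Integer> kept = new ArrayList<>();
        for (Integer partition : allPartitions) {
            if (keys.contains(partition)) {
                kept.add(partition);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Made-up sample data: d_date_sk -> d_year.
        Map<Integer, Integer> dateDim = new LinkedHashMap<>();
        dateDim.put(1, 1999);
        dateDim.put(2, 2000);
        dateDim.put(3, 2000);
        dateDim.put(4, 2001);

        Set<Integer> keys = collectDateKeys(dateDim, 2000);
        // Partitions of store_returns keyed by sr_returned_date_sk; prints [2, 3].
        System.out.println(prunePartitions(List.of(1, 2, 3, 4, 5), keys));
    }
}
```

Only the partitions that survive the second step would be turned into splits, which is where the IO saving comes from.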

The dynamic partition pruning mechanism can improve performance by avoiding reads of large amounts of irrelevant data, and it works for both batch and streaming queries. The rest of this document refers to dynamic partition pruning as DPP.

Public Interfaces

The whole workflow for DPP

The following graph shows the optimization and execution process of a query using DPP mechanism for FileSystem connector and Hive connector.

  1. The parser parses a given query and converts it to an AST (Abstract Syntax Tree). The optimizer detects the DPP pattern and transforms the plan via planner rules, producing the optimized physical plan. The planner then generates the stream graph from the physical plan.
  2. The generated job graph is submitted to the JobManager (JM).
  3. The JM schedules the job vertices for dim-source and DynamicPartitionCollector first. The filtered data from the dim-source operator is sent both to the DynamicPartitionCollector operator and to the shuffle of the Join operator.
  4. DynamicPartitionCollector collects the input data, removes the irrelevant columns and duplicate records, and, once it has collected all input data (in its finish method), sends the partition data (wrapped in a DynamicPartitionEvent) to the SourceCoordinator of the fact source. The SourceCoordinator delivers the DynamicPartitionEvent to DynamicFileSplitEnumerator.
  5. The DynamicFileSplitEnumerator finds the relevant partitions among all partitions using the partition data from dim-source, and creates the target splits for fact-source.
  6. The JM schedules the job vertices for fact-source.
  7. The fact-source gets the splits from DynamicFileSplitEnumerator, reads the data, and sends it to the shuffle of the Join operator.
  8. The join operator reads the input data and performs the join.
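Step 4 can be sketched as follows. This is a minimal illustration, not the proposed operator: the class PartitionCollectorSketch and its method names are hypothetical, rows are modelled as plain Object arrays, and the result of finish would in practice be wrapped in a DynamicPartitionEvent and sent to the SourceCoordinator.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for the collector described in step 4 (not Flink code). */
final class PartitionCollectorSketch {
    private final int[] partitionFieldIndices;
    private final Set<List<Object>> distinctPartitions = new LinkedHashSet<>();

    PartitionCollectorSketch(int... partitionFieldIndices) {
        this.partitionFieldIndices = partitionFieldIndices.clone();
    }

    /** Called per input row: keeps only the partition columns and drops duplicates. */
    void processElement(Object[] row) {
        List<Object> projected = new ArrayList<>(partitionFieldIndices.length);
        for (int index : partitionFieldIndices) {
            projected.add(row[index]);
        }
        distinctPartitions.add(projected);
    }

    /** Called once all input has been consumed: returns the distinct partition values. */
    List<List<Object>> finish() {
        return new ArrayList<>(distinctPartitions);
    }
}
```

Emitting only at finish keeps the sketch simple: the enumerator receives one complete set of partition values instead of incremental updates.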


New public interfaces

SupportsDynamicPartitionPruning is introduced to let the planner know whether a source connector supports DPP, and to let the source connector choose between the static split enumerator and the dynamic split enumerator.

@PublicEvolving
public interface SupportsDynamicPartitionPruning {

    void applyDynamicPartitionPruning(List<String> partitionKeys);
}
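A connector might use this interface roughly as sketched below. The interface is repeated locally only so that the sketch compiles on its own; the class PartitionedSourceSketch and its chooseEnumerator method are hypothetical and merely illustrate the static versus dynamic choice mentioned above.

```java
import java.util.ArrayList;
import java.util.List;

/** Local copy of the proposed interface so that the sketch compiles on its own. */
interface SupportsDynamicPartitionPruning {
    void applyDynamicPartitionPruning(List<String> partitionKeys);
}

/** Hypothetical table source; the class and its methods are not part of the proposal. */
final class PartitionedSourceSketch implements SupportsDynamicPartitionPruning {
    private List<String> dynamicPartitionKeys = new ArrayList<>();

    @Override
    public void applyDynamicPartitionPruning(List<String> partitionKeys) {
        // Remember the keys, including their order, as chosen by the planner.
        this.dynamicPartitionKeys = new ArrayList<>(partitionKeys);
    }

    /** Picks the enumerator kind: dynamic only if the planner pushed partition keys down. */
    String chooseEnumerator() {
        return dynamicPartitionKeys.isEmpty() ? "static" : "dynamic";
    }
}
```

If the planner never calls applyDynamicPartitionPruning, the source in this sketch keeps using the static enumerator, so existing queries are unaffected.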


Since the split enumerator runs in the JM, the filtered partition data from the dim-source operator has to be sent to the split enumerator via the existing coordinator mechanism. We therefore introduce DynamicPartitionEvent to wrap the partition data.

@PublicEvolving
public class DynamicPartitionEvent implements SourceEvent {
    private final PartitionData data;

    public DynamicPartitionEvent(PartitionData data) {
        this.data = data;
    }

    public PartitionData getPartitionData() {
        return data;
    }

    @Override
    public String toString() {
        return "DynamicPartitionEvent{" + "data=" + data + '}';
    }
}


PartitionData wraps the partition values without the schema. The order of the values is consistent with the order of the partition keys passed to SupportsDynamicPartitionPruning#applyDynamicPartitionPruning.

@PublicEvolving
public class PartitionData implements Serializable {
    private final List<Row> partitions;

    public PartitionData(List<Row> partitions) {
        this.partitions = partitions;
    }

    public boolean contains(Row partition) {
        return partitions.contains(partition);
    }
    
    @Override
    public String toString() {
        return "PartitionData{" + "partitions=" + partitions + '}';
    }
}
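The importance of the value order can be shown with a small stand-alone variant. This is a sketch, not the proposed class: PartitionDataSketch is hypothetical, List<Object> stands in for Row, and the hash-based lookup is an assumption made here for illustration rather than part of the proposal.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical variant of PartitionData; List<Object> stands in for Flink's Row. */
final class PartitionDataSketch {
    private final Set<List<Object>> partitions;

    PartitionDataSketch(List<List<Object>> partitions) {
        // A hash set gives constant-time lookups on average (an assumption of this sketch).
        this.partitions = new HashSet<>(partitions);
    }

    /** The values must follow the key order passed to applyDynamicPartitionPruning. */
    boolean contains(Object... values) {
        return partitions.contains(Arrays.asList(values));
    }
}
```

With keys (year, month), a lookup for ("2000", "01") matches a stored partition ("2000", "01"), while ("01", "2000") does not, because no schema is available to reorder the values.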


Currently, for the FileSystem connector and the Hive connector, the splits are created by a FileEnumerator. However, the existing FileEnumerators cannot create splits from dynamically supplied partition data, so we introduce a new FileEnumerator named DynamicFileEnumerator to support creating splits based on partitions.

@PublicEvolving
public interface DynamicFileEnumerator extends FileEnumerator {
    
    void setPartitionData(PartitionData partitionData);
    
    /** Provider for DynamicFileEnumerator. */
    @FunctionalInterface
    interface Provider extends FileEnumerator.Provider {
        
        DynamicFileEnumerator create();
    }
}


We introduce a config option to enable or disable this feature. The default value is true.

@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
public static final ConfigOption<Boolean> TABLE_OPTIMIZER_DYNAMIC_PARTITION_PRUNING_ENABLED =
        key("table.optimizer.dynamic-partition-pruning.enabled")
                .booleanType()
                .defaultValue(true)
                .withDescription(
                        "When it is true, the optimizer will try to filter the irrelevant partitions to reduce scan IO.");


Proposed Changes

Optimizer changes

The TableScan defined in Calcite has no inputs. Logically, however, the TableScan of the fact table has a data dependency on the plan of the dim table. There are two approaches to describe this dependency.

  1. Introduce a new physical TableScan node with an input. The planner rules should be introduced in the physical rewrite phase to avoid affecting other optimization phases, for example the cost computation for the new TableScan during cost-based physical optimization.
    1. Pros: most of the existing work can be reused directly, such as sub-plan reuse and the transformation from physical node to ExecNode.
    2. Cons: metadata handlers for the new physical TableScan have to be implemented.
  2. Introduce an attribute field on TableScan that describes the sub-plan of the dim table. The planner rules should also be introduced in the physical rewrite phase, to avoid the transformation from logical node to physical node.
    1. Pros: the definition of TableScan in Calcite does not need to change.
    2. Cons: the transformation for the sub-plan has to be implemented again, and sub-plan reuse is hard to achieve (the dim table will be read twice).

We prefer the first approach. We can then introduce a few planner rules in the physical rewrite phase to detect and transform the plan. The DPP pattern can be defined as: an inner join with an equality condition on a partition column, where one input is a dim source with a filter and the other input is a fact source over a partitioned table. After the transformation, a new physical node named DynamicPartitionCollector is introduced; its input is the filter node and its output is the fact table source node, named DynamicPartitionTableScan.
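The pattern definition above can be read as a simple predicate. The following sketch is only illustrative: DppPatternSketch and JoinInfo are hypothetical, heavily simplified stand-ins and do not correspond to planner classes or Calcite APIs.

```java
import java.util.List;

/** Hypothetical, simplified description of a join; none of these names exist in the planner. */
final class DppPatternSketch {

    static final class JoinInfo {
        final boolean innerJoin;
        final String factJoinKey;             // fact-side column of the equality condition
        final List<String> factPartitionKeys; // empty if the fact table is not partitioned
        final boolean dimSideHasFilter;

        JoinInfo(
                boolean innerJoin,
                String factJoinKey,
                List<String> factPartitionKeys,
                boolean dimSideHasFilter) {
            this.innerJoin = innerJoin;
            this.factJoinKey = factJoinKey;
            this.factPartitionKeys = factPartitionKeys;
            this.dimSideHasFilter = dimSideHasFilter;
        }
    }

    /** True if the join has the shape described by the DPP pattern. */
    static boolean matches(JoinInfo join) {
        return join.innerJoin
                && join.dimSideHasFilter
                && join.factPartitionKeys.contains(join.factJoinKey);
    }
}
```

For the example query, the join is an inner join on sr_returned_date_sk, date_dim carries the filter d_year = 2000, and the pattern matches provided that store_returns is partitioned by sr_returned_date_sk.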

Currently, we do not introduce a threshold for the size of the partition data, because it depends on the statistics.

If the statistics are not available, this feature is not available either. We can improve this once we introduce file size statistics, which are more accessible and more reliable.

The following graph describes the plan before transformation and plan after transformation for the query.

select * from store_returns, date_dim
where sr_returned_date_sk = d_date_sk
and d_year = 2000


We do not need to introduce a new ExecNode for DynamicPartitionTableScan, because the current mechanism already meets our requirements. We only need to add input edges to the Batch(Stream)ExecTableSourceScan node.

Build StreamGraph

Currently, the runtime operator for a source has no input. The DynamicPartitionCollector can only connect to the SplitEnumerator of the DynamicTableSource, so the DynamicPartitionCollector operator needs to know the operator id of the DynamicTableSource in order to get the corresponding OperatorEventGateway. However, the operator id is generated in StreamingJobGraphGenerator. We will introduce a new mechanism that allows operators to communicate with each other through a user-defined operator identifier. This will be introduced in another FLIP (this part will be updated later).

Currently, the planner recursively traverses the plan from sink to source and creates transformations from source to sink, so it only knows the sink transformations. However, DynamicPartitionCollector is not a real sink for the planner, and its transformation cannot be found from the sink transformations, because the source transformation has no input. We should therefore register the transformations of DynamicPartitionCollector with the planner separately and build the StreamGraph from all transformations.
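The reachability problem can be illustrated with a toy graph. This is a sketch under stated assumptions: TransformationTraversalSketch, the string node names, and the map of input edges are hypothetical and only mimic how a traversal from the sinks follows input edges.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical traversal over input edges; it does not use Flink's Transformation classes. */
final class TransformationTraversalSketch {

    /** Collects every node reachable from the given roots by following input edges. */
    static Set<String> reachable(Map<String, List<String>> inputs, List<String> roots) {
        Set<String> seen = new LinkedHashSet<>();
        Deque<String> stack = new ArrayDeque<>(roots);
        while (!stack.isEmpty()) {
            String node = stack.pop();
            if (seen.add(node)) {
                for (String input : inputs.getOrDefault(node, List.of())) {
                    stack.push(input);
                }
            }
        }
        return seen;
    }
}
```

With the edges sink <- join, join <- {dim-source, fact-source} and collector <- dim-source, a traversal that starts only at sink never visits collector, because fact-source has no input edge leading to it. Adding collector as an extra root, which corresponds to registering its transformation separately, makes it part of the graph.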

Connector changes

The following graph lists the newly introduced classes (marked in green) for the Hive connector to support DPP. The FileSystem connector has a similar class structure, so it is not repeated here.

  1. DynamicFileSplitEnumerator handles the SourceEvent from the SourceCoordinator. If the event is a DynamicPartitionEvent, it extracts the PartitionData and creates the DynamicFileEnumerator to determine the relevant partitions based on that PartitionData. All split requests that arrive earlier are added to a buffer and are handled once the DynamicPartitionEvent has been processed. The pseudocode looks like:
public class DynamicFileSplitEnumerator<SplitT extends FileSourceSplit>
        implements SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>> {

    private final DynamicFileEnumerator.Provider fileEnumeratorFactory;
    private final FileSplitAssigner.Provider splitAssignerFactory;
    private transient FileSplitAssigner splitAssigner;
    /** Buffers the split requests that arrive before the partition data is received. */
    private final LinkedHashMap<Integer, String> readersAwaitingSplit;
    /** A flag which indicates whether the PartitionData is received. */
    private transient boolean partitionDataReceived;

    @Override
    public void handleSplitRequest(int subtask, @Nullable String hostname) {
        if (!partitionDataReceived) {
            readersAwaitingSplit.put(subtask, hostname);
            return;
        }
        // handle the request normally
    }

    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        if (sourceEvent instanceof DynamicPartitionEvent) {
            DynamicFileEnumerator fileEnumerator = fileEnumeratorFactory.create();
            fileEnumerator.setPartitionData(((DynamicPartitionEvent) sourceEvent).getPartitionData());
            Collection<FileSourceSplit> splits = fileEnumerator.enumerateSplits(new Path[1], context.currentParallelism());
            splitAssigner = splitAssignerFactory.create(splits);
            this.partitionDataReceived = true;
            assignAwaitingRequests();
        } else {
            LOG.error("Received unrecognized event: {}", sourceEvent);
        }
    }

    private void assignAwaitingRequests() {
        // ...
    }
    
    @Override
    public void addSplitsBack(List<SplitT> splits, int subtaskId) {
        if (splitAssigner != null) {
            List<FileSourceSplit> fileSplits = new ArrayList<>(splits);
            splitAssigner.addSplits(fileSplits);
        }
    }

    @Override
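    public PendingSplitsCheckpoint<SplitT> snapshotState(long checkpointId) {
        if (splitAssigner != null) {
            return PendingSplitsCheckpoint.fromCollectionSnapshot(splitAssigner.remainingSplits());
        }
        // Assumption: no splits exist before the partition data arrives, so snapshot an empty set.
        return PendingSplitsCheckpoint.fromCollectionSnapshot(Collections.emptyList());
    }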
}
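The buffering behaviour in the pseudocode above can be tried out in isolation. The following is a minimal sketch, not the proposed enumerator: BufferedSplitRequestsSketch, onPartitionData, and the string-based splits and assignments are hypothetical, and the draining logic is only one possible way to handle the awaiting requests.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical model of request buffering; splits and assignments are plain strings. */
final class BufferedSplitRequestsSketch {
    private final Map<Integer, String> readersAwaitingSplit = new LinkedHashMap<>();
    private final Queue<String> splits = new ArrayDeque<>();
    private final List<String> assignments = new ArrayList<>();
    private boolean partitionDataReceived;

    /** Buffers the request until the pruned splits are known, then serves it directly. */
    void handleSplitRequest(int subtask, String hostname) {
        if (!partitionDataReceived) {
            readersAwaitingSplit.put(subtask, hostname);
            return;
        }
        assign(subtask);
    }

    /** Stands in for handling a DynamicPartitionEvent: stores splits and drains the buffer. */
    void onPartitionData(List<String> prunedSplits) {
        splits.addAll(prunedSplits);
        partitionDataReceived = true;
        for (Integer subtask : new ArrayList<>(readersAwaitingSplit.keySet())) {
            assign(subtask);
        }
        readersAwaitingSplit.clear();
    }

    private void assign(int subtask) {
        String split = splits.poll();
        assignments.add(subtask + "->" + (split == null ? "NO_MORE_SPLITS" : split));
    }

    List<String> assignments() {
        return assignments;
    }
}
```

Using an insertion-ordered map means the buffered readers are served in the order in which they first asked, and a reader that asks twice before the event arrives is only served once.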


  2. DynamicFileEnumerator filters all partitions based on the PartitionData. The pseudocode looks like:
public class HiveSourceDynamicFileEnumerator implements DynamicFileEnumerator {
    private final List<HiveTablePartition> allPartitions;
    private transient List<HiveTablePartition> finalPartitions;
 
    public void setPartitionData(PartitionData partitionData) {
        finalPartitions = new ArrayList<>();
        for (HiveTablePartition partition : allPartitions) {
            Object[] values = dynamicPartitionKeys.stream()
                            .map(k -> partition.getPartitionSpec().get(k))
                            .toArray(String[]::new);
            if (partitionData.contains(Row.of(values))) {
                finalPartitions.add(partition);
            }
        }
    }

    @Override
    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits) throws IOException {
        return new ArrayList<>(createInputSplits(minDesiredSplits, finalPartitions ...));
    }
}

Scheduling deadlock

There is no data dependency between the fact-source operator and the DynamicPartitionCollector operator, so the order in which they are scheduled is non-deterministic. A deadlock occurs if the fact-source operator is scheduled and waits for the DynamicPartitionEvent while the DynamicPartitionCollector operator cannot be scheduled because resources are limited. There are three options to solve this problem:

  1. Introduce data/control edges for the source operator.
    1. Pros:
      1. The optimization results are deterministic.
      2. Users can see the data dependencies between the operators in the topology.
    2. Cons: the change is too large.
  2. The fact-source operator makes a best effort to wait for the DynamicPartitionEvent.
    1. Pros: the solution is simple and clean.
    2. Cons:
      1. The optimization results are non-deterministic.
      2. Users cannot see the data dependencies between the operators in the topology.
  3. Add a dependency operator whose inputs are the fact-source operator and the DynamicPartitionCollector operator. This ensures that the DynamicPartitionCollector operator is scheduled before the fact-source operator.
    1. Pros:
      1. The optimization results are deterministic.
      2. The change is minimal.
      3. Users can see the data dependencies between the operators in the topology.
    2. Cons: it is slightly hacky, but it does not affect execution.

Currently, we prefer the last approach. In the long term, we prefer the first option.

Compatibility, Deprecation, and Migration Plan

This is a new feature, so no compatibility, deprecation, or migration plan is needed.

Test Plan

  1. Unit tests (UT) for the planner changes
  2. Integration tests (IT) for the end-to-end changes

Rejected Alternatives

Please refer to the various options above.



POC: https://github.com/godfreyhe/flink/tree/FLIP-248


[1] https://en.wikipedia.org/wiki/Star_schema


