...

Currently, the Flink source operator does not have any input edges, and for the new source (FLIP-27) the splits are generated in the SplitEnumerator. Given this mechanism, dynamic partition pruning will happen in the SplitEnumerator (for example, in the filesystem or Hive connector). If the source operator supports inputs in the future, the input data could also be filtered dynamically (for example, filtering Parquet row groups based on statistics, or even filtering individual records with a Bloom filter).

In this FLIP we introduce a mechanism that detects dynamic partition pruning (DPP) patterns in the optimization phase and performs partition pruning at runtime by sending the dimension table results to the SplitEnumerator of the fact table via the existing coordinator mechanism.

In the above example, the rows of date_dim whose d_year is 2000 are sent both to the build side of the join and to the coordinator. The SplitEnumerator of store_returns then filters the partitions based on the filtered date_dim data and sends only the required splits to the store_returns source scan.

...

  1. The parser parses the given query and converts it into an AST (Abstract Syntax Tree) plan. The optimizer detects the DPP pattern and transforms the plan via planner rules; the optimized physical plan is then converted into an ExecNode plan, which in turn is converted into a StreamGraph and then a JobGraph.
  2. The client submits the generated JobGraph to the JobManager (JM).
  3. The JM first schedules the job vertices for the dim-source and the DynamicFilteringDataCollector. The filtered data from the dim-source operator is sent both to the DynamicFilteringDataCollector operator and to the shuffle input of the Join operator.
  4. The DynamicFilteringDataCollector collects the input data, removes irrelevant columns and duplicated records, and, once it has collected all input data (in its finish method), sends the records wrapped in a DynamicFilteringEvent to its OperatorCoordinator. The coordinator delivers the event to the SourceCoordinator of the related fact-source operator, which then delivers the DynamicFilteringEvent to the DynamicFileSplitEnumerator.
  5. The DynamicFileSplitEnumerator uses the data from the dim-source to find the relevant partitions among all partitions, and creates the target splits for the fact-source.
  6. The JM schedules the job vertices for the fact-source and the Placeholder operator.
  7. The fact-source gets the splits from the DynamicFileSplitEnumerator, reads the data, and sends it to the shuffle input of the Join operator.
  8. The Join operator reads the input data and performs the join.
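The steps above can be illustrated with a small, dependency-free Java sketch of the data flow for the date_dim/store_returns example. It is not Flink code: `DppFlowSketch`, `collectDimKeys` and `pruneSplits` are hypothetical names, dim rows are modelled as `{d_date_sk, d_year}` pairs, and store_returns is assumed to be partitioned by the date key, with one split per file.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of the DPP data flow (not Flink code). */
final class DppFlowSketch {

    private DppFlowSketch() {}

    /**
     * Steps 3-4: keep the date_dim rows of the wanted year, project the join key
     * (d_date_sk) and drop duplicates. Each row is {d_date_sk, d_year}.
     */
    static Set<Integer> collectDimKeys(int[][] dateDimRows, int wantedYear) {
        Set<Integer> keys = new LinkedHashSet<>();
        for (int[] row : dateDimRows) {
            if (row[1] == wantedYear) {
                keys.add(row[0]); // a Set silently ignores duplicated keys
            }
        }
        return keys;
    }

    /**
     * Step 5: keep only the fact-table partitions whose partition value was received,
     * and turn each of their files into one split.
     */
    static List<String> pruneSplits(Map<Integer, List<String>> filesByPartition, Set<Integer> dimKeys) {
        List<String> splits = new ArrayList<>();
        for (Map.Entry<Integer, List<String>> entry : filesByPartition.entrySet()) {
            if (dimKeys.contains(entry.getKey())) {
                splits.addAll(entry.getValue());
            }
        }
        return splits;
    }
}
```

For example, the date_dim rows `{1, 1999}, {2, 2000}, {3, 2000}, {2, 2000}` yield the keys `{2, 3}`, so only the files of partitions 2 and 3 become splits for the fact source.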


New public interfaces

...

      1. The optimization results are non-deterministic.
      2. Users cannot see the data dependencies between them in the topology.
  1. Add a dependency operator (named Placeholder-Filter) whose inputs are the fact-source operator and the DynamicFilteringDataCollector operator. This ensures that the DynamicFilteringDataCollector operator is scheduled before the fact-source operator. The dependency edge can be omitted if the fact source is chained with the join operator in a multiple-input task and there is already an edge between the dim source and the fact source.

...

The following graph shows the StreamGraph generated from the above query before and after the transformation.


  1. DynamicFileSplitEnumerator handles the SourceEvent from the SourceCoordinator. If the event is a DynamicFilteringEvent, it gets the PartitionData from it and creates a DynamicFileEnumerator to find the relevant partitions based on that PartitionData. All split requests are buffered until the DynamicFilteringEvent has been handled, after which the pending requests are processed. The pseudocode looks like:

...



DynamicFilteringDataCollector operator implementation

The DynamicFilteringDataCollector operator collects the filtered dim-source data and stores the relevant column data in an in-memory buffer, removing duplicates. Once the memory threshold (32 MB by default) is exceeded, any further data is discarded and the DynamicFilteringData is marked as `exceedThreshold=true`, which means no partitions or input data can be pruned. This avoids exceeding Akka's message size limit and prevents too much data from being sent to the JM, which could lead to an OOM.

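import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;

// DynamicFilteringData and DynamicFilteringEvent are the new classes proposed in this FLIP.
public class DynamicFilteringDataCollectorOperator extends AbstractStreamOperator<Object>
        implements OneInputStreamOperator<RowData, Object> {

    private final RowType dynamicFilteringFieldType;
    private final List<Integer> dynamicFilteringFieldIndices;
    /** Maximum buffered size in bytes; a non-positive value disables the limit. */
    private final long threshold;
    private final OperatorEventGateway operatorEventGateway;

    private transient long currentSize;
    /** Use a Set instead of a List to ignore duplicated records. */
    private transient Set<byte[]> buffer;
    private transient TypeInformation<RowData> typeInfo;
    private transient TypeSerializer<RowData> serializer;
    private transient boolean eventSend;

    public DynamicFilteringDataCollectorOperator(
            RowType dynamicFilteringFieldType,
            List<Integer> dynamicFilteringFieldIndices,
            long threshold,
            OperatorEventGateway operatorEventGateway) {
        this.dynamicFilteringFieldType = dynamicFilteringFieldType;
        this.dynamicFilteringFieldIndices = dynamicFilteringFieldIndices;
        this.threshold = threshold;
        this.operatorEventGateway = operatorEventGateway;
    }

    @Override
    public void open() throws Exception {
        super.open();
        this.typeInfo = InternalTypeInfo.of(dynamicFilteringFieldType);
        this.serializer = typeInfo.createSerializer(new ExecutionConfig());
        // byte[] uses identity equals/hashCode, so compare the serialized bytes by content
        this.buffer = new TreeSet<>(new BytePrimitiveArrayComparator(true)::compare);
        this.currentSize = 0L;
        this.eventSend = false;
    }

    @Override
    public void processElement(StreamRecord<RowData> element) throws Exception {
        if (exceedThreshold()) {
            // the event has already been sent; discard the remaining data
            return;
        }

        // project the dynamic filtering fields out of the input row
        RowData value = element.getValue();
        GenericRowData rowData = new GenericRowData(dynamicFilteringFieldIndices.size());
        for (int i = 0; i < dynamicFilteringFieldIndices.size(); i++) {
            RowData.FieldGetter getter =
                    RowData.createFieldGetter(
                            dynamicFilteringFieldType.getTypeAt(i), dynamicFilteringFieldIndices.get(i));
            rowData.setField(i, getter.getFieldOrNull(value));
        }

        // serialize the projected row; duplicates are detected on the serialized bytes
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);
        serializer.serialize(rowData, wrapper);
        boolean duplicated = !buffer.add(baos.toByteArray());
        if (duplicated) {
            return;
        }
        currentSize += baos.size();
        if (exceedThreshold()) {
            sendEvent();
        }
    }

    private boolean exceedThreshold() {
        return threshold > 0 && currentSize > threshold;
    }

    @Override
    public void finish() throws Exception {
        sendEvent();
    }

    private void sendEvent() {
        if (eventSend) {
            return;
        }

        final DynamicFilteringData dynamicFilteringData;
        if (exceedThreshold()) {
            // too much data: send an empty payload flagged so that nothing is pruned
            dynamicFilteringData =
                    new DynamicFilteringData(typeInfo, dynamicFilteringFieldType, new ArrayList<>(), true);
        } else {
            dynamicFilteringData =
                    new DynamicFilteringData(typeInfo, dynamicFilteringFieldType, new ArrayList<>(buffer), false);
        }

        DynamicFilteringEvent event = new DynamicFilteringEvent(dynamicFilteringData);
        operatorEventGateway.sendEventToCoordinator(new SourceEventWrapper(event));
        this.eventSend = true;
    }
}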
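The buffering rules of the collector (content-based deduplication plus a size threshold) can be exercised without a Flink runtime. The following is a minimal sketch, assuming serialized records arrive as `byte[]` and that the threshold is measured in bytes; `BoundedDedupBuffer` is a hypothetical name, not a Flink class. It wraps each array in a `ByteBuffer` because `byte[]` inherits identity-based `equals`/`hashCode`, so a plain `HashSet<byte[]>` would never detect duplicates.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-alone model of the collector's buffering rules (not a Flink API). */
final class BoundedDedupBuffer {

    /** Maximum buffered size in bytes; a non-positive value disables the limit. */
    private final long threshold;
    /** ByteBuffer compares contents, unlike byte[], so duplicates are really detected. */
    private final Set<ByteBuffer> buffer = new HashSet<>();
    private long currentSize;

    BoundedDedupBuffer(long threshold) {
        this.threshold = threshold;
    }

    boolean exceedThreshold() {
        return threshold > 0 && currentSize > threshold;
    }

    /** Adds one serialized record; once the threshold is exceeded, further records are discarded. */
    void add(byte[] serializedRow) {
        if (exceedThreshold()) {
            return;
        }
        // copy the array so later mutation by the caller cannot change the set's contents
        if (buffer.add(ByteBuffer.wrap(serializedRow.clone()))) {
            currentSize += serializedRow.length;
        }
    }

    /** The payload that would be sent in finish(): empty if the threshold was exceeded. */
    List<byte[]> result() {
        List<byte[]> out = new ArrayList<>();
        if (exceedThreshold()) {
            return out;
        }
        for (ByteBuffer record : buffer) {
            out.add(record.array());
        }
        return out;
    }
}
```

As in the operator, the record that pushes the size past the threshold is still counted, after which the buffer reports no data, which corresponds to `exceedThreshold=true` and means nothing gets pruned.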

Connector changes

The following graph lists the newly introduced classes (marked in green) for the Hive connector to support DPP. The FileSystem connector has a similar class structure, so it is not repeated here.


  • DynamicFileSplitEnumerator makes a best effort to prune partitions based on the received DynamicFilteringEvent. If a split request arrives before the DynamicFilteringEvent, the split assigner assigns a split based on all partitions. Once the DynamicFilteringEvent arrives, a new split assigner is created, and only the relevant splits are assigned for subsequent split requests. As described in the "Scheduling deadlock" section, the dim-side operators are scheduled first, which ensures that in most cases the DynamicFilteringEvent reaches the DynamicFileSplitEnumerator before any split request from the fact-source operator. If the placeholder operator cannot be added and the fact source cannot be chained with its output operator (for example, when operator chaining is explicitly disabled), this best-effort strategy avoids a scheduling deadlock. The pseudocode looks like:
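import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.Optional;
import java.util.Set;

import javax.annotation.Nullable;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
import org.apache.flink.core.fs.Path;

// DynamicFileEnumerator, DynamicFilteringEvent and DynamicFilteringData are the new classes proposed in this FLIP.
@Internal
public class DynamicFileSplitEnumerator<SplitT extends FileSourceSplit>
        implements SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>> {

    private final SplitEnumeratorContext<SplitT> context;

    private final DynamicFileEnumerator.Provider fileEnumeratorFactory;

    private final FileSplitAssigner.Provider splitAssignerFactory;
    private transient FileSplitAssigner splitAssigner;
    private final Set<SplitT> assignedSplits;

    // constructor, start(), addSplitsBack(), addReader(), snapshotState() and close() omitted

    @Override
    @SuppressWarnings("unchecked")
    public void handleSplitRequest(int subtask, @Nullable String hostname) {
        if (!context.registeredReaders().containsKey(subtask)) {
            // reader failed between sending the request and now. skip this request.
            return;
        }
        if (splitAssigner == null) {
            // no DynamicFilteringEvent has arrived yet: fall back to all partitions
            createSplitAssigner(null);
        }

        final Optional<FileSourceSplit> nextSplit = getNextSplit(hostname);
        if (nextSplit.isPresent()) {
            final FileSourceSplit split = nextSplit.get();
            context.assignSplit((SplitT) split, subtask);
            assignedSplits.add((SplitT) split);
        } else {
            context.signalNoMoreSplits(subtask);
        }
    }

    private Optional<FileSourceSplit> getNextSplit(@Nullable String hostname) {
        do {
            final Optional<FileSourceSplit> nextSplit = splitAssigner.getNext(hostname);
            if (nextSplit.isPresent()) {
                // ignore the split if it has been assigned
                if (!assignedSplits.contains(nextSplit.get())) {
                    return nextSplit;
                }
            } else {
                return nextSplit;
            }
        } while (true);
    }

    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        if (sourceEvent instanceof DynamicFilteringEvent) {
            // replace the assigner: only the relevant splits are assigned from now on
            createSplitAssigner(((DynamicFilteringEvent) sourceEvent).getData());
        }
    }

    private void createSplitAssigner(@Nullable DynamicFilteringData dynamicFilteringData) {
        DynamicFileEnumerator fileEnumerator = fileEnumeratorFactory.create();
        if (dynamicFilteringData != null) {
            fileEnumerator.setDynamicFilteringData(dynamicFilteringData);
        }
        // create splits from the (possibly pruned) partitions;
        // HiveSourceDynamicFileEnumerator ignores the paths argument
        final Collection<FileSourceSplit> splits;
        try {
            splits = fileEnumerator.enumerateSplits(new Path[0], context.currentParallelism());
        } catch (IOException e) {
            throw new UncheckedIOException("Could not enumerate file splits", e);
        }
        splitAssigner = splitAssignerFactory.create(splits);
    }
}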
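To make the best-effort behaviour concrete, here is a simplified, single-threaded model of the enumerator, assuming splits are plain strings and a FIFO queue stands in for the FileSplitAssigner; `BestEffortAssigner` and its methods are hypothetical names. It shows the two key properties: a request that arrives before the event is served from all splits, and a split that was already handed out is never assigned again after the assigner is replaced.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

/** Hypothetical single-threaded model of the best-effort split assignment (not Flink code). */
final class BestEffortAssigner {

    private final Collection<String> allSplits;
    private final Set<String> assignedSplits = new HashSet<>();
    /** Stand-in for FileSplitAssigner; null until the first request or event. */
    private Deque<String> assigner;

    BestEffortAssigner(Collection<String> allSplits) {
        this.allSplits = allSplits;
    }

    /** Serves one split request; an empty result corresponds to signalNoMoreSplits. */
    Optional<String> handleSplitRequest() {
        if (assigner == null) {
            // no filtering event yet: fall back to the splits of all partitions
            assigner = new ArrayDeque<>(allSplits);
        }
        while (!assigner.isEmpty()) {
            String next = assigner.poll();
            // Set.add returns false for splits already handed out by an earlier assigner
            if (assignedSplits.add(next)) {
                return Optional.of(next);
            }
        }
        return Optional.empty();
    }

    /** The filtering event replaces the assigner with one over the pruned splits only. */
    void handleFilteringEvent(Collection<String> prunedSplits) {
        assigner = new ArrayDeque<>(prunedSplits);
    }
}
```

If a request races ahead of the event and receives `p1`, a later event that keeps `p1` and `p3` leads to `p3` being assigned next and `p2` never being read; correctness is preserved because a split read too early is simply not pruned.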


  • DynamicFileEnumerator filters all partitions based on the DynamicFilteringData. The pseudocode for HiveSourceDynamicFileEnumerator looks like:
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;

import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.mapred.JobConf;

// DynamicFileEnumerator and DynamicFilteringData are the new classes proposed in this FLIP;
// createInputSplits(...) is the existing split-generation helper of the Hive source.
public class HiveSourceDynamicFileEnumerator implements DynamicFileEnumerator {

    private final List<String> dynamicPartitionKeys;
    // For a non-partitioned Hive table, allPartitions contains a single partition whose partitionValues is empty.
    private final List<HiveTablePartition> allPartitions;
    private final int threadNum;
    private final JobConf jobConf;

    private transient List<HiveTablePartition> finalPartitions;

    // constructor omitted

    @Override
    public void setDynamicFilteringData(DynamicFilteringData data) {
        try {
            finalPartitions = new ArrayList<>();
            Optional<List<RowData>> receivedDataOpt = data.getData();
            if (!receivedDataOpt.isPresent()) {
                // the DynamicFilteringData is too large, so nothing can be pruned
                finalPartitions = allPartitions;
                return;
            }
            RowType rowType = data.getRowType();
            for (HiveTablePartition partition : allPartitions) {
                RowData rowData = createRowData(rowType, partition);
                if (data.contains(rowData)) {
                    finalPartitions.add(partition);
                }
            }
        } catch (Exception e) {
            // fall back to reading all partitions if pruning fails
            finalPartitions = allPartitions;
        }
    }

    private RowData createRowData(RowType rowType, HiveTablePartition partition) {
        // convert the values of dynamicPartitionKeys in the partition to RowData based on the given row type
    }

    @Override
    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits) throws IOException {
        // without DynamicFilteringData (e.g. a split request arrived first), read all partitions
        List<HiveTablePartition> partitions = finalPartitions != null ? finalPartitions : allPartitions;
        return new ArrayList<>(createInputSplits(minDesiredSplits, partitions, threadNum, jobConf));
    }
}
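The partition matching in setDynamicFilteringData can likewise be modelled in plain Java. This sketch assumes a partition is a map from partition key to value and that the received filtering data is a set of value tuples ordered like `dynamicPartitionKeys`, with an empty `Optional` standing for data that exceeded the threshold. `PartitionFilterSketch` is a hypothetical name, and comparing string tuples stands in for the RowData conversion done by `createRowData`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Hypothetical model of HiveSourceDynamicFileEnumerator#setDynamicFilteringData (not Flink code). */
final class PartitionFilterSketch {

    private PartitionFilterSketch() {}

    static List<Map<String, String>> prune(
            List<Map<String, String>> allPartitions,
            List<String> dynamicPartitionKeys,
            Optional<Set<List<String>>> receivedData) {
        if (!receivedData.isPresent()) {
            // the filtering data exceeded the threshold: nothing can be pruned
            return allPartitions;
        }
        List<Map<String, String>> finalPartitions = new ArrayList<>();
        for (Map<String, String> partition : allPartitions) {
            // the partition's "row": its values for the dynamic keys, in key order
            List<String> row = new ArrayList<>();
            for (String key : dynamicPartitionKeys) {
                row.add(partition.get(key));
            }
            if (receivedData.get().contains(row)) {
                finalPartitions.add(partition);
            }
        }
        return finalPartitions;
    }
}
```

Because `List.equals` and `List.hashCode` are defined by content, the partition tuple can be looked up directly in the received set, which keeps the check per partition at a single hash lookup.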

Compatibility, Deprecation, and Migration Plan

...