Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. The parser will parse a given query, convert to an AST (Abstract Syntax Tree) plan. The optimizer will detect and transform the plan with DPP pattern via planner rules, and get the optimized physical plan. The planner will generate the stream graph based on the physical plan.
  2. Submit the generated job graph to the job manager.
  3. The JM will schedule the job vertices for dim-source and DynamicPartitionCollector first. The filtered data from dim-source operator will be send both to DynamicPartitionCollector operator and the shuffle of Join operator.
  4. DynamicPartitionCollector collects the input data, removes the irrelevant column data and duplicated records, and then sends the partition data (wrapped in DynamicPartitionEvent) to its OperatorCoordinator once it collects all input data in finish method. The coordinator will deliver the event to the SourceCoordinators of the relating Fact sources, then the SourceCoordinators will deliver the DynamicPartitionEvent to DynamicFileSplitEnumerator.
  5. The DynamicFileSplitEnumerator finds the the relevant partitions from the all partitions via the partition data from dim-source, and creates the target splits for fact-source.
  6. The JM schedules the job vertices for fact-source.
  7. The fact-source gets the splits from DynamicFileSplitEnumerator, reads the data, send the shuffle of Join operator.
  8. The join operator reads the input data and does join operation.


New public interfaces

SupportsDynamicPartitionPruning` SupportsDynamicFiltering is introduce to let the planner know whether a source connector supports DPP dynamic filtering and let the source connector to choose the static split enumerator or dynamic split enumerator.

@PublicEvolving
public interface SupportsDynamicPartitionPruning {

    void applyDynamicPartitionPruning(List<String> partitionKeys);
}

Since the split enumerator is executed in JM, the filtered partition data from dim-source operator should be send to the split enumerator via existing coordinator mechanism. So we introduce DynamicPartitionEvent to wrap the partition data.

...

filter the irrelevant partitions or input data in runtime.

/**
* Push dynamic filter into {@link ScanTableSource}, the table source can filter the partitions or
* the input data in runtime to reduce scan I/O.
*/
@PublicEvolving
public interface SupportsDynamicFiltering {

/**
* apply the candidate filter fields into the table source, and return the accepted fields. The
* data corresponding the filter fields will be provided in runtime, which can be used to filter
* partitions and input data.
*/
List<String> applyDynamicFiltering(List<String> candidateFilterFields);
}


Since the split enumerator is executed in JM, the filtered partition data from dim-source operator should be send to the split enumerator via existing coordinator mechanism. So we introduce DynamicPartitionEvent to wrap the partition data.

@PublicEvolving
public class DynamicFilteringEvent implements SourceEvent {
private final DynamicFilteringData data;

public DynamicFilteringEvent(DynamicFilteringData data) {
this.data = data;
}

public DynamicFilteringData getData() {
return data;
}

@Override
public String toString() {
return "DynamicFilteringEvent{" + "data=" + data + '}';
}
}


PartitionData wraps the partition values without the schema, the value order is consistent with the argument value of SupportsDynamicPartitionPruning#applyDynamicPartitionPruning method.

@PublicEvolving
public class DynamicFilteringData implements Serializable {
private final TypeInformation<RowData> typeInfo;
private final RowType rowType;
private final List<byte[]> serializedData;
private final boolean exceedThreshold;
private transient List<RowData> data;

public RowType getRowType() {
return rowType;
}

public Optional<List<RowData>> getData() {
if (exceedThreshold) {
return Optional.empty();
}

if (data == null) {
data = new ArrayList<>();
TypeSerializer<RowData> serializer = typeInfo.createSerializer(new ExecutionConfig());
for (byte[] bytes : serializedData) {
try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais)) {

...

 

...

 

...

    

...

    

...

 

...

 

...

RowData partition = serializer.deserialize(inView);

...

 

...

    

...

   

...

    

...

 

...

 

...

data.add(partition);

...

        

...

 

...

 

...

 

...

 

...

 

...

 

...

 

...

}

...

 catch (Exception 

...

PartitionData wraps the partition values without the schema, the value order is consistent with the argument value of SupportsDynamicPartitionPruning#applyDynamicPartitionPruning method.

...

e) {

...

 

...

 

...

 

...

   throw 

...

new 

...

TableException("Unable to deserialize the value.", e);
}

...

 

...

 

...

    }

...


...

 

...

 

...

 

...

 

...

}
return

...

Optional.

...

of(

...

data);

...


}

...



...

   

...

public 

...

boolean 

...

contains(RowData row) {

...


...

 

...

 

...

 

...

// ...
}

...


}


Currently, for FileSystem connector and Hive connector, the splits are created by FileEnumerator. However, existing FileEnumerators do not match the requirement, therefore we introduce a new FileEnumerator named DynamicFileEnumerator to support creating splits based on partition.

@PublicEvolving

...


public interface DynamicFileEnumerator extends FileEnumerator {

...



void

...

setDynamicFilteringData(

...

DynamicFilteringData 

...

data);

...



/** Provider for DynamicFileEnumerator. */

...


@FunctionalInterface

...


interface Provider extends FileEnumerator.Provider {

...



...

  

...

DynamicFileEnumerator create();

...


}

...


}


We introduce a config option to enable/disable this feature, default is true.

...

@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING

...

)
public static final ConfigOption<Boolean> TABLE_OPTIMIZER_DYNAMIC_

...

FILTERING_ENABLED =

...


key("table.optimizer.dynamic-

...

filtering.enabled")

...


.booleanType()
.

...

defaultValue(

...

true)
.withDescription(

...

   "When it is true, the optimizer will try to push dynamic filtering into scan table source,"

...

                            

...

 

...

 

...

+ 

...

" the 

...

irrelevant 

...

partitions 

...

or 

...

input 

...

data 

...

will 

...

be 

...

filtered to reduce scan 

...

I/O in runtime.");

transformation

Proposed Changes

...