...
Since the split enumerator runs in the JobManager (JM), the filtered partition data produced by the dim-side source operator must be sent to the split enumerator through the existing operator coordinator mechanism. We therefore introduce DynamicFilteringEvent to wrap the partition filtering data.
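```java
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.source.SourceEvent;

/** Source event that carries the dynamic filtering data to the split enumerator. */
@PublicEvolving
public class DynamicFilteringEvent implements SourceEvent {

    private final DynamicFilteringData data;

    public DynamicFilteringEvent(DynamicFilteringData data) {
        this.data = data;
    }

    public DynamicFilteringData getData() {
        return data;
    }

    @Override
    public String toString() {
        return "DynamicFilteringEvent{" + "data=" + data + '}';
    }
}
```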
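The following is a minimal plain-Java sketch of the receiving side of this hand-off. It assumes the enumerator dispatches on the event type in a method shaped like Flink's `SplitEnumerator#handleSourceEvent(int subtaskId, SourceEvent sourceEvent)`. `SimpleSourceEvent`, `SimpleFilteringEvent` and `SketchEnumerator` are hypothetical stand-ins, and partition values are reduced to strings, so the snippet runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's SourceEvent marker interface.
interface SimpleSourceEvent {}

// Hypothetical stand-in for DynamicFilteringEvent: partition specs as plain strings.
final class SimpleFilteringEvent implements SimpleSourceEvent {
    private final List<String> partitions;

    SimpleFilteringEvent(List<String> partitions) {
        this.partitions = partitions;
    }

    List<String> getPartitions() {
        return partitions;
    }
}

// Hypothetical enumerator: once the filtering event arrives, it keeps only the
// known partitions that appear in the event, preserving their original order.
final class SketchEnumerator {
    private final List<String> allPartitions;
    private List<String> selected; // null until the filtering event arrives

    SketchEnumerator(List<String> allPartitions) {
        this.allPartitions = allPartitions;
    }

    // Mirrors the shape of SplitEnumerator#handleSourceEvent(int, SourceEvent).
    void handleSourceEvent(int subtaskId, SimpleSourceEvent event) {
        if (event instanceof SimpleFilteringEvent) {
            List<String> wanted = ((SimpleFilteringEvent) event).getPartitions();
            List<String> result = new ArrayList<>();
            for (String partition : allPartitions) {
                if (wanted.contains(partition)) {
                    result.add(partition);
                }
            }
            selected = result;
        }
        // Other event types would be handled (or ignored) here.
    }

    List<String> selectedPartitions() {
        return selected;
    }
}
```

Dispatching with `instanceof` keeps the enumerator open to other event types that may arrive through the same channel.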
DynamicFilteringData wraps the partition values without embedding the schema in each value (the row type is kept once, alongside the values). The order of the fields in each value is consistent with the argument of the SupportsDynamicFiltering#applyDynamicFiltering method.
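```java
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import java.io.ByteArrayInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

@PublicEvolving
public class DynamicFilteringData implements Serializable {
    private final TypeInformation<RowData> typeInfo;
    private final RowType rowType;
    // One serialized RowData per filtering value.
    private final List<byte[]> serializedData;
    private final boolean exceedThreshold;
    // Deserialized lazily on first access; not serialized with the event.
    private transient List<RowData> data;

    public DynamicFilteringData(
            TypeInformation<RowData> typeInfo,
            RowType rowType,
            List<byte[]> serializedData,
            boolean exceedThreshold) {
        this.typeInfo = typeInfo;
        this.rowType = rowType;
        this.serializedData = serializedData;
        this.exceedThreshold = exceedThreshold;
    }

    public RowType getRowType() {
        return rowType;
    }

    public Optional<List<RowData>> getData() {
        if (exceedThreshold) {
            return Optional.empty();
        }
        if (data == null) {
            List<RowData> deserialized = new ArrayList<>();
            TypeSerializer<RowData> serializer = typeInfo.createSerializer(new ExecutionConfig());
            for (byte[] bytes : serializedData) {
                try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
                        DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais)) {
                    deserialized.add(serializer.deserialize(inView));
                } catch (Exception e) {
                    throw new TableException("Unable to deserialize the value.", e);
                }
            }
            // Cache the result only after every value was deserialized successfully.
            data = deserialized;
        }
        return Optional.of(data);
    }

    public boolean contains(RowData row) {
        // ...
    }
}
```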
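The lazy deserialization and the lookup behind `contains` can be illustrated without Flink. In the following sketch, `SketchFilteringData` is a hypothetical stand-in: rows are lists of strings instead of `RowData`, `java.io` data streams replace `TypeSerializer`, and the producer-side `serialize` helper is an assumption about how the byte arrays might be built. The body of `contains` is elided in the design above. The version here is only one possible choice. It assumes that when the threshold is exceeded nothing can be pruned, so every row is reported as a possible match.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for DynamicFilteringData: each row is a list of
// partition values (strings); java.io data streams replace TypeSerializer.
final class SketchFilteringData implements Serializable {
    private final List<byte[]> serializedData;
    private final boolean exceedThreshold;
    private transient List<List<String>> data;  // decoded lazily on first access
    private transient Set<List<String>> lookup; // built lazily for contains()

    SketchFilteringData(List<byte[]> serializedData, boolean exceedThreshold) {
        this.serializedData = serializedData;
        this.exceedThreshold = exceedThreshold;
    }

    // Hypothetical producer side: field count, then each field as modified UTF-8.
    static byte[] serialize(String... fields) {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputStream out = new DataOutputStream(baos)) {
            out.writeInt(fields.length);
            for (String field : fields) {
                out.writeUTF(field);
            }
            out.flush();
            return baos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    Optional<List<List<String>>> getData() {
        if (exceedThreshold) {
            return Optional.empty();
        }
        if (data == null) {
            List<List<String>> decoded = new ArrayList<>();
            for (byte[] bytes : serializedData) {
                try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
                    int arity = in.readInt();
                    List<String> row = new ArrayList<>(arity);
                    for (int i = 0; i < arity; i++) {
                        row.add(in.readUTF());
                    }
                    decoded.add(row);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
            data = decoded; // cache only after every row decoded successfully
        }
        return Optional.of(data);
    }

    // Assumption: above the threshold nothing can be pruned, so report a match.
    boolean contains(String... fields) {
        Optional<List<List<String>>> rows = getData();
        if (!rows.isPresent()) {
            return true;
        }
        if (lookup == null) {
            lookup = new HashSet<>(rows.get());
        }
        return lookup.contains(Arrays.asList(fields));
    }
}
```

Building the hash set once makes each later `contains` call a constant-time lookup. This may matter when the enumerator checks every partition of a large table.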
Currently, for the FileSystem connector and the Hive connector, splits are created by a FileEnumerator. However, the existing FileEnumerators cannot restrict split creation to a subset of partitions. We therefore introduce a new FileEnumerator named DynamicFileEnumerator that creates splits based on the filtered partitions.
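Here is a minimal plain-Java sketch of the idea behind DynamicFileEnumerator. Splits are created only for partitions that pass the filtering data, and in this sketch every partition is kept when no filtering data has arrived. `SketchFileSplit`, `SketchDynamicFileEnumerator` and `setFilteringData` are hypothetical names. The actual enumerator works on Flink file splits and DynamicFilteringData rather than partition-spec strings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical split: one file belonging to one partition.
final class SketchFileSplit {
    final String partition;
    final String path;

    SketchFileSplit(String partition, String path) {
        this.partition = partition;
        this.path = path;
    }
}

// Hypothetical DynamicFileEnumerator stand-in. Partitions are keyed by a spec
// string such as "dt=2022-01-01"; the iteration order of the given map is kept.
final class SketchDynamicFileEnumerator {
    private final Map<String, List<String>> filesByPartition;
    private Set<String> acceptedPartitions; // null: no filtering data, keep all

    SketchDynamicFileEnumerator(Map<String, List<String>> filesByPartition) {
        this.filesByPartition = new LinkedHashMap<>(filesByPartition);
    }

    // Called when the filtering data arrives, before splits are enumerated.
    void setFilteringData(Set<String> acceptedPartitions) {
        this.acceptedPartitions = acceptedPartitions;
    }

    List<SketchFileSplit> enumerateSplits() {
        List<SketchFileSplit> splits = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : filesByPartition.entrySet()) {
            if (acceptedPartitions != null && !acceptedPartitions.contains(entry.getKey())) {
                continue; // pruned: no split is created for this partition
            }
            for (String path : entry.getValue()) {
                splits.add(new SketchFileSplit(entry.getKey(), path));
            }
        }
        return Collections.unmodifiableList(splits);
    }
}
```

Pruning at enumeration time means files of filtered-out partitions are never turned into splits, so the readers never open them.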
...