...

Since the split enumerator is executed in the JobManager (JM), the filtered partition data from the dim-source operator should be sent to the split enumerator via the existing coordinator mechanism. So we introduce DynamicFilteringEvent to wrap the partition filtering data.

@PublicEvolving
public class DynamicFilteringEvent implements SourceEvent {
    private final DynamicFilteringData data;

    public DynamicFilteringEvent(DynamicFilteringData data) {
        this.data = data;
    }

    public DynamicFilteringData getData() {
        return data;
    }

    @Override
    public String toString() {
        return "DynamicFilteringEvent{" + "data=" + data + '}';
    }
}


DynamicFilteringData wraps the partition values without the schema. The value order is consistent with the argument of the SupportsDynamicFiltering#applyDynamicFiltering method.

@PublicEvolving
public class DynamicFilteringData implements Serializable {
    private final TypeInformation<RowData> typeInfo;
    private final RowType rowType;
    private final List<byte[]> serializedData;
    private final boolean exceedThreshold;
    private transient List<RowData> data;

    public RowType getRowType() {
        return rowType;
    }

    public Optional<List<RowData>> getData() {
        if (exceedThreshold) {
            return Optional.empty();
        }

        if (data == null) {
            data = new ArrayList<>();
            TypeSerializer<RowData> serializer = typeInfo.createSerializer(new ExecutionConfig());
            for (byte[] bytes : serializedData) {
                try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
                        DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais)) {
                    RowData partition = serializer.deserialize(inView);
                    data.add(partition);
                } catch (Exception e) {
                    throw new TableException("Unable to deserialize the value.", e);
                }
            }
        }
        return Optional.of(data);
    }

    public boolean contains(RowData row) {
        // ...
    }
}
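
The body of contains is elided above. As a rough illustration of the intended semantics, the following is a minimal, self-contained sketch that does not use any Flink classes. FilteringDataSketch is a hypothetical stand-in: rows are modeled as plain List<Object> values instead of RowData, and the behavior when the threshold is exceeded (every row is treated as a match, so nothing is pruned) is an assumption rather than something stated in this document.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Hypothetical stand-in for DynamicFilteringData; rows are plain lists, not RowData. */
final class FilteringDataSketch {
    private final List<List<Object>> rows;
    private final boolean exceedThreshold;
    // Lookup index, built lazily on the first contains() call.
    private Set<List<Object>> index;

    FilteringDataSketch(List<List<Object>> rows, boolean exceedThreshold) {
        this.rows = new ArrayList<>(rows);
        this.exceedThreshold = exceedThreshold;
    }

    /** Mirrors getData(): no data is exposed once the threshold was exceeded. */
    Optional<List<List<Object>>> getData() {
        return exceedThreshold ? Optional.empty() : Optional.of(rows);
    }

    /** Returns whether the given partition values survive the filter. */
    boolean contains(List<Object> row) {
        if (exceedThreshold) {
            // Assumption: without usable filtering data, nothing may be pruned.
            return true;
        }
        if (index == null) {
            // List equality is element-wise, so a HashSet gives value-based lookup.
            index = new HashSet<>(rows);
        }
        return index.contains(row);
    }
}
```

With one collected row ("2022-01-01", 1), contains accepts exactly that combination of values and rejects any other; with exceedThreshold set, getData() is empty and contains accepts every row.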


Currently, for the FileSystem connector and the Hive connector, the splits are created by a FileEnumerator. However, the existing FileEnumerators cannot create splits based on partitions, so we introduce a new FileEnumerator named DynamicFileEnumerator to support this.
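
To make the idea of partition-based split creation concrete, here is a minimal sketch in plain Java. It is not the DynamicFileEnumerator interface itself: PartitionPruningSketch, enumerateFiles and the map from partition values to file paths are all hypothetical names chosen for illustration, and the filter is passed in as a generic Predicate where the real enumerator would presumably consult the received dynamic filtering data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical illustration of enumerating only the files of retained partitions. */
final class PartitionPruningSketch {
    private PartitionPruningSketch() {}

    /**
     * Collects the files of every partition accepted by the filter.
     *
     * @param filesByPartition partition values mapped to the files of that partition
     * @param keepPartition filter derived from the dynamic filtering data (assumed)
     */
    static List<String> enumerateFiles(
            Map<List<Object>, List<String>> filesByPartition,
            Predicate<List<Object>> keepPartition) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<List<Object>, List<String>> entry : filesByPartition.entrySet()) {
            // Files of pruned partitions are skipped and never turned into splits.
            if (keepPartition.test(entry.getKey())) {
                result.addAll(entry.getValue());
            }
        }
        return result;
    }
}
```

The point of the sketch is that pruning happens at the partition level before any split is created, so files belonging to filtered-out partitions are never listed or read.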

...