...

The main purpose of this FLIP is to discuss the second approach. Compared to the first approach, the second one obtains statistics in real time, with no need to run an analyze job for each table, which could improve the user experience. The disadvantage is that, in most cases, the statistics reported by a connector are not as complete as the results of an analyze job. The "ANALYZE TABLE" syntax will be introduced in a separate FLIP.

Public Interfaces


SupportsStatisticReport is an interface that allows the connector to report statistics to the planner. The statistics reported by the connector are used when the statistics from the catalog are unknown.

/** Enables a {@link DynamicTableSource} to report its estimated statistics. */
@PublicEvolving
public interface SupportsStatisticReport {

    /**
     * Returns the estimated statistics of this {@link DynamicTableSource}, or {@link
     * TableStats#UNKNOWN} if the statistics are not supported or cannot be computed.
     */
    TableStats reportStatistics();
}
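
To make the contract concrete, here is a minimal sketch of how a source might implement the interface. It is not taken from the FLIP: `TableStats` and `SupportsStatisticReport` below are simplified stand-ins for the real Flink types (only a row count is modeled), and `FileBackedSource` with its per-file row counts is a hypothetical connector invented for illustration.

```java
import java.util.List;

/** Simplified stand-in for Flink's TableStats; only the row count is modeled. */
final class TableStats {
    static final TableStats UNKNOWN = new TableStats(-1);

    private final long rowCount;

    TableStats(long rowCount) {
        this.rowCount = rowCount;
    }

    long getRowCount() {
        return rowCount;
    }
}

/** Stand-in that mirrors the interface proposed above. */
interface SupportsStatisticReport {
    TableStats reportStatistics();
}

/** Hypothetical source that knows the row count of each file it reads. */
final class FileBackedSource implements SupportsStatisticReport {
    // Assumption for this sketch: a negative entry means "row count not available".
    private final List<Long> rowCountPerFile;

    FileBackedSource(List<Long> rowCountPerFile) {
        this.rowCountPerFile = rowCountPerFile;
    }

    @Override
    public TableStats reportStatistics() {
        long total = 0;
        for (long count : rowCountPerFile) {
            if (count < 0) {
                // One file without a usable count makes the total unreliable.
                return TableStats.UNKNOWN;
            }
            total += count;
        }
        return new TableStats(total);
    }
}
```

In this sketch the source returns UNKNOWN rather than a partial sum, which matches the "cannot be handled" case described in the Javadoc; a real connector may choose a different policy.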

...

public static final ConfigOption<Boolean> TABLE_OPTIMIZER_SOURCE_COLLECT_STATISTICS_ENABLED =
    key("table.optimizer.source.collect-statistics-enabled")
        .booleanType()
        .defaultValue(true)
        .withDescription(
            "When it is true, the optimizer will collect and use the statistics from the source connector"
            + " if the source implements SupportsStatisticReport and the collected statistics are not UNKNOWN."
            + " Default value is true.");

...

public static final ConfigOption<FileStatisticsType> SOURCE_STATISTICS_TYPE =
    key("source.statistics-type")
        .enumType(FileStatisticsType.class)
        .defaultValue(FileStatisticsType.ALL)
        .withDescription("The file statistics type which the source could provide. "
            + "Collecting statistics can be a heavy operation in some cases, "
            + "so this option allows users to choose the statistics type according to their situation.");

public enum FileStatisticsType implements DescribedEnum {
    NONE("NONE", text("Do not collect any file statistics.")),
    ALL("ALL", text("Collect all file statistics that the format can provide."));

    private final String value;
    private final InlineElement description;

    FileStatisticsType(String value, InlineElement description) {
        this.value = value;
        this.description = description;
    }

    @Override
    public String toString() {
        return value;
    }

    @Override
    public InlineElement getDescription() {
        return description;
    }
}
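
As an illustration of how a file-based source could honor this option, the following sketch skips collection entirely for NONE and sums per-file row counts for ALL. It is an assumption about one possible implementation, not the FLIP's code: `StatisticsType` is a simplified stand-in for `FileStatisticsType`, and `estimateRowCount` with its `rowCountsFromMetadata` parameter is a hypothetical helper.

```java
import java.util.List;

/** Simplified stand-in for the FileStatisticsType enum above. */
enum StatisticsType {
    NONE,
    ALL
}

final class FileStatisticsSketch {
    /** Stand-in for TableStats.UNKNOWN: a negative row count means "unknown". */
    static final long UNKNOWN_ROW_COUNT = -1L;

    /**
     * Hypothetical helper. rowCountsFromMetadata represents per-file row counts
     * that a format might read from file metadata.
     */
    static long estimateRowCount(StatisticsType type, List<Long> rowCountsFromMetadata) {
        if (type == StatisticsType.NONE) {
            // The user opted out, so avoid touching the files at all.
            return UNKNOWN_ROW_COUNT;
        }
        long total = 0;
        for (long count : rowCountsFromMetadata) {
            total += count;
        }
        return total;
    }
}
```

With NONE the planner would see unknown statistics for the source, which trades estimation quality for a cheaper planning phase.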

Proposed Changes

  1. How does the planner use the statistics reported by the connector?

The statistics for a table need to be recomputed when:

...

The pseudocode is as follows:

public class FlinkRecomputeStatisticsProgram implements FlinkOptimizeProgram<BatchOptimizeContext> {

    @Override
    public RelNode optimize(RelNode root, BatchOptimizeContext context) {
        // create a visitor to find all LogicalTableScan nodes
        // call recomputeStatistics method for each LogicalTableScan
    }

    private LogicalTableScan recomputeStatistics(LogicalTableScan scan) {
        final RelOptTable scanTable = scan.getTable();
        if (!(scanTable instanceof TableSourceTable)) {
            return scan;
        }
        // the code below works on the scanned table as a TableSourceTable
        TableSourceTable table = (TableSourceTable) scanTable;
        boolean reportStatEnabled =
                ShortcutUtils.unwrapContext(scan)
                                .getTableConfig()
                                .get(TABLE_OPTIMIZER_SOURCE_COLLECT_STATISTICS_ENABLED)
                        && table.tableSource() instanceof SupportsStatisticReport;

        SourceAbilitySpec[] specs = table.abilitySpecs();
        PartitionPushDownSpec partitionPushDownSpec = getSpec(specs, PartitionPushDownSpec.class);
        FilterPushDownSpec filterPushDownSpec = getSpec(specs, FilterPushDownSpec.class);
        TableStats newTableStat =
                recomputeStatistics(
                        table, partitionPushDownSpec, filterPushDownSpec, reportStatEnabled);
        FlinkStatistic newStatistic =
                FlinkStatistic.builder()
                        .statistic(table.getStatistic())
                        .tableStats(newTableStat)
                        .build();
        TableSourceTable newTable = table.copy(newStatistic);
        return new LogicalTableScan(
                scan.getCluster(), scan.getTraitSet(), scan.getHints(), newTable);
    }
    
    private TableStats recomputeStatistics(
            TableSourceTable table,
            PartitionPushDownSpec partitionPushDownSpec,
            FilterPushDownSpec filterPushDownSpec,
            boolean reportStatEnabled) {
        DynamicTableSource tableSource = table.tableSource();
        if (filterPushDownSpec != null) {
            // filter push down:
            // the catalog does not support getting statistics with filters,
            // so only call the reportStatistics method if it is enabled
            return reportStatEnabled
                    ? ((SupportsStatisticReport) tableSource).reportStatistics()
                    : null;
        } else {
            if (partitionPushDownSpec != null) {
                // collect the partitions statistics from catalog
                TableStats newTableStat = getPartitionsTableStats(table, partitionPushDownSpec);
                if (reportStatEnabled
                        && (newTableStat == null || newTableStat == TableStats.UNKNOWN)) {
                    return ((SupportsStatisticReport) tableSource).reportStatistics();
                } else {
                    return newTableStat;
                }
            } else {
                if (reportStatEnabled
                        && (table.getStatistic().getTableStats() == TableStats.UNKNOWN)) {
                    return ((SupportsStatisticReport) tableSource).reportStatistics();
                } else {
                    return null;
                }
            }
        }
    }
    
    private TableStats getPartitionsTableStats(
            TableSourceTable table, PartitionPushDownSpec partitionPushDownSpec) {
        if (partitionPushDownSpec == null) {
            return null;
        }
        // collect and merge the partitions statistics from catalog
    }
}
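
The branching in the pseudocode above can be condensed into a small, self-contained sketch that is easier to test in isolation. Everything here is a simplification made for illustration: statistics are reduced to a row count, `UNKNOWN` (-1) stands in for `TableStats.UNKNOWN`, the `Supplier` stands in for the call to `reportStatistics()`, and the class and parameter names are hypothetical. A `null` result is returned wherever the pseudocode returns `null`.

```java
import java.util.function.Supplier;

final class StatisticsFallbackSketch {
    /** Stand-in for TableStats.UNKNOWN: a negative row count means "unknown". */
    static final long UNKNOWN = -1L;

    /**
     * @param filterPushedDown whether a filter was pushed down into the source
     * @param partitionsPushedDown whether partitions were pushed down into the source
     * @param catalogRowCount row count from the catalog (for the remaining partitions
     *     when partitions were pushed down), or UNKNOWN
     * @param reportEnabled whether the option is on and the source can report statistics
     * @param connectorReport stand-in for SupportsStatisticReport#reportStatistics
     * @return the recomputed row count, or null where the pseudocode returns null
     */
    static Long recompute(
            boolean filterPushedDown,
            boolean partitionsPushedDown,
            long catalogRowCount,
            boolean reportEnabled,
            Supplier<Long> connectorReport) {
        if (filterPushedDown) {
            // The catalog cannot account for filters, so only the connector can help.
            return reportEnabled ? connectorReport.get() : null;
        }
        if (partitionsPushedDown) {
            // Prefer the catalog's partition statistics; fall back to the connector.
            return (reportEnabled && catalogRowCount == UNKNOWN)
                    ? connectorReport.get()
                    : Long.valueOf(catalogRowCount);
        }
        // Nothing was pushed down: ask the connector only if the catalog knows nothing.
        return (reportEnabled && catalogRowCount == UNKNOWN) ? connectorReport.get() : null;
    }
}
```

For example, with partitions pushed down and a catalog row count of 100, the sketch returns 100 and never invokes the connector; with an unknown catalog count and reporting enabled, it returns whatever the connector supplies.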



  2. Which connectors and formats will be supported by default?

The FileSystem connector, the Hive connector, and the Csv, Parquet, and Orc formats will be supported by default.

...