
Status

Current state: "Under Discussion"

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Statistics are one of the most important inputs to the optimizer. Accurate and complete statistics allow the optimizer to choose better execution plans. Currently, Flink SQL obtains statistics only from the Catalog, although many connectors, e.g. FileSystem, are able to provide statistics themselves. In production, we find that many tables in the Catalog do not have any statistics. As a result, the optimizer cannot generate better execution plans, especially for batch jobs.

There are two approaches to enhancing statistics for the planner. One is to introduce the "ANALYZE TABLE" syntax, which writes the analyzed result to the catalog. The other is to introduce a new connector interface that allows the connector itself to report statistics directly to the planner. The second approach supplements the catalog statistics.

The main purpose of this FLIP is to discuss the second approach. Compared with the first approach, the second one obtains statistics in real time, so there is no need to run an analysis job for each table. This could help improve the user experience. The disadvantage is that, in most cases, the statistics reported by a connector are not as complete as the results of an analysis job. The "ANALYZE TABLE" syntax will be introduced in a separate FLIP.

Public Interfaces

Currently, table statistics and column statistics are described via two classes. We introduce CatalogStatistics to combine table statistics and column statistics, which describes all statistics for a table or partition.

/**
 * All statistics for a table or partition, including {@link CatalogTableStatistics} and {@link
 * CatalogColumnStatistics}.
 */
@PublicEvolving
public class CatalogStatistics {
    public static final CatalogStatistics UNKNOWN =
            new CatalogStatistics(CatalogTableStatistics.UNKNOWN, CatalogColumnStatistics.UNKNOWN);

    private final CatalogTableStatistics tableStatistics;
    private final CatalogColumnStatistics columnStatistics;

    public CatalogStatistics(
            CatalogTableStatistics tableStatistics, CatalogColumnStatistics columnStatistics) {
        this.tableStatistics = tableStatistics;
        this.columnStatistics = columnStatistics;
    }

    public CatalogTableStatistics getTableStatistics() {
        return tableStatistics;
    }

    public CatalogColumnStatistics getColumnStatistics() {
        return columnStatistics;
    }
}


SupportStatisticReport is an interface that allows a connector to report statistics to the planner. The statistics reported by the connector have high priority and can override the statistics from the catalog.

/** Enables a {@link DynamicTableSource} to report its estimated statistics. */
@PublicEvolving
public interface SupportStatisticReport {

    /**
     * Returns the estimated statistics of this {@link DynamicTableSource}, else {@link
     * CatalogStatistics#UNKNOWN} if some situations are not supported or cannot be handled.
     */
    CatalogStatistics reportStatistics();
}


The FileSystem connector is a commonly used connector, especially for batch jobs. FileSystem supports multiple formats, such as csv, parquet and orc. [1] Different formats have different ways of getting statistics. Parquet[2] and orc[3] both store metadata in the file footer, which includes row count, max/min, null count, etc. For csv, we can get the file size and an estimated row count (file_size / sampled_line_length).
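The csv estimate described above (file size divided by the length of sampled lines) can be sketched as follows. This is a minimal illustration only: the class `CsvRowCountEstimator`, its method and the `UNKNOWN` sentinel are hypothetical names, not part of Flink, and the sketch assumes UTF-8 text with single-byte line terminators.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

/** Hypothetical helper (not a Flink class): estimates a csv row count from a line sample. */
final class CsvRowCountEstimator {

    /** Returned when no estimate can be made. */
    static final long UNKNOWN = -1L;

    /**
     * Estimates the row count as fileSize / averageSampledLineLength.
     *
     * @param fileSizeInBytes total size of the csv file
     * @param sampledLines a few lines read from the file, without line terminators
     */
    static long estimateRowCount(long fileSizeInBytes, List<String> sampledLines) {
        if (fileSizeInBytes < 0 || sampledLines.isEmpty()) {
            return UNKNOWN;
        }
        long sampledBytes = 0;
        for (String line : sampledLines) {
            // +1 accounts for the line terminator (assumption: "\n" line endings).
            sampledBytes += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        double averageLineLength = (double) sampledBytes / sampledLines.size();
        return Math.round(fileSizeInBytes / averageLineLength);
    }
}
```

The result is only as good as the sample: if the sampled lines are not representative of the whole file (for example, a long header or highly variable row widths), the estimate can be far off.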

Currently, the statistical dimensions used by the optimizer include row count, NDV (number of distinct values), null count, max length, min length, max value and min value.[4] The file count and file size (which can easily be obtained from the file system) are not used by the planner now; we can improve this later.

We introduce the FileBasedStatisticsReportableDecodingFormat interface to get the estimated statistics for a format in the FileSystem connector.

/**
 * Extension of {@link DecodingFormat} which is able to report estimated statistics for FileSystem
 * connector.
 */
@PublicEvolving
public interface FileBasedStatisticsReportableDecodingFormat<I> extends DecodingFormat<I> {

    /**
     * Returns the estimated statistics of this {@link DecodingFormat}.
     *
     * @param files The files to be estimated.
     * @param producedDataType the final output type of the format.
     */
    CatalogStatistics reportStatistics(List<Path> files, DataType producedDataType);
}
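Because reportStatistics receives a list of files, a footer-based format has to combine per-file metadata into one result. The sketch below shows one plausible way to merge row count, null count and min/max for a single numeric column. `FileColumnSummary` is a hypothetical stand-in type, not a Flink, Parquet or ORC class, and the merge rule is an assumption rather than the FLIP's prescribed implementation.

```java
import java.util.List;

/** Hypothetical per-file summary of one numeric column, as a file footer might expose it. */
final class FileColumnSummary {
    final long rowCount;
    final long nullCount;
    final long min;
    final long max;

    FileColumnSummary(long rowCount, long nullCount, long min, long max) {
        this.rowCount = rowCount;
        this.nullCount = nullCount;
        this.min = min;
        this.max = max;
    }

    /** Merges the summaries of several files into a single summary. */
    static FileColumnSummary merge(List<FileColumnSummary> perFile) {
        if (perFile.isEmpty()) {
            throw new IllegalArgumentException("At least one file summary is required");
        }
        long rows = 0;
        long nulls = 0;
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (FileColumnSummary summary : perFile) {
            rows += summary.rowCount;        // counts are additive across files
            nulls += summary.nullCount;
            min = Math.min(min, summary.min); // overall min is the smallest per-file min
            max = Math.max(max, summary.max); // overall max is the largest per-file max
        }
        return new FileColumnSummary(rows, nulls, min, max);
    }
}
```

Counts and bounds merge exactly, but the number of distinct values does not: per-file distinct counts cannot simply be summed, since the same value may appear in several files.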

Proposed Changes

  1. How does the planner use the statistics reported by the connector?

In the CatalogSourceTable#toRel method, the planner will call SupportStatisticReport#reportStatistics() if the catalog statistics are unknown and the table source implements SupportStatisticReport.

The pseudocode is as follows:

public RelNode toRel(ToRelContext toRelContext) {
    // create table source
    DynamicTableSource tableSource = ...;
    FlinkStatistic flinkStatistic = schemaTable.getStatistic();
    
    // get statistics from the connector if the catalog statistics are unknown
    // and the table source implements SupportStatisticReport
    if (flinkStatistic.getTableStats() == TableStats.UNKNOWN
        && tableSource instanceof SupportStatisticReport) {
        CatalogStatistics statistics = 
            ((SupportStatisticReport) tableSource).reportStatistics();
        if (statistics != null && statistics != CatalogStatistics.UNKNOWN) {
            TableStats tableStats = 
                CatalogTableStatisticsConverter.convertToTableStats(statistics);
            flinkStatistic = FlinkStatistic.builder()
                .statistic(flinkStatistic)
                .tableStats(tableStats)
                .build();
        }
    }
    // ... create and return the RelNode using flinkStatistic
}
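The fallback rule in the pseudocode above can be exercised in isolation with stand-in types. Everything in this sketch (`SimpleStats`, `SimpleSource`, `ReportsStats`, `StatsResolver` and the two sample sources) is hypothetical and only mirrors the shape of the proposed interfaces; it does not use the Flink API and models the row count only.

```java
/** Hypothetical stand-in for table statistics; only the row count is modelled. */
final class SimpleStats {
    static final SimpleStats UNKNOWN = new SimpleStats(-1L);
    final long rowCount;

    SimpleStats(long rowCount) {
        this.rowCount = rowCount;
    }
}

/** Hypothetical stand-in for a table source. */
interface SimpleSource {}

/** Hypothetical stand-in for the proposed reporting ability. */
interface ReportsStats {
    SimpleStats reportStatistics();
}

/** A source that reports a fixed statistics object. */
final class FixedStatsSource implements SimpleSource, ReportsStats {
    private final SimpleStats stats;

    FixedStatsSource(SimpleStats stats) {
        this.stats = stats;
    }

    @Override
    public SimpleStats reportStatistics() {
        return stats;
    }
}

/** A source without the reporting ability. */
final class PlainSource implements SimpleSource {}

final class StatsResolver {
    /** Prefers known catalog statistics and asks the source only when they are unknown. */
    static SimpleStats resolve(SimpleStats catalogStats, SimpleSource source) {
        if (catalogStats != SimpleStats.UNKNOWN) {
            return catalogStats;
        }
        if (source instanceof ReportsStats) {
            SimpleStats reported = ((ReportsStats) source).reportStatistics();
            if (reported != null) {
                return reported;
            }
        }
        return SimpleStats.UNKNOWN;
    }
}
```

With these stand-ins, `StatsResolver.resolve(SimpleStats.UNKNOWN, new FixedStatsSource(new SimpleStats(42L)))` yields the reported statistics, while any known catalog statistics are returned unchanged and the source is never asked.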


When partitions are pruned by PushPartitionIntoTableSourceScanRule, the statistics should also be updated.

The pseudocode is as follows:

private FlinkStatistic recomputeStatistics(
        DynamicTableSource newDynamicTableSource,
        TableSourceTable tableSourceTable,
        List<Map<String, String>> remainingPartitions) {

    TableStats newTableStat = null;
    if (tableSourceTable.contextResolvedTable().isPermanent()) {
        // get new statistics for the remaining partitions from the catalog
        // (this logic is not changed)
    }
    
    // get statistics from the connector if the catalog statistics are unknown
    // and the table source implements SupportStatisticReport
    if ((newTableStat == null || newTableStat == TableStats.UNKNOWN)
        && newDynamicTableSource instanceof SupportStatisticReport) {
        CatalogStatistics statistics = 
            ((SupportStatisticReport) newDynamicTableSource).reportStatistics();
        newTableStat = CatalogTableStatisticsConverter.convertToTableStats(statistics);
    }

    return FlinkStatistic.builder()
        .statistic(tableSourceTable.getStatistic())
        .tableStats(newTableStat)
        .build();
}


  2. Which connectors and formats will be supported by default?

The FileSystem connector and the Csv, Parquet and Orc formats will be supported by default.

More connectors and formats can be supported as needed in the future.

Compatibility, Deprecation, and Migration Plan

This is a new feature, so no compatibility, deprecation, or migration plan is needed.

Test Plan

  1. Unit tests will be added for each format to verify the statistics estimation logic.
  2. Plan tests will be added to verify how the planner uses the connector statistics.

Rejected Alternatives

None


POC: https://github.com/godfreyhe/flink/tree/FLIP-231


[1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/overview

[2] https://parquet.apache.org/docs/file-format/metadata

[3] https://orc.apache.org/specification/ORCv1

[4] https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/TableStats.java
