
Status

...



Discussion thread

...

...

Vote thread: https://

...

...

list.html?dev@flink.apache.org

JIRA: FLINK-27982

Release: 1.16


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Statistics are one of the most important inputs to the optimizer, and accurate, complete statistics allow the optimizer to make better decisions. Currently, Flink SQL gets statistics only from the Catalog, while many connectors, e.g. FileSystem, are able to provide statistics themselves. In production, we find that many tables in the Catalog have no statistics at all. As a result, the optimizer cannot generate good execution plans, especially for batch jobs.

...

The main purpose of this FLIP is to discuss the second approach. Compared to the first approach, the second one obtains statistics in real time, with no need to run an analyze job for each table, which improves the user experience. The disadvantage is that, in most cases, the statistics reported by a connector are not as complete as the results of an analyze job. The "ANALYZE TABLE" syntax will be introduced in a separate FLIP.

Public Interfaces


SupportsStatisticReport is an interface that allows a connector to report statistics to the planner. The statistics reported by the connector are used when the statistics from the Catalog are unknown.
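To illustrate the contract, the following is a minimal sketch of a source that implements a reporting interface. It relies on simplified stand-ins. `Stats` is a hypothetical reduction of Flink's `TableStats` to a row count, with -1 meaning UNKNOWN. `StatisticReporter` plays the role of `SupportsStatisticReport`. `PartitionedSource` is a hypothetical connector whose per-file row counts come from file metadata. This is not the FLIP's actual interface definition.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: simplified stand-ins, not Flink's real SupportsStatisticReport / TableStats.
public class StatisticReportSketch {

    /** Stand-in for TableStats, reduced to a row count; -1 means UNKNOWN. */
    static final class Stats {
        static final Stats UNKNOWN = new Stats(-1);
        final long rowCount;

        Stats(long rowCount) {
            this.rowCount = rowCount;
        }

        boolean isUnknown() {
            return rowCount < 0;
        }
    }

    /** Stand-in for SupportsStatisticReport: the source reports what it can cheaply know. */
    interface StatisticReporter {
        Stats reportStatistics();
    }

    /** Hypothetical source with per-file row counts from metadata; null means "not known". */
    static final class PartitionedSource implements StatisticReporter {
        private final List<Long> rowsPerFile;

        PartitionedSource(List<Long> rowsPerFile) {
            this.rowsPerFile = rowsPerFile;
        }

        @Override
        public Stats reportStatistics() {
            long total = 0;
            for (Long rows : rowsPerFile) {
                if (rows == null) {
                    return Stats.UNKNOWN; // one file without metadata makes the total unknown
                }
                total += rows;
            }
            return new Stats(total);
        }
    }

    public static void main(String[] args) {
        StatisticReporter complete = new PartitionedSource(Arrays.asList(100L, 250L));
        StatisticReporter partial = new PartitionedSource(Arrays.asList(100L, null));
        System.out.println(complete.reportStatistics().rowCount);   // 350
        System.out.println(partial.reportStatistics().isUnknown()); // true
    }
}
```

The sketch reports UNKNOWN when any file lacks metadata. A partial sum would hand the planner a row count that understates the table size, so the sketch falls back to UNKNOWN instead.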

...

public static final ConfigOption<Boolean> TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED =
    key("table.optimizer.source.report-statistics-enabled")
        .booleanType()
        .defaultValue(true)
        .withDescription(
            "When it is true, the optimizer will collect and use the statistics from the source connector"
            + " if the source implements SupportsStatisticReport and the reported statistics are not UNKNOWN."
            + " Default value is true.");

...

Currently, the statistical dimensions used by the optimizer are row count, ndv (number of distinct values), null count, max length, min length, max value and min value.[4] File count and file size (which can easily be obtained from the file system) are not used by the planner yet; we can improve this later.
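To make the listed dimensions concrete, here is a hypothetical sketch that computes them for one in-memory string column. `ColumnStatsSketch` is illustrative only and is not Flink's `ColumnStats`. A connector would more likely estimate these values from metadata than scan every row, because a full scan is the kind of heavy work this FLIP tries to avoid.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: computes the statistic dimensions listed above for a single string column.
public class ColumnStatsSketch {

    static final class ColumnStats {
        long rowCount;   // table-level in practice; counted here because the table has one column
        long ndv;        // number of distinct non-null values
        long nullCount;
        int maxLen = -1; // -1 when the column has no non-null values
        int minLen = -1;
        String max;      // lexicographic max / min of the non-null values
        String min;
    }

    static ColumnStats compute(List<String> column) {
        ColumnStats s = new ColumnStats();
        Set<String> distinct = new HashSet<>();
        for (String v : column) {
            s.rowCount++;
            if (v == null) {
                s.nullCount++;
                continue;
            }
            distinct.add(v);
            int len = v.length();
            s.maxLen = Math.max(s.maxLen, len);
            s.minLen = s.minLen < 0 ? len : Math.min(s.minLen, len);
            if (s.max == null || v.compareTo(s.max) > 0) {
                s.max = v;
            }
            if (s.min == null || v.compareTo(s.min) < 0) {
                s.min = v;
            }
        }
        s.ndv = distinct.size();
        return s;
    }

    public static void main(String[] args) {
        ColumnStats s = compute(Arrays.asList("flink", "sql", null, "flink", "planner"));
        // rowCount=5 ndv=3 nullCount=1 maxLen=7 minLen=3 max=sql min=flink
        System.out.printf("rowCount=%d ndv=%d nullCount=%d maxLen=%d minLen=%d max=%s min=%s%n",
                s.rowCount, s.ndv, s.nullCount, s.maxLen, s.minLen, s.max, s.min);
    }
}
```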

import java.util.List;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.plan.stats.TableStats;
import org.apache.flink.table.types.DataType;

/**
 * Extension of an input format which is able to report estimated statistics for the FileSystem
 * connector.
 */
@PublicEvolving
public interface FileBasedStatisticsReportableInputFormat {

    /**
     * Returns the estimated statistics of this input format.
     *
     * @param files The files to be estimated.
     * @param producedDataType the final output type of the format.
     */
    TableStats reportStatistics(List<Path> files, DataType producedDataType);
}
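One way a row-oriented text format could implement `reportStatistics` is to sample the average row size and divide the total size of the files by it. The sketch below assumes that strategy. `FileInfo` (a path plus a byte length) and the pre-sampled row sizes are hypothetical inputs. The real method instead receives `List<Path>` and a `DataType` and returns `TableStats`.

```java
import java.util.List;

// Hypothetical sketch: estimates a row count from file sizes and a sampled average row size.
public class RowCountEstimateSketch {

    /** Hypothetical stand-in for a file path plus its length in bytes. */
    static final class FileInfo {
        final String path;
        final long sizeInBytes;

        FileInfo(String path, long sizeInBytes) {
            this.path = path;
            this.sizeInBytes = sizeInBytes;
        }
    }

    /**
     * Returns an estimated row count, or -1 (unknown) if no usable sample is available.
     * sampledRowBytes holds the byte lengths of a few rows read from the head of a file.
     */
    static long estimateRowCount(List<FileInfo> files, List<Integer> sampledRowBytes) {
        if (sampledRowBytes.isEmpty()) {
            return -1;
        }
        long sampleTotal = 0;
        for (int b : sampledRowBytes) {
            sampleTotal += b;
        }
        double avgRowBytes = (double) sampleTotal / sampledRowBytes.size();
        long totalBytes = 0;
        for (FileInfo f : files) {
            totalBytes += f.sizeInBytes;
        }
        return avgRowBytes <= 0 ? -1 : Math.round(totalBytes / avgRowBytes);
    }

    public static void main(String[] args) {
        List<FileInfo> files = List.of(new FileInfo("part-0.csv", 4000), new FileInfo("part-1.csv", 6000));
        // average sampled row = 100 bytes, total = 10000 bytes, so about 100 rows
        System.out.println(estimateRowCount(files, List.of(90, 110, 100))); // 100
    }
}
```

Sampling keeps the cost bounded by the sample size rather than the data size. The trade-off is that the estimate is only as good as the sample.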

...

public static final ConfigOption<FileStatisticsType> SOURCE_STATISTICS_TYPE =
    key("source.report-statistics-type")
        .enumType(FileStatisticsType.class)
        .defaultValue(FileStatisticsType.ALL)
        .withDescription("The file statistics type which the source could provide. "
            + "Collecting statistics is a heavy operation in some cases, so "
            + "this config allows users to choose the statistics type according to different situations.");

public enum FileStatisticsType implements DescribedEnum {
    NONE("NONE", text("Do not collect any file statistics.")),
    ALL("ALL", text("Collect all file statistics that the format can provide."));

    private final String value;
    private final InlineElement description;

    FileStatisticsType(String value, InlineElement description) {
        this.value = value;
        this.description = description;
    }

    @Override
    public String toString() {
        return value;
    }

    @Override
    public InlineElement getDescription() {
        return description;
    }
}
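Below is a hypothetical sketch of how a file source might honour this option. With `NONE`, it reports unknown statistics without consulting the format at all. With `ALL`, it delegates to the format. Only the `NONE`/`ALL` values come from the enum above. `Estimator`, `reportRowCount` and the `long` result (-1 for unknown) are simplifications for illustration.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a source gating statistics collection on the configured statistics type.
public class ReportTypeSketch {

    enum FileStatisticsType { NONE, ALL }

    static final long UNKNOWN = -1;

    /** Hypothetical stand-in for a format that can estimate a row count for some files. */
    interface Estimator {
        long estimateRowCount(List<String> files);
    }

    static long reportRowCount(FileStatisticsType type, List<String> files, Object format) {
        if (type == FileStatisticsType.NONE) {
            return UNKNOWN; // the user opted out: skip the possibly heavy estimation entirely
        }
        if (!(format instanceof Estimator)) {
            return UNKNOWN; // this format cannot provide statistics
        }
        return ((Estimator) format).estimateRowCount(files);
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Estimator format = paths -> {
            calls.incrementAndGet();
            return 100L * paths.size();
        };
        List<String> files = List.of("a.csv", "b.csv");
        System.out.println(reportRowCount(FileStatisticsType.NONE, files, format)); // -1
        System.out.println(reportRowCount(FileStatisticsType.ALL, files, format));  // 200
        System.out.println(calls.get()); // 1: the format was consulted only for ALL
    }
}
```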

Proposed Changes

  1. How does the planner use the statistics reported by a connector?

...

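    private LogicalTableScan recomputeStatistics(LogicalTableScan scan) {
        final RelOptTable scanTable = scan.getTable();
        if (!(scanTable instanceof TableSourceTable)) {
            // Other table types have no connector source that could report statistics.
            return scan;
        }
        final TableSourceTable table = (TableSourceTable) scanTable;
        // Ask the connector only if the option is enabled and the source can report statistics.
        boolean reportStatEnabled =
                ShortcutUtils.unwrapContext(scan)
                                .getTableConfig()
                                .get(TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED)
                        && table.tableSource() instanceof SupportsStatisticReport;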

...
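The decision that `recomputeStatistics` makes can be sketched with simplified stand-ins. In this hypothetical sketch, `Stats` and `StatisticReporter` stand in for `TableStats` and `SupportsStatisticReport`, and the boolean parameter stands in for the report-statistics option. Checking the Catalog statistics first follows the rule that connector statistics are used when the Catalog's are unknown. The exact order of checks in the real method is not shown here.

```java
// Hypothetical sketch of the planner-side decision; all types are simplified stand-ins.
public class RecomputeStatisticsSketch {

    /** Stand-in for TableStats, reduced to a row count; -1 means UNKNOWN. */
    static final class Stats {
        static final Stats UNKNOWN = new Stats(-1);
        final long rowCount;

        Stats(long rowCount) {
            this.rowCount = rowCount;
        }

        boolean isUnknown() {
            return rowCount < 0;
        }
    }

    /** Stand-in for SupportsStatisticReport. */
    interface StatisticReporter {
        Stats reportStatistics();
    }

    /**
     * Known Catalog statistics are kept as-is. Otherwise the connector is asked only when the
     * option is enabled and the source implements the reporter interface; in every other case
     * the result stays UNKNOWN and the connector is never called.
     */
    static Stats resolve(Stats catalogStats, Object source, boolean reportEnabled) {
        if (!catalogStats.isUnknown()) {
            return catalogStats;
        }
        boolean reportStatEnabled = reportEnabled && source instanceof StatisticReporter;
        if (!reportStatEnabled) {
            return Stats.UNKNOWN;
        }
        Stats reported = ((StatisticReporter) source).reportStatistics();
        return reported == null ? Stats.UNKNOWN : reported;
    }

    public static void main(String[] args) {
        StatisticReporter source = () -> new Stats(1000);
        System.out.println(resolve(new Stats(42), source, true).rowCount);  // 42: Catalog wins
        System.out.println(resolve(Stats.UNKNOWN, source, true).rowCount);  // 1000: connector fallback
        System.out.println(resolve(Stats.UNKNOWN, source, false).rowCount); // -1: option disabled
    }
}
```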

More collectors and formats can be supported as needed in the future.

Compatibility, Deprecation, and Migration Plan

This is a new feature, so no compatibility, deprecation, or migration plan is needed.

Test Plan

  1. Unit tests will be added for each format to verify the statistics estimation logic.
  2. Plan tests will be added to verify how the planner uses the connector statistics.

Rejected Alternatives

None


POC: https://github.com/godfreyhe/flink/tree/FLIP-231

...