Status

Discussion thread: https://lists.apache.org/thread/rsrykky79rzp1nyrkff0tl3xc7hsv31q
Vote thread
JIRA

Release: 1.16

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

As FLIP-231[1] mentioned, statistics are one of the most important inputs to the optimizer. Accurate and complete statistics allow the optimizer to be more powerful. The "ANALYZE TABLE" syntax is a common and effective approach to gathering statistics, and has already been introduced by many compute engines and databases [2][3][4][5][6].

The main purpose of this FLIP is to discuss introducing "ANALYZE TABLE" syntax for Flink SQL.

Public Interfaces

SQL Syntax

ANALYZE TABLE table_name
[
  PARTITION (partcol1[=val1] [, partcol2[=val2], ...])
]
COMPUTE STATISTICS 
[
    FOR COLUMNS col1 [, col2, ...]
  |
    FOR ALL COLUMNS  
]

Only existing tables are supported, and an exception will be thrown if the table is a view.

PARTITION (partcol1=val1 [, partcol2=val2, ...]) is optional for partitioned tables. If no partition is specified, statistics will be gathered for all partitions. If a certain partition is specified, statistics will be gathered only for that partition. If the table is not a partitioned table but a partition is specified, an exception will be thrown.

FOR COLUMNS col1 [, col2, ...] or FOR ALL COLUMNS is also optional. If no column is specified, only table-level statistics will be gathered; if any columns are specified, column-level statistics will be gathered as well. The column-level statistics include: the number of distinct values (ndv), the number of nulls (nullCount), the average length of column values (avgLen), the max length of column values (maxLen), the min value of column values (minValue), the max value of column values (maxValue), and the value count (valueCount, for BOOLEAN type only). The following table lists the supported types and their corresponding column-level statistics (✓ = gathered, × = not gathered):

Types | ndv | nullCount | avgLen | maxLen | maxValue | minValue | valueCount
BOOLEAN | × | ✓ | × | × | × | × | ✓
TINYINT | ✓ | ✓ | × | × | ✓ | ✓ | ×
SMALLINT | ✓ | ✓ | × | × | ✓ | ✓ | ×
INTEGER | ✓ | ✓ | × | × | ✓ | ✓ | ×
FLOAT | ✓ | ✓ | × | × | ✓ | ✓ | ×
DATE | ✓ | ✓ | × | × | ✓ | ✓ | ×
TIME_WITHOUT_TIME_ZONE | ✓ | ✓ | × | × | ✓ | ✓ | ×
BIGINT | ✓ | ✓ | × | × | ✓ | ✓ | ×
DOUBLE | ✓ | ✓ | × | × | ✓ | ✓ | ×
DECIMAL | ✓ | ✓ | × | × | ✓ | ✓ | ×
TIMESTAMP_WITH_LOCAL_TIME_ZONE | ✓ | ✓ | × | × | ✓ | ✓ | ×
TIMESTAMP_WITHOUT_TIME_ZONE | ✓ | ✓ | × | × | ✓ | ✓ | ×
CHAR | ✓ | ✓ | ✓ | ✓ | × | × | ×
VARCHAR | ✓ | ✓ | ✓ | ✓ | × | × | ×
other types | × | ✓ | × | × | × | × | ×

NOTES: For fixed-length types (e.g. BOOLEAN, INTEGER, DOUBLE), the avgLen and maxLen need not be gathered from the original records, since they are constants determined by the type (e.g. 4 bytes for INTEGER, 8 bytes for DOUBLE).

Examples

Partitioned table

Suppose table MyTable has the schema: a INT, b BIGINT, c VARCHAR, and has 4 partitions with the following specs:

Partition1: (ds='2022-06-01', hr=1)

Partition2: (ds='2022-06-01', hr=2)

Partition3: (ds='2022-06-02', hr=1)

Partition4: (ds='2022-06-02', hr=2)


-- collect row count for partition1
ANALYZE TABLE MyTable PARTITION (ds='2022-06-01', hr=1) COMPUTE STATISTICS;

-- collect row count for partition1 and partition2
ANALYZE TABLE MyTable PARTITION (ds='2022-06-01') COMPUTE STATISTICS;

-- collect row count for all partitions
ANALYZE TABLE MyTable COMPUTE STATISTICS;

-- collect row count and all columns statistics for partition1
ANALYZE TABLE MyTable PARTITION (ds='2022-06-01', hr=1) COMPUTE STATISTICS FOR ALL COLUMNS;

-- collect row count and all columns statistics for partition1 and partition2
ANALYZE TABLE MyTable PARTITION (ds='2022-06-01') COMPUTE STATISTICS FOR ALL COLUMNS;

-- collect row count and all columns statistics for all partitions
ANALYZE TABLE MyTable COMPUTE STATISTICS FOR ALL COLUMNS;

-- collect row count and statistics for column a for partition1
ANALYZE TABLE MyTable PARTITION (ds='2022-06-01', hr=1) COMPUTE STATISTICS FOR COLUMNS a;

-- collect row count and statistics for columns a, b for partition1 and partition2
ANALYZE TABLE MyTable PARTITION (ds='2022-06-01') COMPUTE STATISTICS FOR COLUMNS a, b;

-- collect row count and statistics for columns a, b, c for all partitions
ANALYZE TABLE MyTable COMPUTE STATISTICS FOR COLUMNS a, b, c;

Non-partitioned table

Suppose table MyTable has the schema: a INT, b BIGINT, c VARCHAR

-- collect row count
ANALYZE TABLE MyTable COMPUTE STATISTICS;

-- collect row count and all columns statistics
ANALYZE TABLE MyTable COMPUTE STATISTICS FOR ALL COLUMNS;

-- collect row count and statistics for column a
ANALYZE TABLE MyTable COMPUTE STATISTICS FOR COLUMNS a;

Proposed Changes

Currently, ANALYZE TABLE does not support streaming mode, because the sources may be unbounded and accurate results cannot be obtained. It could be supported in the future for the case where all sources are bounded. Therefore, the following discussion is based on batch mode.


When to execute the `ANALYZE TABLE` command

`ANALYZE TABLE` is not triggered automatically; it is triggered manually as needed.
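For example, a user can trigger it through TableEnvironment after a large data load. A minimal sketch (the table name is a placeholder, assuming MyTable is already registered in the current catalog):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// ANALYZE TABLE is only supported in batch mode, see above
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
// manually trigger table and column statistics collection for MyTable
tEnv.executeSql("ANALYZE TABLE MyTable COMPUTE STATISTICS FOR ALL COLUMNS");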

How to collect the statistics

The ANALYZE TABLE statement will be converted to a SELECT statement, and each statistics item will be converted to an aggregate function, e.g. MAX(x) for the max value of column x. To collect the number of distinct values for a specified column x, we could use the COUNT(DISTINCT x) function, but it is a very heavy operation, and there may be hundreds of columns. Statistics do not require exact values; approximate values are also acceptable. We will introduce the APPROX_COUNT_DISTINCT function, which is a much lighter approach to computing the number of distinct values. Calcite has already introduced the definition of APPROX_COUNT_DISTINCT; we only need to provide the implementation.

The following examples show the SELECT statements converted from ANALYZE TABLE statements:

CREATE TABLE Table1 (
   a BOOLEAN,
   b INT,
   c VARCHAR
) WITH (...);

ANALYZE TABLE Table1 COMPUTE STATISTICS FOR ALL COLUMNS;

-- the above ANALYZE TABLE statement will be converted to:
SELECT 
  COUNT(1) AS rowCount, 
  (COUNT(1) - COUNT(`a`)) AS a_nullCount, 
  COUNT(`a`) FILTER (WHERE `a` IS TRUE) AS a_trueCount, 
  COUNT(`a`) FILTER (WHERE `a` IS FALSE) AS a_falseCount, 
  (COUNT(1) - COUNT(`b`)) AS b_nullCount, 
  APPROX_COUNT_DISTINCT(`b`) AS b_ndv, 
  MAX(`b`) AS b_max, 
  MIN(`b`) AS b_min, 
  (COUNT(1) - COUNT(`c`)) AS c_nullCount, 
  APPROX_COUNT_DISTINCT(`c`) AS c_ndv, 
  AVG(CAST(CHAR_LENGTH(`c`) AS DOUBLE)) AS c_avgLen,
  MAX(CAST(CHAR_LENGTH(`c`) AS BIGINT)) AS c_maxLen
FROM Table1;

CREATE TABLE Table2 (
   a BOOLEAN,
   b INT,
   c VARCHAR,
   p VARCHAR
) PARTITIONED BY(p) WITH (...);

ANALYZE TABLE Table2 PARTITION(p=1) COMPUTE STATISTICS;

-- the above ANALYZE TABLE statement will be converted to:
SELECT COUNT(1) AS rowCount FROM Table2 WHERE p=1;

The SELECT job will be submitted via the TableEnvironment#executeSql method, and the execution result will be collected via the TableResult#collect method. Normally, the SELECT job does not take long to execute, so, currently, both the result collection and the statistics update happen on the client side.
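For example, a sketch of this flow for the Table1 example above (tEnv is an assumed batch TableEnvironment; only two statistics items are shown for brevity):

import java.util.List;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;

// submit the generated SELECT as a job and wait for the single result row
TableResult tableResult = tEnv.executeSql(
    "SELECT COUNT(1) AS rowCount, APPROX_COUNT_DISTINCT(`b`) AS b_ndv FROM Table1");
List<Row> rows = CollectionUtil.iteratorToList(tableResult.collect());
Row row = rows.get(0);
// each statistics item is read back via its unique field name
long rowCount = (Long) row.getField("rowCount");
long bNdv = (Long) row.getField("b_ndv");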

How to update the statistics

As we define a unique field name for each statistics item (see the above SELECT statement), we can get the specific values from the result row and convert them to CatalogTableStatistics and CatalogColumnStatistics. Finally, we update the statistics through the catalog API. The pseudocode looks like:

CatalogTable table = catalogManager.getTable(...);
Catalog catalog = catalogManager.getCatalog(...);
ObjectPath objectPath = ...;
     
if (table.isPartitioned()) {
    // partitioned table
    List<CatalogPartitionSpec> targetPartitions = ...;
    for (CatalogPartitionSpec p : targetPartitions) {
        // generate the select statement based on AnalyzeTableOperation
        String statSql = ...;
        // submit the select job and wait for the execution result
        TableResult tableResult = executeSql(statSql);
        List<Row> result = CollectionUtil.iteratorToList(tableResult.collect());
        Row row = result.get(0);
        // convert the table statistics from the row
        CatalogTableStatistics tableStat = convertToTableStatistics(row);
        catalog.alterPartitionStatistics(objectPath, p, tableStat, false);
        // convert the column statistics from the row
        CatalogColumnStatistics columnStat = convertToColumnStatistics(row);
        catalog.alterPartitionColumnStatistics(objectPath, p, columnStat, false);
    }
} else {
    // non-partitioned table
    // generate the select statement based on AnalyzeTableOperation
    String statSql = ...;
    // submit the select job and wait for the execution result
    TableResult tableResult = executeSql(statSql);
    List<Row> result = CollectionUtil.iteratorToList(tableResult.collect());
    Row row = result.get(0);
    // convert the table statistics from the row
    CatalogTableStatistics tableStat = convertToTableStatistics(row);
    catalog.alterTableStatistics(objectPath, tableStat, false);
    // convert the column statistics from the row
    CatalogColumnStatistics columnStat = convertToColumnStatistics(row);
    catalog.alterTableColumnStatistics(objectPath, columnStat, false);
}
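The conversion methods in the pseudocode could look like the following sketch for the Table1 example above (hypothetical code: the statistics classes come from org.apache.flink.table.catalog.stats, and -1 is used here to mark the unknown fileCount/totalSize/rawDataSize values):

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.table.catalog.stats.*;
import org.apache.flink.types.Row;

static CatalogTableStatistics convertToTableStatistics(Row row) {
    // only the row count is known from the SELECT result
    return new CatalogTableStatistics((Long) row.getField("rowCount"), -1, -1L, -1L);
}

static CatalogColumnStatistics convertToColumnStatistics(Row row) {
    Map<String, CatalogColumnStatisticsDataBase> stats = new HashMap<>();
    // BOOLEAN column a: trueCount / falseCount / nullCount
    stats.put("a", new CatalogColumnStatisticsDataBoolean(
        (Long) row.getField("a_trueCount"),
        (Long) row.getField("a_falseCount"),
        (Long) row.getField("a_nullCount")));
    // INT column b: min / max / ndv / nullCount (MAX/MIN over INT yield Integer, hence Number)
    stats.put("b", new CatalogColumnStatisticsDataLong(
        ((Number) row.getField("b_min")).longValue(),
        ((Number) row.getField("b_max")).longValue(),
        (Long) row.getField("b_ndv"),
        (Long) row.getField("b_nullCount")));
    // VARCHAR column c: maxLen / avgLen / ndv / nullCount
    stats.put("c", new CatalogColumnStatisticsDataString(
        (Long) row.getField("c_maxLen"),
        (Double) row.getField("c_avgLen"),
        (Long) row.getField("c_ndv"),
        (Long) row.getField("c_nullCount")));
    return new CatalogColumnStatistics(stats);
}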

Compatibility, Deprecation, and Migration Plan

This is a new feature, so there is no compatibility, deprecation, or migration plan needed.

Test Plan

  1. Unit tests will be added to verify the SQL parsing results
  2. Integration tests will be added to verify the statistics results

Rejected Alternatives

None


POC: https://github.com/godfreyhe/flink/tree/FLIP-240


[1] https://cwiki.apache.org/confluence/pages/resumedraft.action?draftId=211883860&draftShareId=eda17eaa-43f9-4dc1-9a7d-3a9b5a4bae00&

[2] https://cwiki.apache.org/confluence/display/hive/statsdev

[3] https://spark.apache.org/docs/3.0.0-preview/sql-ref-syntax-aux-analyze-table.html

[4] https://drill.apache.org/docs/analyze-table-compute-statistics/

[5] https://docs.oracle.com/cd/B19306_01/server.102/b14200/statements_4005.htm

[6] https://dev.mysql.com/doc/refman/5.6/en/analyze-table.html