...

Current state: "Under Discussion"

Discussion thread: here (<- link to https://lists.apache.org/thread/88kxk7lh8bq2s2c2qrf06f3pnf9fkxj2)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

...

/** Enables a {@link DynamicTableSource} to report its estimated statistics. */
@PublicEvolving
public interface SupportStatisticReport {

    /**
     * Returns the estimated statistics of this {@link DynamicTableSource}, or {@link
     * CatalogStatistics#UNKNOWN} if the statistics are not supported or cannot be computed.
     */
    CatalogStatistics reportStatistics();
}
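To illustrate the contract, here is a minimal sketch of a connector implementing the interface. All types below (`CatalogStatistics`, `FileSource`, the per-file row/size map) are simplified stand-ins for the real Flink classes, so the example is self-contained; a real implementation would live inside a `DynamicTableSource`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in for Flink's CatalogStatistics, reduced to the fields used here.
class CatalogStatistics {
    static final CatalogStatistics UNKNOWN = new CatalogStatistics(-1, -1, -1);

    final long rowCount;
    final int fileCount;
    final long totalSize;

    CatalogStatistics(long rowCount, int fileCount, long totalSize) {
        this.rowCount = rowCount;
        this.fileCount = fileCount;
        this.totalSize = totalSize;
    }
}

interface SupportStatisticReport {
    CatalogStatistics reportStatistics();
}

/** Hypothetical file-backed source that knows per-file row counts and sizes. */
class FileSource implements SupportStatisticReport {
    // file name -> {row count, size in bytes}
    private final Map<String, long[]> files;

    FileSource(Map<String, long[]> files) {
        this.files = files;
    }

    @Override
    public CatalogStatistics reportStatistics() {
        if (files.isEmpty()) {
            // Nothing is known about the data: return UNKNOWN instead of guessing.
            return CatalogStatistics.UNKNOWN;
        }
        long rows = 0;
        long bytes = 0;
        for (long[] f : files.values()) {
            rows += f[0];
            bytes += f[1];
        }
        return new CatalogStatistics(rows, files.size(), bytes);
    }
}

public class SupportStatisticReportSketch {
    public static void main(String[] args) {
        Map<String, long[]> files = new LinkedHashMap<>();
        files.put("part-0.csv", new long[] {100, 4_096});
        files.put("part-1.csv", new long[] {250, 10_240});
        CatalogStatistics stats = new FileSource(files).reportStatistics();
        System.out.println(stats.rowCount + " rows in " + stats.fileCount + " files"); // 350 rows in 2 files
    }
}
```

The key point of the contract is the fallback: when a source cannot produce a reliable estimate, it must return `UNKNOWN` rather than a guess, so the planner can fall back to catalog statistics.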

We introduce a new config option, shown below, to control whether the reportStatistics method is called, because collecting statistics can be a heavy operation for some sources in some cases.

public static final ConfigOption<Boolean> TABLE_OPTIMIZER_SOURCE_CONNECT_STATISTICS_ENABLED =
    key("table.optimizer.source.connect-statistics-enabled")
        .booleanType()
        .defaultValue(true)
        .withDescription(
            "When it is true, the optimizer will collect and use the statistics from the source connector"
                + " if the source implements SupportStatisticReport and the collected statistics are not UNKNOWN."
                + " Default value is true.");



The FileSystem connector is a commonly used connector, especially for batch jobs. FileSystem supports multiple kinds of formats, such as csv, parquet, orc, etc. [1] Different formats have different ways of getting statistics. Parquet [2] and orc [3] both store metadata in the file footer, which includes row count, max/min values, null count, etc. For csv, we can get the file size and an estimated row count (file_size / average sampled line length).
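The csv estimate above can be sketched as follows. This is a minimal, self-contained illustration of the arithmetic only: it assumes sampling the first N lines to obtain an average line length, then dividing the file size by it; the helper name `estimateRowCount` is hypothetical and not part of any Flink API.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class CsvRowCountEstimate {

    /** Estimates row count as file_size / average byte length of the first sampled lines. */
    static long estimateRowCount(Path file, int sampleLines) throws IOException {
        long fileSize = Files.size(file);
        long sampledBytes = 0;
        int sampled = 0;
        try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            String line;
            while (sampled < sampleLines && (line = reader.readLine()) != null) {
                // +1 accounts for the newline separator stripped by readLine()
                sampledBytes += line.getBytes(StandardCharsets.UTF_8).length + 1;
                sampled++;
            }
        }
        if (sampled == 0) {
            return 0; // empty file: nothing to estimate
        }
        return fileSize / (sampledBytes / sampled);
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("rows", ".csv");
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 1000; i++) {
            sb.append("1,2,3,4,5\n"); // 1000 identical 10-byte rows
        }
        Files.writeString(tmp, sb.toString());
        // All rows have equal length, so the estimate is exact here.
        System.out.println("estimated rows: " + estimateRowCount(tmp, 100)); // estimated rows: 1000
        Files.delete(tmp);
    }
}
```

For real csv data with varying line lengths the result is only an approximation, which is why the proposal treats it as an estimate rather than an exact count.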

...

/**
 * Extension of {@link DecodingFormat} which is able to report estimated statistics for the
 * FileSystem connector.
 */
@PublicEvolving
public interface FileBasedStatisticsReportableDecodingFormat<I> extends DecodingFormat<I> {

    /**
     * Returns the estimated statistics of this {@link DecodingFormat}.
     *
     * @param files The files to be estimated.
     * @param producedDataType The final output type of the format.
     */
    CatalogStatistics reportStatistics(List<Path> files, DataType producedDataType);
}

Listing thousands of files or reading their footers is a heavy operation, so we also introduce a config option, shown below, that allows users to choose which kind of statistics is needed.

public static final ConfigOption<FileStatisticsType> SOURCE_STATISTICS_TYPE =
    key("source.statistics-type")
        .enumType(FileStatisticsType.class)
        .defaultValue(FileStatisticsType.ALL)
        .withDescription("The file statistics type which the source could provide. "
            + "Collecting statistics is a heavy operation in some cases, "
            + "so this config allows users to choose the statistics type according to different situations.");

public enum FileStatisticsType implements DescribedEnum {
    NONE("NONE", text("Do not collect any file statistics.")),
    ALL("ALL", text("Collect all file statistics that the format can provide."));

    private final String value;
    private final InlineElement description;

    FileStatisticsType(String value, InlineElement description) {
        this.value = value;
        this.description = description;
    }

    @Override
    public String toString() {
        return value;
    }

    @Override
    public InlineElement getDescription() {
        return description;
    }
}
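The intended effect of the option can be sketched as follows: when `NONE` is configured, the format skips the expensive listing/footer work entirely and reports unknown statistics. The types and the helper `reportRowCount` below are simplified stand-ins for illustration only, not Flink APIs (`-1` stands in for `CatalogStatistics.UNKNOWN`).

```java
import java.util.List;

// Stand-in for the FileStatisticsType enum proposed above.
enum FileStatisticsType {
    NONE,
    ALL
}

public class StatisticsTypeDispatch {

    /** Hypothetical helper: returns the total row count, or -1 (unknown) when NONE is set. */
    static long reportRowCount(FileStatisticsType type, List<Long> rowCountPerFile) {
        if (type == FileStatisticsType.NONE) {
            // NONE: do no file I/O at all and report unknown statistics.
            return -1;
        }
        // ALL: sum the per-file row counts (in practice read from parquet/orc footers).
        return rowCountPerFile.stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        List<Long> rowsPerFile = List.of(10L, 20L);
        System.out.println(reportRowCount(FileStatisticsType.ALL, rowsPerFile)); // 30
        System.out.println(reportRowCount(FileStatisticsType.NONE, rowsPerFile)); // -1
    }
}
```

The check on the configured type happens before any file is touched, which is the whole point of the option: users with very large directories can opt out of statistics collection completely.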

Proposed Changes

  1. How does the planner use the statistics reported by the connector?

     The statistics for a table need to be re-computed when:

       1. the statistics from the catalog are unknown
       2. the partitions are pruned
       3. the filter predicates are pushed down

     To avoid recomputing the statistics separately for each of the above operations, we introduce a new optimization program after the predicate pushdown program that collects the statistics in a single pass.

    The pseudocode is as follows:

    public class FlinkCollectStatisticsProgram implements FlinkOptimizeProgram<BatchOptimizeContext> {

        @Override
        public RelNode optimize(RelNode root, BatchOptimizeContext context) {
            // create a visitor to find all LogicalTableScan nodes
            // call collectStatistics method for each LogicalTableScan
        }

        private LogicalTableScan collectStatistics(LogicalTableScan scan) {
            final RelOptTable scanTable = scan.getTable();
            if (!(scanTable instanceof TableSourceTable)) {
                return scan;
            }
            boolean collectStatEnabled =
                    ShortcutUtils.unwrapContext(scan)
                            .getTableConfig()
                            .get(TABLE_OPTIMIZER_SOURCE_CONNECT_STATISTICS_ENABLED);

            TableSourceTable table = (TableSourceTable) scanTable;
            DynamicTableSource tableSource = table.tableSource();
            SourceAbilitySpec[] specs = table.abilitySpecs();
            PartitionPushDownSpec partitionPushDownSpec = // find the PartitionPushDownSpec
            FilterPushDownSpec filterPushDownSpec = // find the FilterPushDownSpec
            TableStats newTableStat = null;

            if (partitionPushDownSpec != null && filterPushDownSpec == null) {
                // do partition pruning while no filter push down
                if (table.contextResolvedTable().isPermanent()) {
                    // collect the statistics from catalog
                }
                if (collectStatEnabled
                        && (newTableStat == null || newTableStat == TableStats.UNKNOWN)
                        && tableSource instanceof SupportStatisticReport) {
                    CatalogStatistics statistics =
                            ((SupportStatisticReport) tableSource).reportStatistics();
                    newTableStat = CatalogTableStatisticsConverter.convertToTableStats(statistics);
                }
            } else if (filterPushDownSpec != null) {
                // only filter push down
                // the catalog does not support getting statistics with filters,
                // so only call reportStatistics method if needed
                if (collectStatEnabled && tableSource instanceof SupportStatisticReport) {
                    CatalogStatistics statistics =
                            ((SupportStatisticReport) tableSource).reportStatistics();
                    newTableStat = CatalogTableStatisticsConverter.convertToTableStats(statistics);
                }
            } else if (collectStatEnabled
                    && (table.getStatistic().getTableStats() == TableStats.UNKNOWN)
                    && tableSource instanceof SupportStatisticReport) {
                // no partition pruning and no filter push down
                // call reportStatistics method if needed
                CatalogStatistics statistics =
                        ((SupportStatisticReport) tableSource).reportStatistics();
                newTableStat = CatalogTableStatisticsConverter.convertToTableStats(statistics);
            }
            FlinkStatistic newStatistic =
                    FlinkStatistic.builder()
                            .statistic(table.getStatistic())
                            .tableStats(newTableStat)
                            .build();
            return new LogicalTableScan(
                    scan.getCluster(), scan.getTraitSet(), scan.getHints(), table.copy(newStatistic));
        }
    }

When partitions are pruned by PushPartitionIntoTableSourceScanRule, the statistics should also be updated.

The pseudocode is as follows:

...


  2. Which connectors and formats will be supported by default?

...