...

At present, the partitioning we want to support is similar to Hive's, and only single-value list partitions are supported.

Create Table

With the Hive dialect (the dialect can be set in TableConfig):

CREATE TABLE country_page_view(

...

The table will be partitioned by two fields.

Why like Hive:

  • We don't need to support rich partitioning criteria, because Hive buckets already cover those concepts.
  • The partition column is a pseudo-column that unifies batch and streaming.
  • Such grammar isolates the partition columns from the regular columns and avoids confusing user concepts. (Otherwise the two may intersect and make the actual data look strange.)

...

How to do partition pruning depends on the table type: 

  • The table is a catalog table: the planner will use the catalog to do partition pruning.
  • The table is a temporary table: the planner will use the all_partitions returned by the temporary table and do the filtering by name.

Add Catalog API:

List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters)
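
For illustration, a minimal sketch (the helper class is hypothetical) of how the planner could delegate pruning to the catalog through this method:

import java.util.List;

import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.expressions.Expression;

// Hypothetical planner-side helper: prunes partitions through the catalog instead of
// listing every partition and filtering in the planner.
public class CatalogPartitionPruner {

  public static List<CatalogPartitionSpec> prune(
      Catalog catalog, ObjectPath tablePath, List<Expression> partitionFilters) throws Exception {
    // The catalog (e.g. the Hive metastore) can evaluate the partition filters on its side
    // and only return the partitions that may contain matching rows.
    return catalog.listPartitionsByFilter(tablePath, partitionFilters);
  }
}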

...

  • The write buffer is small: e.g. Csv/Text. In this case, with dynamic partitioning, we can write multiple files simultaneously in a single sink task.
  • The write buffer is big: e.g. Orc/Parquet. In this case, with dynamic partitioning, we cannot write multiple files simultaneously in a single sink task; otherwise, the memory consumption would lead to OOM.

public interface PartitionableTableSink {

  // Gets the partition field names of the table.
  List<String> getPartitionFieldNames();

  // Sets the static partition values into the TableSink.
  void setStaticPartition(Map<String, String> partitions);

  // If this returns true, the sink can trust that all records will be grouped by the
  // partition fields before they reach the sink, so it can use its "grouped
  // multi-partition writer". If it returns false, no partition grouping is done.
  // If this method is never invoked, the execution mode (streaming mode) does not
  // support grouping, and the sink should use its "ungrouped multi-partition writer"
  // when there are dynamic partitions.
  boolean enableDynamicPartitionGrouping();
}
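
For illustration, a minimal sketch of a sink implementing the interface proposed above (the class name and partition fields are hypothetical; only the bookkeeping relevant to this FLIP is shown):

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sink: remembers the static partition values and whether the planner
// promised grouped input, so it can pick one of the three writers described below.
public class SketchPartitionedFileSink implements PartitionableTableSink {

  private final Map<String, String> staticPartitions = new LinkedHashMap<>();
  private boolean dynamicGrouped = false;

  @Override
  public List<String> getPartitionFieldNames() {
    return Arrays.asList("date", "country");
  }

  @Override
  public void setStaticPartition(Map<String, String> partitions) {
    // Partition values fixed in the INSERT ... PARTITION (...) clause.
    staticPartitions.putAll(partitions);
  }

  @Override
  public boolean enableDynamicPartitionGrouping() {
    // Only called in batch mode; accepting grouping lets the sink keep a single
    // open writer at a time (the "grouped multi-partition writer").
    this.dynamicGrouped = true;
    return true;
  }
}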

The sink implementation should provide three writers:

  • single-partition writer: writes data to a single partition (non-dynamic-partition writes).
  • grouped multi-partition writer: the input is grouped by the dynamic partition fields, so only one partition is open at a time (sketched below).
  • ungrouped multi-partition writer: writes to multiple partitions at the same time, which consumes more memory.
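
A minimal sketch of the grouped multi-partition writer idea, with hypothetical writer classes: since the input is already grouped by the dynamic partition values, the writer only needs one open file at a time and rolls to a new one when the partition key changes.

import java.io.IOException;

// Hypothetical grouped writer: keeps at most one open file because the input
// is already grouped by the dynamic partition values.
public class GroupedMultiPartitionWriterSketch {

  private String currentPartition;
  private HypotheticalFileWriter currentWriter;

  public void write(String partition, byte[] record) throws IOException {
    if (!partition.equals(currentPartition)) {
      // Partition changed: the previous partition is finished, so release its buffer
      // before opening the next file. This keeps memory usage independent of the
      // number of partitions, which matters for heavy formats like Orc/Parquet.
      if (currentWriter != null) {
        currentWriter.close();
      }
      currentWriter = new HypotheticalFileWriter(partition);
      currentPartition = partition;
    }
    currentWriter.write(record);
  }

  public void finish() throws IOException {
    if (currentWriter != null) {
      currentWriter.close();
    }
  }

  // Stand-in for a real format writer (Csv/Orc/Parquet).
  static class HypotheticalFileWriter {
    HypotheticalFileWriter(String partition) {}
    void write(byte[] record) {}
    void close() {}
  }
}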

Streaming partition write

Scenes

There are many scenarios where data is written to the file sink by a streaming job while, at the same time, the data is analyzed and computed by batch jobs.

  • static partition writing to the sink.
  • dynamic partition writing
    • partitioned by window time, which may be event time or processing time. Without early triggers, the partition column is monotonically increasing.
    • partitioned by regular columns.

Exactly-once semantics

Like StreamingFileSink, the table sink should be integrated with the checkpointing mechanism to provide exactly-once semantics.

The files can be in one of three states: in-progress, pending or finished. The file that is currently being written to is in-progress. Once a file is closed for writing, it becomes pending. When a checkpoint is successful, the currently pending files are moved to finished.
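
As a minimal illustration of that life cycle (the class is hypothetical and omits the actual file-system operations):

// Hypothetical illustration of the per-file life cycle driven by checkpoints.
public class PartFileLifecycleSketch {

  enum State { IN_PROGRESS, PENDING, FINISHED }

  private State state = State.IN_PROGRESS;

  // Called when the rolling policy decides the file is large/old enough.
  void closeForWriting() {
    state = State.PENDING;
  }

  // Called when the checkpoint that covers this file completes successfully;
  // only then does the file become visible to downstream batch jobs.
  void onCheckpointComplete() {
    if (state == State.PENDING) {
      state = State.FINISHED;
    }
  }
}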

StreamingFileSink already does a lot of great work:

  • Decouple the checkpoint from the file size: it provides a RollingPolicy abstraction to determine file size. On snapshot, it stores not only the pending files but also the in-progress files. In case of a failure, it restores the pending files and the in-progress files as well. (In-progress files are truncated to discard the content that does not belong to that checkpoint; this is achieved by using RecoverableWriter.)

...

Flink FileSystem connector

The DDL can look like this (with the Hive dialect):

CREATE TABLE USER_T (

  a INT,

...

The only difference from the previous FileSystem connector is that the partition-support attribute is required. We can use this identifier to indicate that the new connector supports partitions, without changing the previous connector. The other attributes can be completely consistent.

 'partition-support' = 'true' can be removed after we fully support the CSV format.

And provide table factories:

  • Provide FileSystemTableFactory: the Csv format and the Hive format will use it.
  • Provide FileSystemTableSink and FileSystemTableSource
  • Provide BatchFileSystemSink and StreamingFileSystemSink

...

CREATE TABLE country_page_view(
  user STRING,
  cnt INT
) PARTITIONED BY (date STRING, country STRING);

The table will be partitioned by two fields.

...

static partition writing:

INSERT { INTO | OVERWRITE } TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1 FROM from_statement;

dynamic partition writing:

INSERT { INTO | OVERWRITE } TABLE tablename1 select_statement1 FROM from_statement;

...

ALTER TABLE table_name DROP PARTITION partition_spec[, PARTITION partition_spec, ...]

partition_spec ::= (partition_column = partition_col_value, partition_column = partition_col_value, ...)

Recover Partitions (MSCK REPAIR TABLE)

Flink stores a list of partitions for each table in its catalog. If, however, new partitions are directly added to HDFS (say by using the hadoop fs -put command) or removed from HDFS, the catalog will not be aware of these changes to partition information unless the user runs ALTER TABLE table_name ADD/DROP PARTITION commands on each of the newly added or removed partitions, respectively.[3]
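
A minimal sketch of how such a repair command could be backed by the catalog, assuming the Catalog interface is extended with the syncPartitions method proposed below (the helper class is hypothetical):

import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectPath;

// Hypothetical helper: re-synchronizes the catalog's partition list with what is
// actually present on the file system, instead of requiring one ALTER TABLE
// ADD/DROP PARTITION per changed partition.
public class RepairTableSketch {

  public static void repair(Catalog catalog, ObjectPath tablePath) throws Exception {
    // syncPartitions is the new method proposed in the Catalog interface of this FLIP.
    catalog.syncPartitions(tablePath);
  }
}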

...

SHOW TABLE EXTENDED [IN|FROM database_name] LIKE 'identifier_with_wildcards' [PARTITION(partition_spec)];

...

];

Describe

DESCRIBE [EXTENDED | FORMATTED] [db_name.]table_name [PARTITION partition_spec] [col_name];

Catalog interface

public interface Catalog {

  ...

  void renamePartition(ObjectPath tablePath, CatalogPartitionSpec spec, CatalogPartitionSpec newSpec)
      throws PartitionNotExistException, PartitionAlreadyExistsException, CatalogException;

  void syncPartitions(ObjectPath tablePath) throws TableNotExistException, CatalogException;

  List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters)
      throws TableNotExistException, TableNotPartitionedException, CatalogException;
}

Further discussion

Create DDL

Should we support partition grammar like Spark SQL? (A subsequent vote will be taken to decide.)

TableSink Interface

public interface PartitionableTableSink {

...

  void setStaticPartition(Map<String, String> partitions);

  // Gets the dynamic partition column names.
  List<String> getDynamicPartitionFieldNames();

  // If this returns true, the sink can trust that all records will be grouped by the
  // partition fields before they reach the sink, so it can use its "grouped
  // multi-partition writer". If it returns false, no partition grouping is done.

...

  boolean enableDynamicPartitionGrouping();
}
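
For illustration, the dynamic partition fields are simply the declared partition fields that were not fixed by setStaticPartition; a minimal sketch with a hypothetical helper:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper showing the relationship between static and dynamic partition fields.
public class DynamicPartitionFieldsSketch {

  // partitionFieldNames: all partition fields declared by the table, in order.
  // staticPartitions: the values fixed in the PARTITION (...) clause of the INSERT.
  public static List<String> dynamicPartitionFields(
      List<String> partitionFieldNames, Map<String, String> staticPartitions) {
    List<String> dynamic = new ArrayList<>();
    for (String field : partitionFieldNames) {
      if (!staticPartitions.containsKey(field)) {
        // Fields without a static value must be filled from the query result.
        dynamic.add(field);
      }
    }
    return dynamic;
  }
}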

Catalog interface

public interface Catalog {

  ...

  void renamePartition(ObjectPath tablePath, CatalogPartitionSpec spec, CatalogPartitionSpec newSpec)
      throws PartitionNotExistException, PartitionAlreadyExistsException, CatalogException;

  void syncPartitions(ObjectPath tablePath) throws TableNotExistException, CatalogException;

  List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters)
      throws TableNotExistException, TableNotPartitionedException, CatalogException;
}

Road map

  1. Modify DDL support.
  2. Rework partition pruning
  3. Rework dynamic partitioning
  4. Introduce FileSystemTableFactory
    • Introduce BatchFileSystemSink
    • Introduce StreamingFileSystemSink
    • Introduce FileSystemTableFactory, FileSystemTableSource and FileSystemTableSink
    • Introduce new CSV for FileSystemTableFactory
    • Integrate Hive to FileSystemTableFactory

...