Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.


Page properties


Discussion thread
Vote thread
JIRA

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-14249

Release1.10

Current state: Under Discussion

JIRA:

...


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

At present, the partition we want to support is similar to that of hive and only supports single value list partition.

Create Table

CREATE TABLE country_page_view(

  user STRING,

  cnt INT) 

PARTITIONED BY (date STRING, country STRING);

The table will be partitioned by two fields.

Why like hive:

  • We don’t need support rich partitioning criteria. Because hive bucket cover these concepts.
  • Partition column is the pseudocolumn to unify batch and streaming.
  • Such a grammar can isolate the partition columns from the regular columns and avoid confusion of user concepts. (Either way, the two may intersect to make the specific data strange.

static partitioning insert

Users can specify the value of partition while inserting the data:

INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) select_statement1 FROM from_statement;

INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1 FROM from_statement;

  • PARTITION clause should contain all partition columns of this table.
  • The fields returned in this select statement should not contain any of the partition columns.
  • INSERT OVERWRITE will overwrite any existing data in the table or partition
  • INSERT INTO will append to the table or partition, keep the existing data intact
  • Both INSERT INTO and INSERT OVERWRITE will create a new partition if the target static partition doesn't exist.

For example:

static partitioning insert

Users can specify the value of partition while inserting the data:

INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) select_statement1 FROM from_statement;

INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1 FROM from_statement;

  • PARTITION clause should contain all partition columns of this table.
  • The fields returned in this select statement should not contain any of the partition columns.
  • INSERT OVERWRITE will overwrite any existing data in the table or partition
  • INSERT INTO will append to the table or partition, keep the existing data intact
  • Both INSERT INTO and INSERT OVERWRITE will create a new partition if the target static partition doesn't exist.

For example:

INSERT INTO TABLE country_page_view PARTITION (date=’2019-8-30’, country=’china’) SELECT user, cnt FROM country_page_view_source;

This will create a new partition in country_page_view and insert all date from country_page_view_source to this partition. User can verify it by command:

➜ ~ SHOW PARTITIONS country_page_view;

date=’2019-8-30’,country=’china’

dynamic partitioning insert

In the dynamic partition inserts, users can give partial partition specifications, which means just specifying partial column values in the PARTITION clause or not provide PARTITION clause. Let the engine dynamically determine the partitions based on the values of the partition column from source table. This means that the dynamic partition creation is determined by the value of the input column.

INSERT INTO TABLE country_page_view SELECT user, cnt, date, country INSERT INTO TABLE country_page_view PARTITION (date=’2019-8-30’, country=’china’) SELECT user, cnt FROM country_page_view_source;

This will create a new partition in country_page_view and insert all date from country_page_view_source to this partition. User can verify it by command:

➜ ~ SHOW PARTITIONS country_page_view;

date=’2019-8-30’,country=’china’

dynamic partitioning insert

In this method, the engine will determine the different unique values from source table that the partition columns holds(i.e date and country), and creates partitions for each value.

Different from hive 2.X or smaller: Dynamic partitioned columns do not need to be on partition clause. (Hive 3.0.0 also support this in HIVE-19083).

Hive 2.X, user need define dynamic partition columns in PARTITION clause like this: 

INSERT INTO TABLE country_page_view PARTITION (date, country) SELECT user, cnt, date, country FROM country_page_view_source;

Partially specified partition columns values are also supported:In the dynamic partition inserts, users can give partial partition specifications, which means just specifying partial column values in the PARTITION clause or not provide PARTITION clause. Let the engine dynamically determine the partitions based on the values of the partition column from source table. This means that the dynamic partition creation is determined by the value of the input column.

INSERT INTO TABLE country_page_view PARTITION (date=’2019-8-30’) SELECT user, cnt, date, country FROM country_page_view_source;

In this method, the engine will determine the different unique values from source table that the partition columns holds(i.e date and country), and creates partitions for each value.

Different from hive 2.X or smaller: Dynamic partitioned columns do not need to be on partition clause. (Hive 3.0.0 also support this in HIVE-19083).

Hive 2.X, user need define dynamic partition columns in PARTITION clause like this: 

INSERT INTO TABLE country_page_view PARTITION (date, country) SELECT user, cnt, date, country FROM country_page_view_source;

Partially specified partition columns values are also supported:

INSERT INTO TABLE country_page_view PARTITION (date=’2019-8-30’) SELECT user, cnt, country FROM country_page_view_source;

NOTE:

  • The dynamic partition columns must be specified last among the columns in the SELECT statement
  • The dynamic partition columns must be in the same order in which they appear in the DDL of CREATE TABLE.
  • Because of the existence of dynamic partitioning, we will stuff both static and dynamic columns into Row, so the data received by the sink contains all partition columns.

Behavior of dynamic partition INSERT OVERWRITE:

  • delete all partition directories that match the static partition values provided in the insert statement. (spark behavior)
  • only delete partition directories which have data written into it (hive behavior)

This is related to implementation, recommend hive’s behavior.

external partitioned tables

If we already have partition data on File system, if we want to load it into Flink catalog. At this point, we need to add partition grammar.

Consider we have a table country_page_view, it is a file table and its location is ‘/user/flink/country_page_view’. And now we have some data of partition (2019-8-30, china), we want to load it into Flink catalog, we can do:

  • File system operation: move data to ‘/user/flink/country_page_view/2019-8-30/china/’
  • ALTER TABLE country_page_view ADD PARTITION (date=’2019-8-30’, country=’china’);

NOTE: Using external partitioning tables is an option. Files in File system can also be loaded into managed non-partitioned tables, from which the date can be inserted into partitioned tables. But by external partitioning tables, user can avoid reading and writing real data, which can greatly improve performance.

Partition Read

Partition pruning

One of the great significance of Partition is to support partition Pruning. Users can specify the partition to read through standard filtering conditions, which can greatly improve the efficiency of reading.

Current blink partition pruning:

FLINK-5859 FLINK-12805 FLINK-13115 already introduce PartitionableTableSource to flink and implement it in blink planner.

Advantages and disadvantages:

  • The engine will automatically prune the partitions based on the filters and partition columns. Source don’t need do something.
  • The table source need get all partition values.
  • The problem is that every partition Pruning needs to get all partition values. When there are thousands of partitions, there will be a lot of pressure on catalog (for example, MySQL storage).

How to do partition pruning depends on table: 

  • The table is catalog table: planner will use catalog to do partition pruning.
  • The table is temporary table: planner will list all partitions to do the filter by name.

Add Catalog Api:

List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters)

Without Partition pruning

If it is a partition catalog table, will read all partition which is registered to catalog. Users can judge which partition by the partition column in data.

Partition write

Static Partition

Static partition writing is basically the same as non-partitioned writing. The only difference is that the directory of the final file needs to contain a subdirectory of the partition.

Dynamic Partition

We have already talked about the grammar of dynamic partitioning, and this time we will focus on its implementation and its impact on the sink interface.

Now there are two writing formats:

  • Writing buffers is small: Like Csv/Text, In this case, when dynamic partitioning, we can write multiple files simultaneously in a task of sink.
  • Writing buffers is big: Like Orc/Parquet, In this case, when dynamic partitioning, we can not write multiple files simultaneously in a task of sink. Otherwise, too much memory will lead to OOM.

public interface PartitionableTableSink {

  List<String> getPartitionFieldNames();

  // set the static partition into the TableSink.

  void setStaticPartition(Map<String, String> partitions);

  // If returns true, sink can trust all records will definitely be grouped by partition fields before consumed by the sink, sink can use “grouped multi-partition writer”. If returns false, there are no need to do partition grouping.

  // If never invoke this method, that mean the execution mode(streaming mode) don’t support grouping, the sink should use its “ungrouped multi-partition writer” when there are dynamic partitions.

  boolean enableDynamicPartitionGrouping();

}

Sink implementation should provide three writers:

  • single-partition writer: writes data to a single partition (non-dynamic-partition writes).
  • grouped multi-partition writer: inputs are grouped by dynamic partitions, So there's only one partition at the same time.
  • ungrouped multi-partition writer: writing multiple partitions at the same time consumes more memory.

Streaming partition write

Scenes

There are many scenarios where data can be written to FileSink through streaming job. At the same time, these data can be analyzed and calculated by batch job.

  • static partition writing to sink.
  • dynamic partition writing
    • partitioned by window time, maybe event time or processing time. Without trigger, the partition column is monotonically incremental.
    • partitioned by regular columns.

Exactly-once semantics

Like StreamingFileSink, table sink should integrated with the checkpointing mechanism to provide exactly once semantics.

The files can be in one of three states: in-progress, pending or finished. The file that is currently being written to is in-progress. Once a file is closed for writing it becomes pending. When a checkpoint is successful the currently pending files will be moved to finished.

StreamingFileSink does many great works:

  • Decouple checkpoint from file size. It provides an abstraction of RollingPolicy to determine file size. On snapshot, it will not only store the pending files, but also store in-progress files. In case of a failure, it will restore the pending files, and restore in-progress files too. (In-progress files will be truncated to discard the content that does not belong to that checkpoint. This is achieved by using RecoverableWriter.)

temp files and renaming versus recoverable writer:

  • Either way, file visibility still depends on the checkpoint finish time.
  • Complex Formats, such as hive, can hardly meet the requirements for recoverable writer. (Hive just provides abstract RecordWriter, which hardly supports above features: Flush to the file system and record its file offset on snapshot, and  truncate redundant file contents on recovery)

To simplify the current implementation, we only consider that file size depends on checkpoint.

  1. snapshotState(cpId): The file currently being written changes from in-progress state to pending state. Store the pending files (Contains all unfinished checkpoints corresponding files) by operator state.
  2. notifyCheckpointComplete(cpId): Move all the pending files less than or equal to cpId to the target directory, and the corresponding files will be finished.
    1. HiveFormat's problem: At this stage, HiveFormat needs to access Metastore if the file needs to be visible. Only the Task side can have logic in notifyCheckpointComplete, which will lead to distributed access to Metastore, causing pressure.
  3. initializeState(retore): Copy the pending files from state to memory.

Partition support

Stream write support both static partition table and dynamic partition table. To static partition table is simple: just like regular table. The only thing is decide path by static partition first.

To dynamic partition table:

  • partitioned by monotonically column (like partitioned by window time): In this case, the implementation should be the same as batch grouped multi-partition writer. At the same time, can open only one writer.
  • partitioned by regular columns, Because in the case of streaming, upstream can not sort all data, so:
    • Open multiple writers at the same time, If the file format is CSV or text or partition number is small, this is no problem. If it's a Parquet or Orc data format, it will consume too much memory.
    • (Nice to have) Accumulate data in a single checkpoint, wait until snapshot, sort all data, and write partition data one by one.

FileSystemSink

Considering the stream writing and the mechanism of dynamic partitioning, we need to implement a FileSink to handle the relevant logic. Subsequent Flink file-related connectors and HiveSink can be unified into this sink. Formats only need to implement the relevant interface, without dealing with streaming exactly-once and partition-related logic.

  • Support single-partition writing
  • Support grouped multi-partition writing
  • Support non-grouped multi-partition writing
  • StreamingFileSystemSink support streaming exactly-once

Not recommend using StreamingFileSink to support partitioning in Table.

  • The bucket concept and SQL's bucket concept are in serious conflict.
  • In table, we need support single-partition writing, grouped multi-partition writing, non-grouped multi-partition writing.
  • We need a global role to commit files to metastore.
  • We need an abstraction to support both streaming and batch mode
  • Table partition is simpler than StreamingFileSink, the concept of partitioning is that we only support partition references on fields, rather than being as flexible as runtime.

Flink FileSystem connector

The DDL can like this:

CREATE TABLE USER_T (

  a INT,

  b STRING,

  c DOUBLE)

PARTITIONED BY (date STRING, country STRING)

WITH (

  'connector.type' = ‘filesystem’,

  'connector.path' = 'hdfs:///tmp/xxx',

  'format.type' = 'csv',

  'update-mode' = 'append',

  'partition-support' = 'true'

 )

The only difference from the previous FileSystem is that the partition-support attribute is required. We can use this identifier to represent the new connector support partition without changing the previous connector.Other attributes can be completely consistent.

And provide table factories:

  • Provide FileSystemTableFactory: CsvFileSystemTableFactory and HiveFileSystemTableFactory will extend it.
  • Provide FileSystemTableSink and FileSystemTableSource
  • Provide BatchFileSystemSink and StreamingFileSystemSink

Formats just needs to implement:

  • InputFormat for read
  • RecordWriter and FileCommitter to write.

Specific implementation format does not involve too much partition concept, it only manages its own reading and writing.

NOTE:

  • The dynamic partition columns must be specified last among the columns in the SELECT statement
  • The dynamic partition columns must be in the same order in which they appear in the DDL of CREATE TABLE.
  • Because of the existence of dynamic partitioning, we will stuff both static and dynamic columns into Row, so the data received by the sink contains all partition columns.

Behavior of dynamic partition INSERT OVERWRITE:

  • delete all partition directories that match the static partition values provided in the insert statement. (spark behavior)
  • only delete partition directories which have data written into it (hive behavior)

This is related to implementation, recommend hive’s behavior.

external partitioned tables

If we already have partition data on File system, if we want to load it into Flink catalog. At this point, we need to add partition grammar.

Consider we have a table country_page_view, it is a file table and its location is ‘/user/flink/country_page_view’. And now we have some data of partition (2019-8-30, china), we want to load it into Flink catalog, we can do:

  • File system operation: move data to ‘/user/flink/country_page_view/2019-8-30/china/’
  • ALTER TABLE country_page_view ADD PARTITION (date=’2019-8-30’, country=’china’);

NOTE: Using external partitioning tables is an option. Files in File system can also be loaded into managed non-partitioned tables, from which the date can be inserted into partitioned tables. But by external partitioning tables, user can avoid reading and writing real data, which can greatly improve performance.

Partition Read

Partition pruning

One of the great significance of Partition is to support partition Pruning. Users can specify the partition to read through standard filtering conditions, which can greatly improve the efficiency of reading.

Current blink partition pruning:

FLINK-5859 FLINK-12805 FLINK-13115 already introduce PartitionableTableSource to flink and implement it in blink planner.

Advantages and disadvantages:

  • The engine will automatically prune the partitions based on the filters and partition columns. Source don’t need do something.
  • The table source need get all partition values.
  • The problem is that every partition Pruning needs to get all partition values. When there are thousands of partitions, there will be a lot of pressure on catalog (for example, MySQL storage).

How to do partition pruning depends entirely on TableSource's own implementation:

  • The table source can use catalog to do partition pruning. For example, hive table source can touch its catalog from creation of HiveTableFactory.
  • Without catalog, the table source will list sub directories to do the filter by name.

How to do partition pruning depends on table: 

  • The table is catalog table: planner will use catalog to do partition pruning.
  • The table is temporary table: planner will use the all_partitions returned by the temporary table and do the filter by name.

Add Catalog Api:

List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters)

Without Partition pruning

If it is a partition catalog table, will read all partition which is registered to catalog. Users can judge which partition by the partition column in data.

Partition write

Static Partition

Static partition writing is basically the same as non-partitioned writing. The only difference is that the directory of the final file needs to contain a subdirectory of the partition.

Dynamic Partition

We have already talked about the grammar of dynamic partitioning, and this time we will focus on its implementation and its impact on the sink interface.

Now there are two writing formats:

  • Writing buffers is small: Like Csv/Text, In this case, when dynamic partitioning, we can write multiple files simultaneously in a task of sink.
  • Writing buffers is big: Like Orc/Parquet, In this case, when dynamic partitioning, we can not write multiple files simultaneously in a task of sink. Otherwise, too much memory will lead to OOM.

Sink implementation should provide three writers:

  • single-partition writer: writes data to a single partition (non-dynamic-partition writes).
  • grouped multi-partition writer: inputs are grouped by dynamic partitions, So there's only one partition at the same time.
  • ungrouped multi-partition writer: writing multiple partitions at the same time consumes more memory.

Streaming partition write

Scenes

There are many scenarios where data can be written to FileSink through streaming job. At the same time, these data can be analyzed and calculated by batch job.

  • static partition writing to sink.
  • dynamic partition writing
    • partitioned by window time, maybe event time or processing time. Without trigger, the partition column is monotonically incremental.
    • partitioned by regular columns.

Exactly-once semantics

Like StreamingFileSink, table sink should integrated with the checkpointing mechanism to provide exactly once semantics.

The files can be in one of three states: in-progress, pending or finished. The file that is currently being written to is in-progress. Once a file is closed for writing it becomes pending. When a checkpoint is successful the currently pending files will be moved to finished.

StreamingFileSink does many great works:

  • Decouple checkpoint from file size. It provides an abstraction of RollingPolicy to determine file size. On snapshot, it will not only store the pending files, but also store in-progress files. In case of a failure, it will restore the pending files, and restore in-progress files too. (In-progress files will be truncated to discard the content that does not belong to that checkpoint. This is achieved by using RecoverableWriter.)

temp files and renaming versus recoverable writer:

  • Either way, file visibility still depends on the checkpoint finish time.
  • Complex Formats, such as hive, can hardly meet the requirements for recoverable writer. (Hive just provides abstract RecordWriter, which hardly supports above features: Flush to the file system and record its file offset on snapshot, and  truncate redundant file contents on recovery)

To simplify the current implementation, we only consider that file size depends on checkpoint.

  1. snapshotState(cpId): The file currently being written changes from in-progress state to pending state. Store the pending files (Contains all unfinished checkpoints corresponding files) by operator state.
  2. notifyCheckpointComplete(cpId): Move all the pending files less than or equal to cpId to the target directory, and the corresponding files will be finished.
    1. HiveFormat's problem: At this stage, HiveFormat needs to access Metastore if the file needs to be visible. Only the Task side can have logic in notifyCheckpointComplete, which will lead to distributed access to Metastore, causing pressure.
  3. initializeState(retore): Copy the pending files from state to memory.

Partition support

Stream write support both static partition table and dynamic partition table. To static partition table is simple: just like regular table. The only thing is decide path by static partition first.

To dynamic partition table:

  • partitioned by monotonically column (like partitioned by window time): In this case, the implementation should be the same as batch grouped multi-partition writer. At the same time, can open only one writer.
  • partitioned by regular columns, Because in the case of streaming, upstream can not sort all data, so:
    • Open multiple writers at the same time, If the file format is CSV or text or partition number is small, this is no problem. If it's a Parquet or Orc data format, it will consume too much memory.
    • (Nice to have) Accumulate data in a single checkpoint, wait until snapshot, sort all data, and write partition data one by one.

FileSystemSink

Considering the stream writing and the mechanism of dynamic partitioning, we need to implement a FileSink to handle the relevant logic. Subsequent Flink file-related connectors and HiveSink can be unified into this sink. Formats only need to implement the relevant interface, without dealing with streaming exactly-once and partition-related logic.

  • Support single-partition writing
  • Support grouped multi-partition writing
  • Support non-grouped multi-partition writing
  • StreamingFileSystemSink support streaming exactly-once

Not recommend using StreamingFileSink to support partitioning in Table.

  • The bucket concept and SQL's bucket concept are in serious conflict.
  • In table, we need support single-partition writing, grouped multi-partition writing, non-grouped multi-partition writing.
  • We need a global role to commit files to metastore.
  • We need an abstraction to support both streaming and batch mode
  • Table partition is simpler than StreamingFileSink, the concept of partitioning is that we only support partition references on fields, rather than being as flexible as runtime.

Flink FileSystem connector

The DDL can like this:

CREATE TABLE USER_T

......

WITH (

  'connector.type' = ‘filesystem’,

  'connector.path' = 'hdfs:///tmp/xxx',

  'format.type' = 'csv',

  'update-mode' = 'append',

  'partition-support' = 'true'

 )

The only difference from the previous FileSystem is that the partition-support attribute is required. We can use this identifier to represent the new connector support partition without changing the previous connector.Other attributes can be completely consistent.

 'partition-support' = 'true' can be removed after we full support csv format.

And provide table factories:

  • Provide FileSystemTableFactory: Csv format and Hive format will use it.
  • Provide FileSystemTableSink and FileSystemTableSource
  • Provide BatchFileSystemSink and StreamingFileSystemSink

Formats just needs to implement:

  • InputFormat for read
  • RecordWriter and FileCommitter to write.

Specific implementation format does not involve too much partition concept, it only manages its own reading and writing.

Code Code prototype: https://github.com/JingsongLi/flink/tree/filesink/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sink/filesystem

...

  • First, planner should support statistics of catalog table.
  • Planner should read partition statistics and update to query optimizer.
  • Related: FilterableTableSource need update statistics too.

Public Interfaces

DDL

CREATE TABLE country_page_view(

  user STRING,

  cnt INT,

  date STRING,

  country STRING) 

PARTITIONED BY (date, country);

The table will be partitioned by two fields.

DML

static partition writing:

INSERT INTO | OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) select_statement1 FROM from_statement;

dynamic partition writing:

INSERT INTO | OVERWRITE TABLE tablename1 select_statement1 FROM from_statement;

If no specific partition value is specified, or less specified, it is dynamic partition writing.

alter partitions

ALTER TABLE table_name ADD PARTITION partition_spec [, PARTITION partition_spec];

ALTER TABLE table_name PARTITION partition_spec RENAME TO PARTITION partition_spec;

-- Move partition from table_name_1 to table_name_2

ALTER TABLE table_name_2 EXCHANGE PARTITION (partition_spec) WITH TABLE table_name_1;

-- multiple partitions

 ALTER TABLE table_name_2 EXCHANGE PARTITION (partition_spec, partition_spec2, ...) WITH TABLE table_name_1;

ALTER TABLE table_name DROP PARTITION partition_spec[, PARTITION partition_spec, ...]

Recover Partitions (MSCK REPAIR TABLE)

Flink stores a list of partitions for each table in its catalog. If, however, new partitions are directly added to HDFS (say by using hadoop fs -put command) or removed from HDFS, the catalog will not be aware of these changes to partition information unless the user runs ALTER TABLE table_name ADD/DROP PARTITION commands on each of the newly added or removed partitions, respectively.[3]

However, users can run a command with the repair table option:

MSCK REPAIR TABLE table_name;

which will update catalog about partitions for partitions for which such catalog doesn't already exist. The default option for MSC command is ADD PARTITIONS. With this option, it will add any partitions that exist on HDFS.

Show

SHOW PARTITIONS lists all the existing partitions for a given base table. Partitions are listed in alphabetical order.

SHOW PARTITIONS table_name;

It is also possible to specify parts of a partition specification to filter the resulting list.

SHOW PARTITIONS table_name PARTITION(ds='2010-03-03', hr='12');

Nice to have:

SHOW TABLE EXTENDED [IN|FROM database_name] LIKE 'identifier_with_wildcards' [PARTITION(partition_spec)];

Describe

DESCRIBE [EXTENDED | FORMATTED] [db_name.]table_name [PARTITION partition_spec] [col_name];

DML

static partition writing:

INSERT { INTO | OVERWRITE } TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) select_statement1 FROM from_statement;

dynamic partition writing:

INSERT { INTO | OVERWRITE } TABLE tablename1 select_statement1 FROM from_statement;

If no specific partition value is specified, or less specified, it is dynamic partition writing.

alter partitions

ALTER TABLE table_name ADD PARTITION partition_spec [, PARTITION partition_spec];

ALTER TABLE table_name PARTITION partition_spec RENAME TO PARTITION partition_spec;

-- Move partition from table_name_1 to table_name_2

ALTER TABLE table_name_2 EXCHANGE PARTITION (partition_spec) WITH TABLE table_name_1;

-- multiple partitions

 ALTER TABLE table_name_2 EXCHANGE PARTITION (partition_spec, partition_spec2, ...) WITH TABLE table_name_1;

ALTER TABLE table_name DROP PARTITION partition_spec[, PARTITION partition_spec, ...]

partition_spec ::= (partition_column = partition_col_value, partition_column = partition_col_value, ...)

Show

SHOW PARTITIONS lists all the existing partitions for a given base table. Partitions are listed in alphabetical order.

SHOW PARTITIONS table_name;

It is also possible to specify parts of a partition specification to filter the resulting list.

SHOW PARTITIONS table_name PARTITION(ds='2010-03-03', hr='12');

Nice to have:

SHOW TABLE EXTENDED [IN|FROM database_name] LIKE 'identifier_with_wildcards' [PARTITION(partition_spec)];

Describe

DESCRIBE [EXTENDED | FORMATTED] [db_name.]table_name [PARTITION partition_spec] [col_name];

Catalog interface

public interface Catalog {

  …..

  void renamePartition(ObjectPath tablePath, CatalogPartitionSpec spec, CatalogPartitionSpec newSpec) throws PartitionNotExistException, PartitionAlreadyExistsException, CatalogException;

  void syncPartitions(ObjectPath tablePath) throws TableNotExistException, CatalogException;

 List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters) throws TableNotExistException, TableNotPartitionedException, CatalogException;

}

Further discussion

Create DDL

Should we support partition grammar like Spark SQL? (Subsequent votes will be taken to determine.)

CREATE TABLE country_page_view(

  user STRING,

  cnt INT) 

PARTITIONED BY (date STRING, country STRING);

The table will be partitioned by two fields.

Recover Partitions (MSCK REPAIR TABLE)

Flink stores a list of partitions for each table in its catalog. If, however, new partitions are directly added to HDFS (say by using hadoop fs -put command) or removed from HDFS, the catalog will not be aware of these changes to partition information unless the user runs ALTER TABLE table_name ADD/DROP PARTITION commands on each of the newly added or removed partitions, respectively.[3]

However, users can run a command with the repair table option:

MSCK REPAIR TABLE table_name;

which will update catalog about partitions for partitions for which such catalog doesn't already exist. The default option for MSC command is ADD PARTITIONS. With this option, it will add any partitions that exist on HDFS.

TableSink Interface

...

public interface PartitionableTableSink {

...

  void setStaticPartition(Map<String, String> partitions);

  // get dynamic partition column names.

  List<String> getDynamicPartitionFieldNames();

  // If returns true, sink can trust all records will definitely be grouped by partition fields before consumed by the sink, sink can use “grouped multi-partition writer”. If returns false, there are no need to do partition grouping.

  // If never invoke this method, that mean the execution mode(streaming mode) don’t support grouping, the sink should use its “ungrouped multi-partition writer” when there are dynamic partitions.

  boolean enableDynamicPartitionGrouping();

}

Catalog interface

public interface Catalog {

  …..

  void renamePartition(ObjectPath tablePath, CatalogPartitionSpec spec, CatalogPartitionSpec newSpec) throws PartitionNotExistException, PartitionAlreadyExistsException, CatalogException;

  void syncPartitions(ObjectPath tablePath) throws TableNotExistException, CatalogException;

// If never invoke this method, that mean the execution mode(streaming mode) don’t support grouping, the sink should use its “ungrouped multi-partition writer” when there are dynamic partitions.

  boolean enableDynamicPartitionGrouping() List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters) throws TableNotExistException, TableNotPartitionedException, CatalogException;

}

Road map

  1. Modify DDL support.
  2. Rework partition pruning
  3. Rework dynamic partitioning
  4. Introduce FileSystemTableFactoryIntroduce FileSystemTableFactory
    • Introduce BatchFileSystemSink
    • Introduce StreamingFileSystemSink
    • Introduce FileSystemTableFactory and FileSystemTableSource and FileSystemTableSource and FileSystemTableSink
    • Introduce new CSV for FileSystemTableFactory
    • Integrate Hive to FileSystemTableFactory

...