Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.


Page properties


Discussion thread
Vote thread
JIRA

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-14249

Release1.10

Current state: Under Discussion

JIRA:

...


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

At present, the partition we want to support is similar to that of hive and only supports single value list partition.

Create Table

With hive dialect (We can set dialect in TableConfig):

CREATE TABLE country_page_view(

  user STRING,

  cnt INT) 

PARTITIONED BY (date STRING, country STRING);

The table will be partitioned by two fields.

  • We don’t need support rich partitioning criteria. Because hive bucket cover these concepts.
  • Partition column is the pseudocolumn to unify batch and streaming.
  • Such a grammar can isolate the partition columns from the regular columns and avoid confusion of user concepts. (Either way, the two may intersect to make the specific data strange.

static partitioning insert

Users can specify the value of partition while inserting the data:

INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) select_statement1 FROM from_statement;

INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1 FROM from_statement;

  • PARTITION clause should contain all partition columns of this table.
  • The fields returned in this select statement should not contain any of the partition columns.
  • INSERT OVERWRITE will overwrite any existing data in the table or partition
  • INSERT INTO will append to the table or partition, keep the existing data intact
  • Both INSERT INTO and INSERT OVERWRITE will create a new partition if the target static partition doesn't exist.

For example:

static partitioning insert

Users can specify the value of partition while inserting the data:

INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) select_statement1 FROM from_statement;

INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1 FROM from_statement;

  • PARTITION clause should contain all partition columns of this table.
  • The fields returned in this select statement should not contain any of the partition columns.
  • INSERT OVERWRITE will overwrite any existing data in the table or partition
  • INSERT INTO will append to the table or partition, keep the existing data intact
  • Both INSERT INTO and INSERT OVERWRITE will create a new partition if the target static partition doesn't exist.

For example:

INSERT INTO TABLE country_page_view PARTITION (date=’2019-8-30’, country=’china’) SELECT user, cnt FROM country_page_view_source;

This will create a new partition in country_page_view and insert all date from country_page_view_source to this partition. User can verify it by command:

➜ ~ SHOW PARTITIONS country_page_view;

date=’2019-8-30’,country=’china’

dynamic partitioning insert

In the dynamic partition inserts, users can give partial partition specifications, which means just specifying partial column values in the PARTITION clause or not provide PARTITION clause. Let the engine dynamically determine the partitions based on the values of the partition column from source table. This means that the dynamic partition creation is determined by the value of the input column.

INSERT INTO TABLE country_page_view SELECT user, cnt, date, country INSERT INTO TABLE country_page_view PARTITION (date=’2019-8-30’, country=’china’) SELECT user, cnt FROM country_page_view_source;

This will create a new partition in country_page_view and insert all date from country_page_view_source to this partition. User can verify it by command:

➜ ~ SHOW PARTITIONS country_page_view;

date=’2019-8-30’,country=’china’

dynamic partitioning insert

In this method, the engine will determine the different unique values from source table that the partition columns holds(i.e date and country), and creates partitions for each value.

Different from hive 2.X or smaller: Dynamic partitioned columns do not need to be on partition clause. (Hive 3.0.0 also support this in HIVE-19083).

Hive 2.X, user need define dynamic partition columns in PARTITION clause like this: 

INSERT INTO TABLE country_page_view PARTITION (date, country) SELECT user, cnt, date, country FROM country_page_view_source;

Partially specified partition columns values are also supported:In the dynamic partition inserts, users can give partial partition specifications, which means just specifying partial column values in the PARTITION clause or not provide PARTITION clause. Let the engine dynamically determine the partitions based on the values of the partition column from source table. This means that the dynamic partition creation is determined by the value of the input column.

INSERT INTO TABLE country_page_view PARTITION (date=’2019-8-30’) SELECT user, cnt, date, country FROM country_page_view_source;

In this method, the engine will determine the different unique values from source table that the partition columns holds(i.e date and country), and creates partitions for each value.

Different from hive 2.X or smaller: Dynamic partitioned columns do not need to be on partition clause. (Hive 3.0.0 also support this in HIVE-19083).

Hive 2.X, user need define dynamic partition columns in PARTITION clause like this: 

INSERT INTO TABLE country_page_view PARTITION (date, country) SELECT user, cnt, date, country FROM country_page_view_source;

Partially specified partition columns values are also supported:

INSERT INTO TABLE country_page_view PARTITION (date=’2019-8-30’) SELECT user, cnt, country FROM country_page_view_source;

NOTE:

  • The dynamic partition columns must be specified last among the columns in the SELECT statement
  • The dynamic partition columns must be in the same order in which they appear in the DDL of CREATE TABLE.
  • Because of the existence of dynamic partitioning, we will stuff both static and dynamic columns into Row, so the data received by the sink contains all partition columns.

Behavior of dynamic partition INSERT OVERWRITE:

  • delete all partition directories that match the static partition values provided in the insert statement. (spark behavior)
  • only delete partition directories which have data written into it (hive behavior)

This is related to implementation, recommend hive’s behavior.

external partitioned tables

If we already have partition data on File system, if we want to load it into Flink catalog. At this point, we need to add partition grammar.

...

NOTE:

  • The dynamic partition columns must be specified last among the columns in the SELECT statement
  • The dynamic partition columns must be in the same order in which they appear in the DDL of CREATE TABLE.
  • Because of the existence of dynamic partitioning, we will stuff both static and dynamic columns into Row, so the data received by the sink contains all partition columns.

Behavior of dynamic partition INSERT OVERWRITE:

  • delete all partition directories that match the static partition values provided in the insert statement. (spark behavior)
  • only delete partition directories which have data written into it (hive behavior)

This is related to implementation, recommend hive’s behavior.

external partitioned tables

If we already have partition data on File system, if we want to load it into Flink catalog. At this point, we need to add partition grammar.

Consider we have a table country_page_view, it is a file table and its location is ‘/user/flink/country_page_view’. And now we have some data of partition (2019-8-30, china), we want to load it into Flink catalog, we can do:

  • File system operation: move data to ‘/user/flink/country_page_

...

  • File system operation: move data to ‘/user/flink/country_page_view/2019-8-30/china/’
  • ALTER TABLE country_page_view ADD PARTITION (date=’2019-8-30’, country=’china’);

...

Flink FileSystem connector

The DDL can like this (With hive dialect):

CREATE TABLE USER_T (

  a INT,

  b STRING,

  c DOUBLE)

PARTITIONED BY (date STRING, country STRING)

......

WITH (

  'connector.type' = ‘filesystem’,

...

  • First, planner should support statistics of catalog table.
  • Planner should read partition statistics and update to query optimizer.
  • Related: FilterableTableSource need update statistics too.

Public Interfaces

DDL

CREATE TABLE country_page_view(

  user STRING,

  cnt INT) 

PARTITIONED BY (date STRING, country STRING);

The table will be partitioned by two fields.

DML

static partition writing:

INSERT { INTO | OVERWRITE } TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) select_statement1 FROM from_statement;

dynamic partition writing:

INSERT { INTO | OVERWRITE } TABLE tablename1 select_statement1 FROM from_statement;

If no specific partition value is specified, or less specified, it is dynamic partition writing.

alter partitions

ALTER TABLE table_name ADD PARTITION partition_spec [, PARTITION partition_spec];

ALTER TABLE table_name PARTITION partition_spec RENAME TO PARTITION partition_spec;

-- Move partition from table_name_1 to table_name_2

ALTER TABLE table_name_2 EXCHANGE PARTITION (partition_spec) WITH TABLE table_name_1;

-- multiple partitions

 ALTER TABLE table_name_2 EXCHANGE PARTITION (partition_spec, partition_spec2, ...) WITH TABLE table_name_1;

ALTER TABLE table_name DROP PARTITION partition_spec[, PARTITION partition_spec, ...]

partition_spec ::= (partition_column = partition_col_value, partition_column = partition_col_value, ...)

Recover Partitions (MSCK REPAIR TABLE)

Flink stores a list of partitions for each table in its catalog. If, however, new partitions are directly added to HDFS (say by using hadoop fs -put command) or removed from HDFS, the catalog will not be aware of these changes to partition information unless the user runs ALTER TABLE table_name ADD/DROP PARTITION commands on each of the newly added or removed partitions, respectively.[3]

However, users can run a command with the repair table option:

MSCK REPAIR TABLE table_name;

which will update catalog about partitions for partitions for which such catalog doesn't already exist. The default option for MSC command is ADD PARTITIONS. With this option, it will add any partitions that exist on HDFS.

Show

SHOW PARTITIONS lists all the existing partitions for a given base table. Partitions are listed in alphabetical order.

SHOW PARTITIONS table_name;

It is also possible to specify parts of a partition specification to filter the resulting list.

SHOW PARTITIONS table_name PARTITION(ds='2010-03-03', hr='12');

Nice to have:

SHOW TABLE EXTENDED [IN|FROM database_name] LIKE 'identifier_with_wildcards' [PARTITION(partition_spec)];

Describe

DESCRIBE [EXTENDED | FORMATTED] [db_name.]table_name [PARTITION partition_spec] [col_name];

Catalog interface

public interface Catalog {

  …..

  void renamePartition(ObjectPath tablePath, CatalogPartitionSpec spec, CatalogPartitionSpec newSpec) throws PartitionNotExistException, PartitionAlreadyExistsException, CatalogException;

  void syncPartitions(ObjectPath tablePath) throws TableNotExistException, CatalogException;

 List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters) throws TableNotExistException, TableNotPartitionedException, CatalogException;

}

Further discussion

Create DDL

DML

static partition writing:

INSERT { INTO | OVERWRITE } TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) select_statement1 FROM from_statement;

dynamic partition writing:

INSERT { INTO | OVERWRITE } TABLE tablename1 select_statement1 FROM from_statement;

If no specific partition value is specified, or less specified, it is dynamic partition writing.

alter partitions

ALTER TABLE table_name ADD PARTITION partition_spec [, PARTITION partition_spec];

ALTER TABLE table_name PARTITION partition_spec RENAME TO PARTITION partition_spec;

-- Move partition from table_name_1 to table_name_2

ALTER TABLE table_name_2 EXCHANGE PARTITION (partition_spec) WITH TABLE table_name_1;

-- multiple partitions

 ALTER TABLE table_name_2 EXCHANGE PARTITION (partition_spec, partition_spec2, ...) WITH TABLE table_name_1;

ALTER TABLE table_name DROP PARTITION partition_spec[, PARTITION partition_spec, ...]

partition_spec ::= (partition_column = partition_col_value, partition_column = partition_col_value, ...)

Show

SHOW PARTITIONS lists all the existing partitions for a given base table. Partitions are listed in alphabetical order.

SHOW PARTITIONS table_name;

It is also possible to specify parts of a partition specification to filter the resulting list.

SHOW PARTITIONS table_name PARTITION(ds='2010-03-03', hr='12');

Nice to have:

SHOW TABLE EXTENDED [IN|FROM database_name] LIKE 'identifier_with_wildcards' [PARTITION(partition_spec)];

Describe

DESCRIBE [EXTENDED | FORMATTED] [db_name.]table_name [PARTITION partition_spec] [col_name];

Catalog interface

public interface Catalog {

  …..

  void renamePartition(ObjectPath tablePath, CatalogPartitionSpec spec, CatalogPartitionSpec newSpec) throws PartitionNotExistException, PartitionAlreadyExistsException, CatalogException;

  void syncPartitions(ObjectPath tablePath) throws TableNotExistException, CatalogException;

 List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters) throws TableNotExistException, TableNotPartitionedException, CatalogException;

}

Further discussion

Create DDL

Should we support partition grammar like Spark SQL? (Subsequent votes will be taken to determine.)

CREATE TABLE country_page_view(

  user STRING,

  cnt INT) 

PARTITIONED BY (date STRING, country STRING);

The table will be partitioned by two fields.

Recover Partitions (MSCK REPAIR TABLE)

Flink stores a list of partitions for each table in its catalog. If, however, new partitions are directly added to HDFS (say by using hadoop fs -put command) or removed from HDFS, the catalog will not be aware of these changes to partition information unless the user runs ALTER TABLE table_name ADD/DROP PARTITION commands on each of the newly added or removed partitions, respectively.[3]

However, users can run a command with the repair table option:

MSCK REPAIR TABLE table_name;

which will update catalog about partitions for partitions for which such catalog doesn't already exist. The default option for MSC command is ADD PARTITIONS. With this option, it will add any partitions that exist on HDFS.Should we support partition grammar like Spark SQL? (Subsequent votes will be taken to determine.)

TableSink Interface

public interface PartitionableTableSink {

...