...

Page properties

Discussion thread
Vote thread
JIRA: FLINK-14249 (ASF JIRA)
Release: 1.10


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

At present, the partitioning we want to support is similar to Hive's: only single-value list partitioning is supported.

Create Table

With the Hive dialect (the dialect can be set in TableConfig):

CREATE TABLE country_page_view(
  user STRING,
  cnt INT)
PARTITIONED BY (date STRING, country STRING);

The table will be partitioned by two fields.

  • We don’t need to support rich partitioning criteria, because Hive buckets cover those concepts.
  • The partition column is a pseudo-column, which unifies batch and streaming.
  • Such a grammar isolates the partition columns from the regular columns and avoids confusing users. (Otherwise the two sets of columns could intersect, which would make the resulting data confusing.)
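To illustrate the pseudo-column behavior described above, partition columns can be referenced in queries just like regular columns (a sketch against the `country_page_view` table defined earlier; the exact query behavior is part of this proposal):

```sql
-- Partition columns can be selected and filtered on like ordinary columns:
SELECT user, cnt, date
FROM country_page_view
WHERE country = 'china';
```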

Static partitioning insert

Users can specify the partition values while inserting data:

INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1 FROM from_statement;

INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1 FROM from_statement;

  • The PARTITION clause should contain all partition columns of the table.
  • The fields returned by the select statement should not contain any of the partition columns.
  • INSERT OVERWRITE will overwrite any existing data in the table or partition.
  • INSERT INTO will append to the table or partition, keeping the existing data intact.
  • Both INSERT INTO and INSERT OVERWRITE will create a new partition if the target static partition doesn't exist.

For example:


INSERT INTO TABLE country_page_view PARTITION (date='2019-8-30', country='china') SELECT user, cnt FROM country_page_view_source;

This will create a new partition in country_page_view and insert all data from country_page_view_source into this partition. Users can verify it with the command:

➜ ~ SHOW PARTITIONS country_page_view;


date='2019-8-30',country='china'

Dynamic partitioning insert

In dynamic partition inserts, users can give a partial partition specification: they specify only some of the partition column values in the PARTITION clause, or omit the PARTITION clause entirely. The engine then determines the partitions dynamically, based on the values of the partition columns coming from the source table. This means that dynamic partition creation is driven by the values of the input columns.
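As a sketch of what this could look like (Hive-style semantics for illustration; the exact Flink syntax is what this proposal would define), a partially static insert fixes date while deriving country from the data, and a fully dynamic insert derives both partition values from the trailing select columns:

```sql
-- Partially dynamic: date is static, country is taken from each input row.
INSERT OVERWRITE TABLE country_page_view PARTITION (date='2019-8-30', country)
SELECT user, cnt, country FROM country_page_view_source;

-- Fully dynamic: no values in the PARTITION clause; the trailing select
-- columns determine the target partition of each row.
INSERT OVERWRITE TABLE country_page_view PARTITION (date, country)
SELECT user, cnt, date, country FROM country_page_view_source;
```

In Hive, dynamic partition columns must appear last in the select list, in the same order as they are declared in PARTITIONED BY; a similar rule would apply here.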

...

Flink FileSystem connector

The DDL can look like this (with the Hive dialect):

CREATE TABLE USER_T (
  a INT,
  b STRING,
  c DOUBLE)
PARTITIONED BY (date STRING, country STRING)
......
WITH (
  'connector.type' = 'filesystem',

...

  • First, the planner should support statistics for catalog tables.
  • The planner should read partition statistics and feed them to the query optimizer.
  • Related: FilterableTableSource needs to update statistics too.
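For reference, Hive collects partition-level statistics via an explicit command; a similar mechanism could populate the catalog statistics that the planner reads (Hive syntax shown purely for illustration):

```sql
-- Compute statistics (row count, data size, ...) for a single partition:
ANALYZE TABLE country_page_view PARTITION (date='2019-8-30', country='china')
COMPUTE STATISTICS;
```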

Public Interfaces

DDL

CREATE TABLE country_page_view(
  user STRING,
  cnt INT)
PARTITIONED BY (date STRING, country STRING);

The table will be partitioned by two fields.

DML

Static partition writing:

INSERT { INTO | OVERWRITE } TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1 FROM from_statement;

...

partition_spec ::= (partition_column = partition_col_value, partition_column = partition_col_value, ...)
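For example, this partition_spec grammar would be used in Hive-style ALTER TABLE statements (shown as an illustration of the grammar, not a committed Flink syntax):

```sql
-- Add and drop a partition using an explicit partition_spec:
ALTER TABLE country_page_view ADD PARTITION (date='2019-8-31', country='us');
ALTER TABLE country_page_view DROP PARTITION (date='2019-8-31', country='us');
```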

Recover Partitions (MSCK REPAIR TABLE)

Flink stores a list of partitions for each table in its catalog. If, however, new partitions are directly added to HDFS (say, by using the hadoop fs -put command) or removed from HDFS, the catalog will not be aware of these changes to partition information unless the user runs ALTER TABLE table_name ADD/DROP PARTITION commands on each of the newly added or removed partitions, respectively.[3]

However, users can run a command with the repair table option:

MSCK REPAIR TABLE table_name;

which will update the catalog with partition metadata for partitions for which such metadata doesn't already exist. The default option for the MSCK command is ADD PARTITIONS; with this option, it will add any partitions that exist on HDFS but not in the catalog.
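A typical repair scenario might look like this (the warehouse path below is hypothetical):

```sql
-- A partition directory is created directly on HDFS, bypassing the catalog:
--   hadoop fs -mkdir -p /warehouse/country_page_view/date=2019-8-31/country=us
--   hadoop fs -put data.orc /warehouse/country_page_view/date=2019-8-31/country=us/

SHOW PARTITIONS country_page_view;    -- the new partition is not listed yet
MSCK REPAIR TABLE country_page_view;  -- scans HDFS and registers the missing partition
SHOW PARTITIONS country_page_view;    -- now includes date=2019-8-31/country=us
```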

Show

SHOW PARTITIONS lists all the existing partitions for a given base table. Partitions are listed in alphabetical order.

SHOW PARTITIONS table_name;

It is also possible to specify parts of a partition specification to filter the resulting list.

SHOW PARTITIONS table_name PARTITION(ds='2010-03-03', hr='12');

Nice to have:

SHOW TABLE EXTENDED [IN|FROM database_name] LIKE 'identifier_with_wildcards' [PARTITION(partition_spec)];

Describe

DESCRIBE [EXTENDED | FORMATTED] [db_name.]table_name [PARTITION partition_spec] [col_name];
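For example (illustrative, using the table defined earlier):

```sql
-- Show detailed metadata for a single partition:
DESCRIBE FORMATTED country_page_view PARTITION (date='2019-8-30', country='china');
```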

Catalog interface

public interface Catalog {

  …..

  void renamePartition(ObjectPath tablePath, CatalogPartitionSpec spec, CatalogPartitionSpec newSpec)
      throws PartitionNotExistException, PartitionAlreadyExistsException, CatalogException;

  void syncPartitions(ObjectPath tablePath) throws TableNotExistException, CatalogException;

  List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters)
      throws TableNotExistException, TableNotPartitionedException, CatalogException;
}

Further discussion

Create DDL


Should we support partition grammar like Spark SQL? (Subsequent votes will be taken to determine.)

CREATE TABLE country_page_view(
  user STRING,
  cnt INT)
PARTITIONED BY (date STRING, country STRING);

The table will be partitioned by two fields.


TableSink Interface

public interface PartitionableTableSink {

...