...

  • Although a queue may distinguish partitions via the partition concept of the underlying queue (like Kafka partitions), supporting table partitions for streaming connectors such as queues (Kafka) is not a goal of this ticket.
  • Bucket support, to cover hash partitioning in traditional databases, etc.

Background

Partition in traditional databases

...

INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1 FROM from_statement;

  • The PARTITION clause should contain all partition columns of the table.
  • The fields returned by the select statement should not contain any of the partition columns.
  • INSERT OVERWRITE will overwrite any existing data in the table or partition.
  • INSERT INTO will append to the table or partition, keeping the existing data intact.
  • Both INSERT INTO and INSERT OVERWRITE will create a new partition if the target static partition doesn't exist.

...

In this method, the engine determines the distinct values that the partition columns (i.e. date and country) hold in the source table, and creates a partition for each value.

Different from Hive: Flink will automatically generate the partition specification; dynamically partitioned columns do not need to appear in the PARTITION clause. (Hive 3.0.0 also supports this in HIVE-19083.)

In Hive 2.X, users need to define dynamic partition columns in the PARTITION clause like this:

...

INSERT INTO TABLE country_page_view PARTITION (date='2019-8-30', country) SELECT user, cnt, country FROM country_page_view_source;

...

This is related to the implementation; we don't have an implementation yet, and we recommend following Hive's behavior.

external partitioned tables

...

FLINK-5859, FLINK-12805, and FLINK-13115 already introduced PartitionableTableSource to Flink and implemented it in the Blink planner. The source interface is:

public interface PartitionableTableSource {

  // get all partitions, list of partition column name to column value map.

  List<Map<String, String>> getPartitions();

  // get partition column names

  List<String> getPartitionFieldNames();

  // Applies the remaining partitions to the table source.

  TableSource applyPartitionPruning(List<Map<String, String>> remainingPartitions);

}
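
For illustration only (not from this document), this is the kind of value getPartitions() would return for a table partitioned by (date, country) with two partitions; the class and method names below are made up:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, for illustration only: builds the "list of partition
// column name to column value map" structure that getPartitions() returns.
public class PartitionListExample {

  public static List<Map<String, String>> examplePartitions() {
    Map<String, String> p1 = new HashMap<>();
    p1.put("date", "2019-8-30");
    p1.put("country", "us");

    Map<String, String> p2 = new HashMap<>();
    p2.put("date", "2019-8-30");
    p2.put("country", "uk");

    return Arrays.asList(p1, p2);
  }
}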

Advantages and disadvantages:

  • The engine will automatically prune the partitions based on the filters and partition columns; the source doesn't need to do anything.
  • The table source needs to provide all partition values.
  • The problem is that every partition pruning needs to fetch all partition values. When there are thousands of partitions, this puts a lot of pressure on the catalog (for example, MySQL storage).

New PartitionableTableSource

public interface PartitionableTableSource {

  // get partition column names.

  List<String> getPartitionFieldNames();

  // Applies the partition predicates to the table source.

  TableSource applyPartitionPruning(List<Expression> partitionPredicates);

}


How to do partition pruning depends on the table:

  • If the table is a catalog table, the planner will use the catalog to do partition pruning.
  • If the table is a temporary table, the planner will list all partitions and filter them by name.

Add a Catalog API (a usage sketch follows):

List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters)
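
A minimal usage sketch, assuming the proposed listPartitionsByFilter API above; the class and method names below are hypothetical and only illustrate how a planner rule could prune the partitions of a catalog table:

import java.util.List;

import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.expressions.Expression;

// Illustrative sketch only, not actual planner code.
public class PartitionPruningSketch {

  public static List<CatalogPartitionSpec> pruneViaCatalog(
      Catalog catalog,
      ObjectPath tablePath,
      List<Expression> partitionPredicates) throws Exception {
    // The catalog (e.g. backed by the Hive metastore) evaluates the partition
    // predicates and returns only the remaining partition specs.
    return catalog.listPartitionsByFilter(tablePath, partitionPredicates);
  }
}

For a temporary table, the planner would instead list all partitions and filter the specs by the partition values itself.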

Without Partition pruning

If it is a partitioned catalog table, all partitions registered in the catalog will be read. Users can determine which partition a row belongs to from the partition columns in the data.

Partition write

Static Partition

Static partition writing is basically the same as non-partitioned writing; the only difference is that the directory of the final file needs to contain a subdirectory for the partition.

Dynamic Partition

We have already talked about the grammar of dynamic partitioning; now we will focus on its implementation and its impact on the sink interface.

There are two kinds of writing formats:

  • Small writing buffers: like CSV/Text. In this case, with dynamic partitioning, we can write multiple files simultaneously in one sink task.
  • Large writing buffers: like ORC/Parquet. In this case, with dynamic partitioning, we cannot write multiple files simultaneously in one sink task; otherwise, too much memory will lead to OOM.

Introduce the new PartitionableTableSink:

public interface PartitionableTableSink {

  // get partition column names.

  List<String> getPartitionFieldNames();

  // set the static partition into the TableSink.

  void setStaticPartition(Map<String, String> partitions);

  // get dynamic partition column names.

  List<String> getDynamicPartitionFieldNames();

  // If this returns true, the sink can trust that all records will be grouped by the partition fields before being consumed, so it can use a "grouped multi-partition writer". If it returns false, there is no need to do partition grouping.

  // If this method is never invoked, the execution mode (e.g. streaming mode) does not support grouping, and the sink should use its "ungrouped multi-partition writer" when there are dynamic partitions.

  boolean enableDynamicPartitionGrouping();

}
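
As a usage sketch (an assumption for illustration, not planner code): for "INSERT INTO country_page_view PARTITION (date='2019-8-30') SELECT user, cnt, country ...", the planner could configure the sink roughly like this; the interface is repeated in minimal form so the sketch is self-contained:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SinkConfigurationSketch {

  // Minimal copy of the interface proposed above, so this file compiles on its own.
  interface PartitionableTableSink {
    List<String> getPartitionFieldNames();
    void setStaticPartition(Map<String, String> partitions);
    List<String> getDynamicPartitionFieldNames();
    boolean enableDynamicPartitionGrouping();
  }

  public static void configure(PartitionableTableSink sink, boolean modeSupportsGrouping) {
    // The static part of the PARTITION clause: date is fixed, country stays dynamic.
    Map<String, String> staticPartition = new HashMap<>();
    staticPartition.put("date", "2019-8-30");
    sink.setStaticPartition(staticPartition);

    // Only execution modes that can group the input (e.g. batch) invoke this;
    // if it returns true, the sink may rely on records arriving grouped by the
    // dynamic partition fields and use its grouped multi-partition writer.
    if (modeSupportsGrouping) {
      boolean grouped = sink.enableDynamicPartitionGrouping();
    }
  }
}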

Sink implementation

The sink implementation (HiveTableSink) should provide three writers (a selection sketch follows the list):

  • single-partition writer: writes data to a single partition (non-dynamic-partition writes).
  • grouped multi-partition writer: inputs are grouped by the dynamic partition fields, so only one partition is open at a time.
  • ungrouped multi-partition writer: writes multiple partitions at the same time, which consumes more memory.
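
A hypothetical selection sketch (the writer classes and the factory method are illustrative names, not an actual Flink API) showing how a sink could pick among the three writers:

import java.util.List;
import java.util.Map;

public class PartitionWriterSelection {

  interface PartitionWriter { /* writes records into partition directories */ }

  static class SinglePartitionWriter implements PartitionWriter { }
  static class GroupedMultiPartitionWriter implements PartitionWriter { }
  static class UngroupedMultiPartitionWriter implements PartitionWriter { }

  public static PartitionWriter createWriter(
      Map<String, String> staticPartition,   // from setStaticPartition(...)
      List<String> dynamicPartitionFields,   // dynamic partition columns
      boolean inputGroupedByPartition) {     // whether grouping was enabled

    if (dynamicPartitionFields.isEmpty()) {
      // All partition columns are static: every record goes to one directory.
      return new SinglePartitionWriter();
    }
    if (inputGroupedByPartition) {
      // Records arrive grouped by partition fields: one open file at a time.
      return new GroupedMultiPartitionWriter();
    }
    // Records of different partitions are interleaved: keep several files open
    // simultaneously, which costs more memory (risk of OOM for formats with
    // large write buffers like ORC/Parquet).
    return new UngroupedMultiPartitionWriter();
  }
}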

Nice to have: considering that the dynamic partitioning implementation is almost the same for every file format (Hive/Flink CSV, Parquet, ORC), we need a file format framework to abstract these things.

Streaming partition write

Scenes

There are many scenarios where data can be written to a FileSink through a streaming job. At the same time, these data can be analyzed and calculated by batch jobs.

...

  • Support single-partition writing.
  • Support grouped multi-partition writing.
  • Support ungrouped multi-partition writing.
  • Extend StreamingFileSystemSink to support streaming exactly-once.

Catalog changes

The Hive connector should only call HiveClient-related APIs through the catalog; other places should call HiveCatalog. So we should add more methods to cover the requirements.

HiveCatalog

CatalogTable and CatalogPartition should cover the HiveTableSource/HiveTableSink requirements (like Hive's StorageDescriptor). More properties should be added to the property map of CatalogPartition in HiveCatalog (a sketch follows the list):

  • String location;
  • String inputFormat;
  • String outputFormat;
  • String serializationLib;
  • boolean compressed;
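
A minimal sketch of how these attributes could be carried in the CatalogPartition property map; the property keys are illustrative assumptions, not confirmed names:

import java.util.HashMap;
import java.util.Map;

// Illustrative only: pack the Hive StorageDescriptor-like attributes into the
// CatalogPartition property map. The key names are assumptions.
public class HivePartitionProperties {

  public static Map<String, String> fromStorageDescriptor(
      String location,
      String inputFormat,
      String outputFormat,
      String serializationLib,
      boolean compressed) {
    Map<String, String> props = new HashMap<>();
    props.put("location", location);
    props.put("input-format", inputFormat);
    props.put("output-format", outputFormat);
    props.put("serialization-lib", serializationLib);
    props.put("compressed", String.valueOf(compressed));
    return props;
  }
}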

Partition statistics

  • First, the planner should support statistics for catalog tables.
  • The planner should read partition statistics and feed them to the query optimizer.
  • Related: FilterableTableSource needs to update statistics too.

Currently we don't have a mechanism for a source after pruning or filter push-down to update its statistics. Maybe we need to modify the related TableSource interfaces.

Catalog interface

public interface Catalog {

  void renamePartition(ObjectPath tablePath, CatalogPartitionSpec spec, CatalogPartitionSpec newSpec);

 List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters) throws TableNotExistException, TableNotPartitionedException, CatalogException;

}

...

We do not recommend using StreamingFileSink to support partitioning in Table:

  • StreamingFileSink's bucket concept and SQL's bucket concept are in serious conflict.
  • In Table, we need to support single-partition writing, grouped multi-partition writing, and ungrouped multi-partition writing.
  • We need a global role to commit files to the metastore.
  • We need an abstraction that supports both streaming and batch mode.
  • Table partitioning is simpler than StreamingFileSink's bucketing: we only support partition references on fields, rather than being as flexible as the runtime.

Flink FileSystem connector

The DDL can look like this:

CREATE TABLE USER_T (

  a INT,

  b STRING,

  c DOUBLE)

PARTITIONED BY (date STRING, country STRING)

WITH (

  'connector.type' = 'filesystem',

  'connector.path' = 'hdfs:///tmp/xxx',

  'format.type' = 'csv',

  'update-mode' = 'append',

  'partition-support' = 'true'

 )

The only difference from the previous FileSystem connector is that the partition-support attribute is required. We can use this identifier to indicate that the new connector supports partitions, without changing the previous connector. Other attributes can be completely consistent.

And provide table factories:

  • Provide FileSystemTableFactory: CsvFileSystemTableFactory and HiveFileSystemTableFactory will extend it.
  • Provide FileSystemTableSink and FileSystemTableSource
  • Provide BatchFileSystemSink and StreamingFileSystemSink

Formats just need to implement:

  • InputFormat for read
  • RecordWriter and FileCommitter to write.

A specific format implementation does not involve much of the partition concept; it only manages its own reading and writing (a sketch of the format-facing abstraction follows).
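
A minimal sketch of what the format-facing abstraction could look like; RecordWriter and FileCommitter are only named in this document, so the method signatures below are assumptions for illustration:

import java.io.IOException;

// Hypothetical shapes, for illustration only.
interface RecordWriter<T> {
  void write(T record) throws IOException;  // append one record to the current file
  void close() throws IOException;          // flush and close the current file
}

interface FileCommitter {
  // Atomically publish a finished file, e.g. by renaming it from a temporary
  // directory into the target (partition) directory.
  void commit(String temporaryPath, String targetPath) throws IOException;
}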

Code prototype: https://github.com/JingsongLi/flink/tree/filesink/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sink/filesystem


Public Interfaces

DDL

CREATE TABLE country_page_view(

...

The table will be partitioned by two fields.

DML

static partition writing:

INSERT INTO | OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1 FROM from_statement;

dynamic partition writing:

INSERT INTO | OVERWRITE TABLE tablename1 select_statement1 FROM from_statement;

If no specific partition value is specified, or only some of the partition values are specified, it is dynamic partition writing.

alter partitions

ALTER TABLE table_name ADD PARTITION partition_spec [, PARTITION partition_spec, ...];

ALTER TABLE table_name DROP PARTITION partition_spec [, PARTITION partition_spec, ...];

ALTER TABLE table_name PARTITION partition_spec RENAME TO PARTITION partition_spec;

-- Move partition from table_name_1 to table_name_2

ALTER TABLE table_name_2 EXCHANGE PARTITION (partition_spec) WITH TABLE table_name_1;

-- multiple partitions

ALTER TABLE table_name_2 EXCHANGE PARTITION (partition_spec, partition_spec2, ...) WITH TABLE table_name_1;

Recover Partitions (MSCK REPAIR TABLE)

Flink stores a list of partitions for each table in its catalog. If, however, new partitions are directly added to HDFS (say by using the hadoop fs -put command) or removed from HDFS, the catalog will not be aware of these changes to partition information unless the user runs ALTER TABLE table_name ADD/DROP PARTITION commands on each of the newly added or removed partitions, respectively. [3]

However, users can run a command with the repair table option:

MSCK REPAIR TABLE table_name;

which will update the catalog about partitions for which such metadata doesn't already exist. The default option for the MSCK command is ADD PARTITIONS. With this option, it will add any partitions that exist on HDFS but are not present in the catalog.
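
One possible mapping (an assumption, not something this document states explicitly): MSCK REPAIR TABLE could be executed through the proposed Catalog#syncPartitions method below, which reconciles the partitions registered in the catalog with the directories on the file system:

import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectPath;

// Hypothetical sketch: back MSCK REPAIR TABLE by the proposed syncPartitions API.
public class MsckRepairSketch {

  public static void repair(Catalog catalog, String database, String table) throws Exception {
    catalog.syncPartitions(new ObjectPath(database, table));
  }
}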

Show

SHOW PARTITIONS lists all the existing partitions for a given base table. Partitions are listed in alphabetical order.

...

SHOW TABLE EXTENDED [IN|FROM database_name] LIKE 'identifier_with_wildcards' [PARTITION(partition_spec)];

Describe

DESCRIBE [EXTENDED | FORMATTED] [db_name.]table_name [PARTITION partition_spec] [col_name];

TableSource

public interface PartitionableTableSource {

  // get partition column names.

  List<String> getPartitionFieldNames();

  // Applies the partition predicates to the table source.

  TableSource applyPartitionPruning(List<Expression> partitionPredicates);

}

TableSink

public interface PartitionableTableSink {

  // get partition column names.

  List<String> getPartitionFieldNames();

  // set the static partition into the TableSink.

  void setStaticPartition(Map<String, String> partitions);

  // get dynamic partition column names.

  List<String> getDynamicPartitionFieldNames();

  // If this returns true, the sink can trust that all records will be grouped by the partition fields before being consumed, so it can use a "grouped multi-partition writer". If it returns false, there is no need to do partition grouping.

  // If this method is never invoked, the execution mode (e.g. streaming mode) does not support grouping, and the sink should use its "ungrouped multi-partition writer" when there are dynamic partitions.

  boolean enableDynamicPartitionGrouping();

}


Catalog interface

public interface Catalog {

  ...

  void renamePartition(ObjectPath tablePath, CatalogPartitionSpec spec, CatalogPartitionSpec newSpec) throws PartitionNotExistException, PartitionAlreadyExistsException, CatalogException;

  void syncPartitions(ObjectPath tablePath) throws TableNotExistException, CatalogException;

 List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters) throws TableNotExistException, TableNotPartitionedException, CatalogException;

}

Road map

  1. Modify DDL support.
  2. Rework partition pruning
  3. Rework dynamic partitioning
  4. Introduce FileSystemTableFactory
    • Introduce BatchFileSystemSink
    • Introduce StreamingFileSystemSink
    • Introduce FileSystemTableFactory and FileSystemTableSource and FileSystemTableSink
    • Introduce CsvFileSystemTableFactory
    • Integrate Hive to FileSystemTableFactory


Nice to have:

  1. Integrate Create table DDL(with partition) to Hive
  2. push down partition pruning to hive metastore
  3. Introduce alter partitions commands
  4. Introduce recover partitions commands
  5. Introduce show/describe partitions commands
  6. Integrate partition statistics to planner

...

Reference

[1] https://en.wikipedia.org/wiki/Partition_(database)

...