...
Code Block |
---|
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  (
    ...
  )
  [COMMENT table_comment]
  [
    {
        DISTRIBUTED BY [ { HASH | RANGE } ] (bucket_column_name1, bucket_column_name2, ...) [INTO n BUCKETS]
      | DISTRIBUTED INTO n BUCKETS
    }
  ]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (...)
  [ LIKE ... ]

ALTER TABLE [IF EXISTS] table_name {
    ADD { ... | <distribution_spec> }
  | MODIFY { ... | <distribution_spec> }
  | DROP { ... | <distribution> }
  | RENAME ...
  | RENAME TO ...
  | SET ...
  | RESET ...
}

<distribution_spec>:
{
    DISTRIBUTION BY [ { HASH | RANGE } ] (bucket_column_name1, bucket_column_name2, ...) [INTO n BUCKETS]
  | DISTRIBUTION INTO n BUCKETS
}

<distribution>:
  DISTRIBUTION |
...
- By default, DISTRIBUTED BY assumes a list of columns for an implicit hash partitioning.
- The referenced columns must exist and must be physical columns.
- We can still introduce DISTRIBUTED BY HASH(uid) or other strategies like DISTRIBUTED BY RANGE(uid) in the future.
- Specifying the number of buckets is mandatory if the DISTRIBUTED clause is declared.
- The BY (...) syntax matches PARTITIONED BY (...) for consistency.
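The rule that bucket columns must exist and be physical can be sketched as a small check. This is only an illustration: the class name, the method name, and the representation of the schema as a map from column name to an "is physical" flag are assumptions made for this example, not part of the proposal or of Flink's API.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical sketch; not the planner's actual validation code. */
final class BucketKeyValidationSketch {

    /**
     * Checks every bucket key against the schema.
     *
     * @param columns assumed schema model: column name -> true if the column is physical
     * @param bucketKeys columns listed in the DISTRIBUTED BY clause
     */
    static void validateBucketKeys(Map<String, Boolean> columns, List<String> bucketKeys) {
        for (String key : bucketKeys) {
            Boolean physical = columns.get(key);
            if (physical == null) {
                throw new IllegalArgumentException(
                        "Bucket key '" + key + "' does not exist in the table schema.");
            }
            if (!physical) {
                throw new IllegalArgumentException(
                        "Bucket key '" + key + "' must be a physical column.");
            }
        }
    }
}
```

With a schema containing a physical uid column and a computed column, the check passes for uid and rejects both the computed column and any unknown name.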
Code Block |
---|
CREATE TABLE MyTable (uid BIGINT, name STRING) |
...
- Omitting the DISTRIBUTED clause leaves the distribution up to the connector implementation.
- This is the behavior prior to this FLIP.
Code Block |
---|
CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS |
Notes:
- For advanced users, the algorithm can be defined explicitly.
- Currently, either HASH() or RANGE().
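To make the difference between the two algorithms concrete, here is a minimal sketch of how a connector might map a key to a bucket under each strategy. The proposal leaves the actual assignment to the connector, so everything below (class name, method names, the use of upper bounds for ranges) is an assumption for illustration only.

```java
import java.util.Objects;

/** Hypothetical sketch of bucket assignment; real connectors define their own scheme. */
final class BucketAssignmentSketch {

    /** Hash strategy: maps the key's hash code onto the range [0, numBuckets). */
    static int hashBucket(Object key, int numBuckets) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(Objects.hashCode(key), numBuckets);
    }

    /**
     * Range strategy: bucket i holds keys strictly below upperBounds[i];
     * keys at or above the last bound fall into one extra final bucket.
     * The bounds are assumed to be sorted in ascending order.
     */
    static int rangeBucket(long key, long[] upperBounds) {
        for (int i = 0; i < upperBounds.length; i++) {
            if (key < upperBounds[i]) {
                return i;
            }
        }
        return upperBounds.length;
    }
}
```

Hash assignment spreads keys evenly but scatters neighboring values, while range assignment keeps neighboring values together at the cost of needing sensible bounds.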
Code Block |
---|
CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED INTO 6 BUCKETS |
...
- Omitting the BY (...) clause leaves the distribution up to the connector implementation.
- The connector will most likely use a round robin or random distribution.
- Again, specifying the number of buckets is mandatory; otherwise, the DISTRIBUTED clause could be omitted entirely to achieve the same behavior.
Code Block |
---|
CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY RANGE(uid) |
Notes:
- Omitting the INTO n BUCKETS clause leaves the number of buckets up to the connector implementation.
ALTER TABLE
We propose to add the DISTRIBUTION keyword to the ALTER TABLE DDL:
...
Code Block |
---|
public interface CatalogTable extends CatalogBaseTable {

    /** Returns the distribution of the table if the {@code DISTRIBUTED} clause is defined. */
    Optional<TableDistribution> getDistribution();

    /** Distribution specification. */
    class TableDistribution {

        private final Kind kind;
        private final @Nullable Integer bucketCount;
        private final List<String> bucketKeys;

        public TableDistribution(
                Kind kind, @Nullable Integer bucketCount, List<String> bucketKeys) {
            this.kind = kind;
            this.bucketCount = bucketCount;
            this.bucketKeys = bucketKeys;
        }

        /** Connector-dependent distribution with a declared number of buckets. */
        public static TableDistribution ofUnknown(int bucketCount) {
            return new TableDistribution(Kind.UNKNOWN, bucketCount, Collections.emptyList());
        }

        /** Hash distribution on the given keys among the declared number of buckets. */
        public static TableDistribution ofHash(
                List<String> bucketKeys, @Nullable Integer bucketCount) {
            return new TableDistribution(Kind.HASH, bucketCount, bucketKeys);
        }

        /** Range distribution on the given keys among the declared number of buckets. */
        public static TableDistribution ofRange(
                List<String> bucketKeys, @Nullable Integer bucketCount) {
            return new TableDistribution(Kind.RANGE, bucketCount, bucketKeys);
        }

        enum Kind {
            UNKNOWN,
            HASH,
            RANGE
        }

        public Kind getKind() {
            return kind;
        }

        public List<String> getBucketKeys() {
            return bucketKeys;
        }

        public Optional<Integer> getBucketCount() {
            return Optional.ofNullable(bucketCount);
        }
    }
} |
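As a rough illustration of how the three fields (kind, bucket keys, optional bucket count) correspond to the DDL variants shown earlier, the sketch below renders them back into a DISTRIBUTED clause. It is standalone so that it compiles without Flink: the class DistributionDdlSketch, its local Kind enum, and the toDdl helper are hypothetical names for this example and are not part of the proposed interface.

```java
import java.util.List;

/** Hypothetical sketch; mirrors the proposed fields without depending on Flink. */
final class DistributionDdlSketch {

    /** Local stand-in for the proposed TableDistribution.Kind. */
    enum Kind {
        UNKNOWN,
        HASH,
        RANGE
    }

    /** Renders the DISTRIBUTED clause for the given distribution fields. */
    static String toDdl(Kind kind, List<String> bucketKeys, Integer bucketCount) {
        if (kind == Kind.UNKNOWN && bucketCount == null) {
            // Without keys and without a count, the clause would carry no information.
            throw new IllegalArgumentException(
                    "A connector-dependent distribution needs a bucket count.");
        }
        StringBuilder sb = new StringBuilder("DISTRIBUTED");
        if (kind != Kind.UNKNOWN) {
            sb.append(" BY ")
                    .append(kind.name())
                    .append("(")
                    .append(String.join(", ", bucketKeys))
                    .append(")");
        }
        if (bucketCount != null) {
            sb.append(" INTO ").append(bucketCount).append(" BUCKETS");
        }
        return sb.toString();
    }
}
```

Under these assumptions, a hash distribution on uid with 6 buckets renders as DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS, an unknown distribution with 6 buckets as DISTRIBUTED INTO 6 BUCKETS, and a range distribution on uid without a count as DISTRIBUTED BY RANGE(uid).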
...
Code Block |
---|
package org.apache.flink.table.connector.sink.abilities;

import org.apache.flink.table.catalog.CatalogTable.TableDistribution;
import java.util.Set;

public interface SupportsBucketing {
    Set<TableDistribution.Kind> listAlgorithms();
    boolean requiresBucketCount();
} |
Notes:
- Currently, this interface does not need additional methods; it mostly serves for helpful error messages.
- All information is provided by ResolvedCatalogTable.
- The interface is checked in DynamicSinkUtils, similar to SupportsPartitioning.
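The kind of check described in these notes could look roughly like the sketch below. It uses local stand-ins for the ability interface and the Kind enum so that it compiles on its own. The validate method, the HashOnlySink class, the error messages, and the choice to treat UNKNOWN like any other algorithm are assumptions for illustration, not the actual DynamicSinkUtils logic.

```java
import java.util.EnumSet;
import java.util.Optional;
import java.util.Set;

/** Hypothetical sketch of a planner-side check against the bucketing ability. */
final class BucketingValidationSketch {

    /** Local stand-in for the proposed TableDistribution.Kind. */
    enum Kind {
        UNKNOWN,
        HASH,
        RANGE
    }

    /** Local stand-in for the proposed SupportsBucketing ability. */
    interface SupportsBucketing {
        Set<Kind> listAlgorithms();

        boolean requiresBucketCount();
    }

    /** Example sink that only accepts hash distribution and needs an explicit bucket count. */
    static final class HashOnlySink implements SupportsBucketing {
        @Override
        public Set<Kind> listAlgorithms() {
            return EnumSet.of(Kind.HASH);
        }

        @Override
        public boolean requiresBucketCount() {
            return true;
        }
    }

    /** Returns an error message if the declared distribution is not supported, otherwise empty. */
    static Optional<String> validate(Object sink, Kind kind, Integer bucketCount) {
        if (!(sink instanceof SupportsBucketing)) {
            return Optional.of("The table sink does not support the DISTRIBUTED clause.");
        }
        SupportsBucketing bucketing = (SupportsBucketing) sink;
        if (!bucketing.listAlgorithms().contains(kind)) {
            return Optional.of(
                    "Unsupported distribution algorithm " + kind
                            + "; supported algorithms: " + bucketing.listAlgorithms());
        }
        if (bucketing.requiresBucketCount() && bucketCount == null) {
            return Optional.of("The table sink requires a bucket count (INTO n BUCKETS).");
        }
        return Optional.empty();
    }
}
```

Returning a message instead of throwing keeps the sketch easy to test; a planner would more likely raise a validation exception at this point.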
...