...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Flink's Kafka connector also needs to define the message key and the number of Kafka partitions (which, semantically, are actually closer to buckets) using options:
```sql
CREATE TABLE KafkaTable (
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'key.format' = 'json',
  'key.fields' = 'user_id;item_id',
  'value.format' = 'json',
  'properties.num.partitions' = '6'
)
```
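As a rough illustration of why the key fields matter: Kafka's default partitioner derives the target partition from a hash of the serialized key modulo the partition count. The sketch below substitutes `Objects.hash` over the key fields for Kafka's actual murmur2 hash of the serialized key bytes, so the class and method names are illustrative only:

```java
import java.util.Objects;

public class KeyedPartitioning {

    // Illustrative only: Kafka's default partitioner actually hashes the
    // serialized key bytes with murmur2; Objects.hash is a stand-in here.
    public static int partitionFor(long userId, long itemId, int numPartitions) {
        int hash = Objects.hash(userId, itemId);   // combine the declared key fields
        return Math.floorMod(hash, numPartitions); // non-negative partition index
    }

    public static void main(String[] args) {
        System.out.println(partitionFor(42L, 1001L, 6));
    }
}
```

Records with the same key fields always land in the same partition, which is what makes the partition count behave like a bucket count.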
...
```sql
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  (
    ...
  )
  [COMMENT table_comment]
  [
    {
      DISTRIBUTED BY [ { HASH | RANGE } ] (bucket_column_name1, bucket_column_name2, ...) [INTO n BUCKETS]
      |
      DISTRIBUTED INTO n BUCKETS
    }
  ]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (...)
  [ LIKE ... ]

ALTER TABLE [IF EXISTS] table_name
  {
    ADD { ... | <distribution_spec> }
    | MODIFY { ... | <distribution_spec> }
    | DROP { ... | <distribution> }
    | RENAME ...
    | RENAME TO ...
    | SET ...
    | RESET ...
  }

<distribution_spec>:
  {
    DISTRIBUTION BY [ { HASH | RANGE } ] (bucket_column_name1, bucket_column_name2, ...) [INTO n BUCKETS]
    |
    DISTRIBUTION INTO n BUCKETS
  }

<distribution>: DISTRIBUTION
```
...
- By default, `DISTRIBUTED BY` assumes a list of columns for an implicit hash partitioning.
- The referenced columns must exist and must be physical columns.
- We can still introduce `DISTRIBUTED BY HASH(uid)` or other strategies like `DISTRIBUTED BY RANGE(uid)` in the future.
- Specifying the number of buckets is mandatory if the `DISTRIBUTED` clause is declared.
- The `BY (...)` syntax matches `PARTITIONED BY (...)` for consistency.
```sql
CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY (uid) INTO 6 BUCKETS
```
...
```sql
CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS
```
Notes:

- For advanced users, the algorithm can be defined explicitly.
- Currently, either `HASH(...)` or `RANGE(...)` is supported.
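For intuition, the two algorithms differ in how a row's bucket index is derived from the bucket key. The following sketch is illustrative only; the hash function and the range split points are assumptions, not Flink's actual implementation:

```java
import java.util.Arrays;

public class BucketAlgorithms {

    // HASH(uid): bucket derived from a hash of the key, spreading rows
    // roughly uniformly across the declared buckets.
    public static int hashBucket(long uid, int numBuckets) {
        return Math.floorMod(Long.hashCode(uid), numBuckets);
    }

    // RANGE(uid): bucket derived from sorted split points, so keys that are
    // close together land in the same bucket and key order is preserved
    // across buckets. A key equal to a split point falls into the lower bucket.
    public static int rangeBucket(long uid, long[] sortedSplitPoints) {
        int pos = Arrays.binarySearch(sortedSplitPoints, uid);
        return pos >= 0 ? pos : -pos - 1; // insertion point = bucket index
    }
}
```

For example, with split points `{100, 200}` (three buckets), key `50` maps to bucket 0, `150` to bucket 1, and `250` to bucket 2.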
```sql
CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED INTO 6 BUCKETS
```
Notes:

- Omitting the `BY (...)` clause leaves the distribution up to the connector implementation. The connector will most likely use a round-robin or random distribution.
- Again, specifying the number of buckets is mandatory; otherwise the `DISTRIBUTED` clause could be omitted entirely to achieve the same behavior.
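A round-robin assignment, as a connector might implement it, can be sketched as follows (the class name and structure are hypothetical, not part of the proposal):

```java
import java.util.concurrent.atomic.AtomicLong;

public class RoundRobinBuckets {

    private final AtomicLong counter = new AtomicLong();
    private final int numBuckets;

    public RoundRobinBuckets(int numBuckets) {
        this.numBuckets = numBuckets;
    }

    // Cycles through the buckets in order, regardless of record content:
    // 0, 1, 2, ..., numBuckets - 1, 0, 1, ...
    public int nextBucket() {
        return (int) (counter.getAndIncrement() % numBuckets);
    }
}
```

Unlike hash or range distribution, two identical records may land in different buckets here, which is why the bucket kind stays `UNKNOWN` from the planner's point of view.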
```sql
CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY RANGE(uid)
```
Notes:

- Omitting `INTO n BUCKETS` leaves the number of buckets up to the connector implementation.
ALTER TABLE

We propose to add the `DISTRIBUTION` keyword to the `ALTER TABLE` DDL:
...
```java
public interface CatalogTable extends CatalogBaseTable {

    /** Returns the distribution of the table if the {@code DISTRIBUTED} clause is defined. */
    Optional<TableDistribution> getDistribution();

    /** Distribution specification. */
    class TableDistribution {

        private final Kind kind;
        private final @Nullable Integer bucketCount;
        private final List<String> bucketKeys;

        public TableDistribution(
                Kind kind, @Nullable Integer bucketCount, List<String> bucketKeys) {
            this.kind = kind;
            this.bucketCount = bucketCount;
            this.bucketKeys = bucketKeys;
        }

        /** Connector-dependent distribution with a declared number of buckets. */
        public static TableDistribution ofUnknown(int bucketCount) {
            return new TableDistribution(Kind.UNKNOWN, bucketCount, Collections.emptyList());
        }

        /** Hash distribution over the given keys among the declared number of buckets. */
        public static TableDistribution ofHash(
                List<String> bucketKeys, @Nullable Integer bucketCount) {
            return new TableDistribution(Kind.HASH, bucketCount, bucketKeys);
        }

        /** Range distribution over the given keys among the declared number of buckets. */
        public static TableDistribution ofRange(
                List<String> bucketKeys, @Nullable Integer bucketCount) {
            return new TableDistribution(Kind.RANGE, bucketCount, bucketKeys);
        }

        enum Kind {
            UNKNOWN,
            HASH,
            RANGE
        }

        public Kind getKind() {
            return kind;
        }

        public List<String> getBucketKeys() {
            return bucketKeys;
        }

        public Optional<Integer> getBucketCount() {
            return Optional.ofNullable(bucketCount);
        }
    }
}
```
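To make the mapping between the DDL clauses and the proposed factory methods concrete, the following self-contained stand-in mirrors the shape of `TableDistribution` (it is not Flink's class) and annotates which clause each factory corresponds to:

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Self-contained stand-in for the proposed CatalogTable.TableDistribution,
// used only to illustrate the clause-to-factory mapping.
public class DistributionMapping {

    enum Kind { UNKNOWN, HASH, RANGE }

    static final class Distribution {
        final Kind kind;
        final Integer bucketCount; // nullable: count may be connector-defined
        final List<String> bucketKeys;

        Distribution(Kind kind, Integer bucketCount, List<String> bucketKeys) {
            this.kind = kind;
            this.bucketCount = bucketCount;
            this.bucketKeys = bucketKeys;
        }

        static Distribution ofUnknown(int bucketCount) {
            return new Distribution(Kind.UNKNOWN, bucketCount, Collections.emptyList());
        }

        static Distribution ofHash(List<String> bucketKeys, Integer bucketCount) {
            return new Distribution(Kind.HASH, bucketCount, bucketKeys);
        }

        static Distribution ofRange(List<String> bucketKeys, Integer bucketCount) {
            return new Distribution(Kind.RANGE, bucketCount, bucketKeys);
        }

        Optional<Integer> getBucketCount() {
            return Optional.ofNullable(bucketCount);
        }
    }

    public static void main(String[] args) {
        // DISTRIBUTED INTO 6 BUCKETS              -> ofUnknown(6)
        Distribution unknown = Distribution.ofUnknown(6);
        // DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS -> ofHash(["uid"], 6)
        Distribution hash = Distribution.ofHash(Collections.singletonList("uid"), 6);
        // DISTRIBUTED BY RANGE(uid)               -> ofRange(["uid"], null)
        Distribution range = Distribution.ofRange(Collections.singletonList("uid"), null);
        System.out.println(hash.kind + " " + range.getBucketCount().isPresent());
    }
}
```

Note that only `ofRange` and `ofHash` may leave the bucket count `null`, matching the DDL where `INTO n BUCKETS` is optional when a `BY (...)` clause is present.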
...
```java
package org.apache.flink.table.connector.sink.abilities;

public interface SupportsBucketing {

    Set<TableDistribution.Kind> listAlgorithms();

    boolean requiresBucketCount();
}
```
Notes:

- Currently, this interface does not need additional methods; it mostly serves for producing helpful error messages.
- All information is provided by `ResolvedCatalogTable`.
- The interface is checked in `DynamicSinkUtils`, similar to `SupportsPartitioning`.
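A validation pass of roughly this shape could implement the error reporting described above; the stand-in types, method, and messages are assumptions, not the actual `DynamicSinkUtils` code:

```java
import java.util.Optional;
import java.util.Set;

public class BucketingValidation {

    enum Kind { UNKNOWN, HASH, RANGE }

    // Stand-in for the proposed SupportsBucketing ability interface.
    interface SupportsBucketing {
        Set<Kind> listAlgorithms();
        boolean requiresBucketCount();
    }

    // Hypothetical check mirroring the FLIP's intent: fail early with a
    // helpful message when a table's declared distribution cannot be honored.
    static void validate(Object sink, Kind declaredKind, Optional<Integer> bucketCount) {
        if (!(sink instanceof SupportsBucketing)) {
            throw new IllegalStateException(
                    "Table is distributed into buckets, but the sink does not support bucketing.");
        }
        SupportsBucketing bucketing = (SupportsBucketing) sink;
        if (!bucketing.listAlgorithms().contains(declaredKind)) {
            throw new IllegalStateException(
                    "Sink does not support " + declaredKind + " distribution.");
        }
        if (bucketing.requiresBucketCount() && !bucketCount.isPresent()) {
            throw new IllegalStateException("Sink requires a declared number of buckets.");
        }
    }
}
```

This is why `listAlgorithms()` and `requiresBucketCount()` are sufficient: the declared kind, keys, and bucket count all come from the resolved catalog table, so the sink only has to advertise what it can accept.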
...