...

Page properties


Discussion thread: https://lists.apache.org/thread/z1p4f38x9ohv29y4nlq8g1wdxwfjwxd1
Vote thread: https://lists.apache.org/thread/cft2d9jc2lr8gv6dyyz7b62188mf07sj
JIRA: FLINK-33494

Release: <Flink Version>


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Flink's Kafka connector also needs to define the message key and the number of Kafka partitions (which are semantically closer to buckets) using options:

Code Block
CREATE TABLE KafkaTable (
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'key.format' = 'json',
  'key.fields' = 'user_id;item_id',
  'value.format' = 'json',
  'properties.num.partitions' = '6'
)
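The two options above carry the same information that a DISTRIBUTED clause would declare: the bucket keys and the bucket count. A minimal sketch, assuming a hypothetical helper `KafkaOptionsDistribution` (not part of the Kafka connector) that reads the options of the example from a plain `Map`; it relies on `key.fields` separating column names with `;`, as in the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Flink): extracts the bucketing information
// that the Kafka options above encode implicitly.
final class KafkaOptionsDistribution {

    /** Bucket keys declared via 'key.fields', e.g. "user_id;item_id". */
    static List<String> bucketKeys(Map<String, String> options) {
        String fields = options.get("key.fields");
        if (fields == null || fields.isEmpty()) {
            return List.of();
        }
        // 'key.fields' separates column names with ';'
        return Arrays.asList(fields.split(";"));
    }

    /**
     * Bucket count declared via 'properties.num.partitions' (as used in the example).
     * Throws NumberFormatException if the option is absent or not a number.
     */
    static int bucketCount(Map<String, String> options) {
        return Integer.parseInt(options.get("properties.num.partitions"));
    }
}
```

For the example table, this yields the keys `[user_id, item_id]` and 6 buckets, which is what a declarative `DISTRIBUTED BY (user_id, item_id) INTO 6 BUCKETS` clause would express directly in the DDL.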

...

Code Block
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  (
    ...
  )
  [COMMENT table_comment]
  [
    {
      DISTRIBUTED BY [ { HASH | RANGE } ] (bucket_column_name1, bucket_column_name2, ...) [INTO n BUCKETS]
    |
      DISTRIBUTED INTO n BUCKETS
    }
  ]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (...)
  [ LIKE ... ]


ALTER TABLE [IF EXISTS] table_name {
    ADD { ... | <distribution_spec> }
  | MODIFY { ... | <distribution_spec> }
  | DROP { ... | <distribution>}
  | RENAME ...
  | RENAME TO ...
  | SET ...
  | RESET ...
}

<distribution_spec>:
  {
    DISTRIBUTION BY [ { HASH | RANGE } ] (bucket_column_name1, bucket_column_name2, ...) [INTO n BUCKETS]
  |
    DISTRIBUTION INTO n BUCKETS
  }

<distribution>: DISTRIBUTION
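To make the grammar concrete, the following sketch renders a distribution into the clause shapes allowed above. The class `DistributionClause` and its simplified `Kind` enum are hypothetical illustrations, not Flink code; a `Kind.UNKNOWN` with keys renders the implicit `BY (...)` form.

```java
import java.util.List;

// Hypothetical helper (not part of Flink): renders a distribution into the
// clause shapes permitted by the grammar above.
final class DistributionClause {

    // Mirrors the proposed TableDistribution.Kind
    enum Kind { UNKNOWN, HASH, RANGE }

    /**
     * @param keyword     "DISTRIBUTED" for CREATE TABLE, "DISTRIBUTION" for ALTER TABLE
     * @param kind        UNKNOWN renders the implicit form BY (...) or no BY at all
     * @param keys        bucket columns; empty if the BY (...) part is omitted
     * @param bucketCount null if INTO n BUCKETS is omitted
     */
    static String render(String keyword, Kind kind, List<String> keys, Integer bucketCount) {
        StringBuilder sb = new StringBuilder(keyword);
        if (keys.isEmpty()) {
            // Without BY (...), only the "INTO n BUCKETS" alternative is valid
            if (kind != Kind.UNKNOWN || bucketCount == null) {
                throw new IllegalArgumentException(
                        keyword + " without BY (...) requires INTO n BUCKETS and no algorithm");
            }
        } else {
            // Explicit algorithm (HASH or RANGE) or the implicit default
            sb.append(" BY ")
                    .append(kind == Kind.UNKNOWN ? "" : kind.name())
                    .append('(')
                    .append(String.join(", ", keys))
                    .append(')');
        }
        if (bucketCount != null) {
            sb.append(" INTO ").append(bucketCount).append(" BUCKETS");
        }
        return sb.toString();
    }
}
```

For example, `render("DISTRIBUTION", Kind.HASH, List.of("uid"), 4)` produces the fragment used in `ALTER TABLE ... ADD DISTRIBUTION BY HASH(uid) INTO 4 BUCKETS`.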

...

  • By default, DISTRIBUTED BY assumes a list of columns for an implicit hash distribution.
  • The referenced columns must exist and must be physical columns.
  • We can still introduce DISTRIBUTED BY HASH(uid) or other strategies like DISTRIBUTED BY RANGE(uid) in the future.
  • Specifying the number of buckets is mandatory if the DISTRIBUTED clause is declared.
  • By default, the BY (...) syntax matches PARTITIONED BY (...) for consistency.
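The column rule in the notes above can be expressed as a simple check. This is a sketch under stated assumptions, not the planner's actual code: the hypothetical `BucketKeyValidator` receives plain sets of column names, where `allColumns` also contains computed and metadata columns.

```java
import java.util.List;
import java.util.Set;

// Hypothetical validation helper (not Flink's planner code): every referenced
// bucket column must exist and must be a physical column.
final class BucketKeyValidator {

    /**
     * @param bucketKeys      columns listed in DISTRIBUTED BY (...)
     * @param physicalColumns names of the table's physical columns
     * @param allColumns      names of all columns, including computed and metadata columns
     */
    static void validate(List<String> bucketKeys, Set<String> physicalColumns, Set<String> allColumns) {
        for (String key : bucketKeys) {
            if (!allColumns.contains(key)) {
                throw new IllegalArgumentException("Bucket key '" + key + "' does not exist.");
            }
            if (!physicalColumns.contains(key)) {
                throw new IllegalArgumentException("Bucket key '" + key + "' must be a physical column.");
            }
        }
    }
}
```

A metadata column such as `ts` in the Kafka example above would exist but be rejected as a bucket key.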


Code Block
CREATE TABLE MyTable (uid BIGINT, name STRING)

...

  • Omitting the DISTRIBUTED clause leaves the distribution up to the connector implementation.
  • This is the behavior prior to this FLIP.


Code Block
CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS

Notes:

  • For advanced users, the algorithm can be defined explicitly.
  • Currently, either HASH(...) or RANGE(...) is supported.
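For `DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS`, a connector needs some function from a row's key values to a bucket index. The FLIP does not fix that function; the following sketch assumes Java's `List.hashCode()` over the key values followed by `Math.floorMod`, and the class name `HashBuckets` is hypothetical.

```java
import java.util.List;

// Hypothetical hash bucketing (the concrete hash function is left to the
// connector): maps the bucket key values of a row to one of n buckets.
final class HashBuckets {

    static int bucketOf(List<Object> keyValues, int bucketCount) {
        if (bucketCount < 1) {
            throw new IllegalArgumentException("bucketCount must be positive");
        }
        // List.hashCode combines the element hashes in order; floorMod keeps the
        // result in [0, bucketCount) even for negative hash codes
        return Math.floorMod(keyValues.hashCode(), bucketCount);
    }
}
```

Rows with equal key values always land in the same bucket, which is the property a hash distribution is meant to guarantee.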


Code Block
CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED INTO 6 BUCKETS

...

  • Omitting the BY (...) clause leaves the distribution up to the connector implementation.
  • The connector will most likely use a round robin or random distribution.
  • Again, specifying the number of buckets is mandatory; otherwise, the DISTRIBUTED clause could be omitted entirely to achieve the same behavior.
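With `DISTRIBUTED INTO 6 BUCKETS`, only the count is fixed. A minimal sketch of the round-robin option mentioned above, assuming a hypothetical `RoundRobinBuckets` class that ignores row content entirely:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical round-robin assigner: one way a connector might spread rows when
// DISTRIBUTED INTO n BUCKETS declares a count but no bucket keys.
final class RoundRobinBuckets {

    private final int bucketCount;
    private final AtomicLong counter = new AtomicLong();

    RoundRobinBuckets(int bucketCount) {
        if (bucketCount < 1) {
            throw new IllegalArgumentException("bucketCount must be positive");
        }
        this.bucketCount = bucketCount;
    }

    /** Returns 0, 1, ..., n-1, 0, 1, ... regardless of the row's content. */
    int nextBucket() {
        return (int) Math.floorMod(counter.getAndIncrement(), (long) bucketCount);
    }
}
```

Unlike hash distribution, equal keys may end up in different buckets, which is acceptable precisely because no BY (...) clause was declared.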


Code Block
CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY RANGE(uid)

Notes:

  • Omitting the INTO n BUCKETS leaves the number of buckets up to the connector implementation.
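For the `RANGE(uid)` example without `INTO n BUCKETS`, the connector has to choose the bucket count itself. A minimal sketch, assuming a hypothetical default of 4 buckets and evenly spaced split points over a known key range `[min, max]`; a real connector may derive boundaries from data statistics instead.

```java
import java.util.Optional;

// Hypothetical range bucketing (the FLIP does not prescribe an algorithm):
// the connector picks the bucket count when INTO n BUCKETS is omitted.
final class RangeBuckets {

    // Assumed connector default, used only when the DDL omits INTO n BUCKETS
    static final int DEFAULT_BUCKET_COUNT = 4;

    /** Evenly spaced split points over [min, max]; assumes (max - min) * n fits in a long. */
    static long[] splitPoints(long min, long max, Optional<Integer> declaredCount) {
        int n = declaredCount.orElse(DEFAULT_BUCKET_COUNT);
        if (n < 1 || max < min) {
            throw new IllegalArgumentException("need n >= 1 and min <= max");
        }
        long[] splits = new long[n - 1];
        for (int i = 1; i < n; i++) {
            splits[i - 1] = min + (max - min) * i / n;
        }
        return splits;
    }

    /** Bucket index = number of split points less than or equal to the key. */
    static int bucketOf(long key, long[] splits) {
        int bucket = 0;
        while (bucket < splits.length && splits[bucket] <= key) {
            bucket++;
        }
        return bucket;
    }
}
```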

ALTER TABLE

We propose to add the DISTRIBUTION keyword to the ALTER TABLE DDL:

...

Code Block
import java.util.Collections;
import java.util.List;
import java.util.Optional;

import javax.annotation.Nullable;

public interface CatalogTable extends CatalogBaseTable {

    /** Returns the distribution of the table if the {@code DISTRIBUTED} clause is defined. */
    Optional<TableDistribution> getDistribution();

    /** Distribution specification. */
    class TableDistribution {

        private final Kind kind;
        private final @Nullable Integer bucketCount;
        private final List<String> bucketKeys;

        public TableDistribution(
                Kind kind, @Nullable Integer bucketCount, List<String> bucketKeys) {
            this.kind = kind;
            this.bucketCount = bucketCount;
            this.bucketKeys = bucketKeys;
        }

        /** Connector-dependent distribution with a declared number of buckets. */
        public static TableDistribution ofUnknown(int bucketCount) {
            return new TableDistribution(Kind.UNKNOWN, bucketCount, Collections.emptyList());
        }

        /** Hash distribution over the given keys among the declared number of buckets, if any. */
        public static TableDistribution ofHash(
                List<String> bucketKeys, @Nullable Integer bucketCount) {
            return new TableDistribution(Kind.HASH, bucketCount, bucketKeys);
        }

        /** Range distribution over the given keys among the declared number of buckets, if any. */
        public static TableDistribution ofRange(
                List<String> bucketKeys, @Nullable Integer bucketCount) {
            return new TableDistribution(Kind.RANGE, bucketCount, bucketKeys);
        }

        /** Distribution algorithm; public so that connector abilities can reference it. */
        public enum Kind {
            UNKNOWN,
            HASH,
            RANGE
        }

        public Kind getKind() {
            return kind;
        }

        /** Bucket keys; empty for {@link Kind#UNKNOWN}. */
        public List<String> getBucketKeys() {
            return bucketKeys;
        }

        /** Number of buckets; empty if {@code INTO n BUCKETS} was omitted. */
        public Optional<Integer> getBucketCount() {
            return Optional.ofNullable(bucketCount);
        }
    }
}

...

Code Block
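package org.apache.flink.table.connector.sink.abilities;

import org.apache.flink.table.catalog.CatalogTable.TableDistribution;

import java.util.Set;

public interface SupportsBucketing {

  /** Returns the distribution algorithms supported by the sink. */
  Set<TableDistribution.Kind> listAlgorithms();

  /** Returns whether the sink requires the DDL to declare the number of buckets. */
  boolean requiresBucketCount();

}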

Notes:

  • Currently, the interface mostly serves to produce helpful error messages.
  • All information is provided by ResolvedCatalogTable.
  • The interface is checked in DynamicSinkUtils, similar to SupportsPartitioning.
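The following sketch shows what such a check could look like. It is hypothetical: `BucketingCheck`, its nested `Kind`, and `SupportsBucketing` are simplified stand-ins for the proposed types, and the error messages are illustrative rather than Flink's actual wording.

```java
import java.util.Set;

// Hypothetical planner-side check in the spirit of DynamicSinkUtils.
// Kind and SupportsBucketing are simplified stand-ins for the proposed types.
final class BucketingCheck {

    enum Kind { UNKNOWN, HASH, RANGE }

    interface SupportsBucketing {
        Set<Kind> listAlgorithms();

        boolean requiresBucketCount();
    }

    /**
     * @param sink        the table sink; may or may not implement SupportsBucketing
     * @param kind        declared distribution kind, or null if there is no DISTRIBUTED clause
     * @param bucketCount declared bucket count, or null if INTO n BUCKETS was omitted
     */
    static void validate(Object sink, Kind kind, Integer bucketCount) {
        if (kind == null) {
            return; // no DISTRIBUTED clause: nothing to check
        }
        if (!(sink instanceof SupportsBucketing)) {
            throw new IllegalStateException(
                    "Table declares a DISTRIBUTED clause but the sink does not support bucketing.");
        }
        SupportsBucketing bucketing = (SupportsBucketing) sink;
        if (!bucketing.listAlgorithms().contains(kind)) {
            throw new IllegalStateException(
                    "Sink does not support distribution kind " + kind
                            + ". Supported: " + bucketing.listAlgorithms());
        }
        if (bucketing.requiresBucketCount() && bucketCount == null) {
            throw new IllegalStateException("Sink requires the number of buckets (INTO n BUCKETS).");
        }
    }
}
```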

...