...

Code Block
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  (
    ...
  )
  [COMMENT table_comment]
  [
    {
      DISTRIBUTED BY [ { HASH | RANGE } ] (bucket_column_name1, bucket_column_name2, ...) [INTO n BUCKETS]
    |
      DISTRIBUTED INTO n BUCKETS
    }
  ]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (...)
  [ LIKE ... ]


ALTER TABLE [IF EXISTS] table_name {
    ADD { ... | <distribution_spec> }
  | MODIFY { ... | <distribution_spec> }
  | DROP { ... | <distribution> }
  | RENAME ...
  | RENAME TO ...
  | SET ...
  | RESET ...
}

<distribution_spec>:
  {
    DISTRIBUTION BY [ { HASH | RANGE } ] (bucket_column_name1, bucket_column_name2, ...) [INTO n BUCKETS]
  |
    DISTRIBUTION INTO n BUCKETS
  }

<distribution>: DISTRIBUTION

...

  • By default, DISTRIBUTED BY assumes a list of columns for an implicit hash partitioning.
  • The referenced columns must exist and must be physical columns.
  • We can still introduce DISTRIBUTED BY HASH(uid) or other strategies like DISTRIBUTED BY RANGE(uid) in the future.
  • Specifying the number of buckets is mandatory if the DISTRIBUTED clause is declared.
  • The BY (...) syntax matches PARTITIONED BY (...) for consistency.
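
The rule that bucket columns must exist and be physical can be sketched as a small standalone check. This is only an illustration: the class BucketKeyValidation, the ColumnKind enum, and the error messages are hypothetical and not part of the proposed API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class BucketKeyValidation {

    /** Hypothetical column categories; only PHYSICAL columns may serve as bucket keys. */
    enum ColumnKind { PHYSICAL, COMPUTED, METADATA }

    /** Returns one message per invalid bucket key; an empty list means the clause is valid. */
    static List<String> validate(Map<String, ColumnKind> columns, List<String> bucketKeys) {
        List<String> errors = new ArrayList<>();
        for (String key : bucketKeys) {
            ColumnKind kind = columns.get(key);
            if (kind == null) {
                errors.add("Bucket key '" + key + "' does not exist in the table schema.");
            } else if (kind != ColumnKind.PHYSICAL) {
                errors.add("Bucket key '" + key + "' must be a physical column but is " + kind + ".");
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        Map<String, ColumnKind> columns = new LinkedHashMap<>();
        columns.put("uid", ColumnKind.PHYSICAL);
        columns.put("name", ColumnKind.PHYSICAL);
        columns.put("ts", ColumnKind.METADATA);

        // A physical column passes; a metadata column and a missing column are reported.
        System.out.println(validate(columns, List.of("uid")));
        System.out.println(validate(columns, List.of("ts", "missing")));
    }
}
```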


Code Block
CREATE TABLE MyTable (uid BIGINT, name STRING)

...

  • Omitting the DISTRIBUTED clause leaves the distribution up to the connector implementation.
  • This is the behavior prior to this FLIP.


Code Block
CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS

Notes:

  • For advanced users, the algorithm can be defined explicitly.
  • Currently, either HASH() or RANGE() can be specified.
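
To illustrate what DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS means for a single row, the following sketch maps bucket key values to a bucket index. The actual hash function is up to the connector; Objects.hash and the class name HashBucketingSketch are assumptions made purely for illustration.

```java
import java.util.Objects;

final class HashBucketingSketch {

    /** Maps the bucket key values of one row to a bucket index in [0, bucketCount). */
    static int bucketFor(int bucketCount, Object... keyValues) {
        if (bucketCount <= 0) {
            throw new IllegalArgumentException("bucketCount must be positive");
        }
        // floorMod keeps the index non-negative even when the hash code is negative.
        return Math.floorMod(Objects.hash(keyValues), bucketCount);
    }

    public static void main(String[] args) {
        // Rows with the same uid always land in the same bucket.
        for (long uid = 1; uid <= 3; uid++) {
            System.out.println("uid " + uid + " -> bucket " + bucketFor(6, uid));
        }
    }
}
```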


Code Block
CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED INTO 6 BUCKETS

...

  • Omitting the BY (...) clause leaves the distribution up to the connector implementation.
  • The connector will most likely use a round robin or random distribution.
  • Again, specifying the number of buckets is mandatory here; otherwise, the DISTRIBUTED clause could be omitted entirely to achieve the same behavior.
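
As a rough illustration of the round robin case mentioned above, a connector could cycle through the declared buckets as sketched here. The class RoundRobinBucketing is hypothetical; a real connector is free to pick a different strategy.

```java
final class RoundRobinBucketing {

    private final int bucketCount;
    private int next;

    RoundRobinBucketing(int bucketCount) {
        if (bucketCount <= 0) {
            throw new IllegalArgumentException("bucketCount must be positive");
        }
        this.bucketCount = bucketCount;
    }

    /** Returns the bucket for the next row, cycling through 0 .. bucketCount - 1. */
    int nextBucket() {
        int bucket = next;
        next = (next + 1) % bucketCount;
        return bucket;
    }

    public static void main(String[] args) {
        RoundRobinBucketing assigner = new RoundRobinBucketing(6);
        for (int row = 0; row < 8; row++) {
            System.out.println("row " + row + " -> bucket " + assigner.nextBucket());
        }
    }
}
```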


Code Block
CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY RANGE(uid)

Notes:

  • Omitting the INTO n BUCKETS leaves the number of buckets up to the connector implementation.

ALTER TABLE

We propose to add the DISTRIBUTION keyword to the ALTER TABLE DDL:

...

Code Block
public interface CatalogTable extends CatalogBaseTable {
    
    /** Returns the distribution of the table if the {@code DISTRIBUTED} clause is defined. */
    Optional<TableDistribution> getDistribution();

    /** Distribution specification. */
    class TableDistribution {

        private final Kind kind;
        private final @Nullable Integer bucketCount;
        private final List<String> bucketKeys;

        public TableDistribution(
                Kind kind, @Nullable Integer bucketCount, List<String> bucketKeys) {
            this.kind = kind;
            this.bucketCount = bucketCount;
            this.bucketKeys = bucketKeys;
        }

        /** Connector-dependent distribution with a declared number of buckets. */
        public static TableDistribution ofUnknown(int bucketCount) {
            return new TableDistribution(Kind.UNKNOWN, bucketCount, Collections.emptyList());
        }
        
        /** Hash distribution over the given keys among the declared number of buckets. */
        public static TableDistribution ofHash(List<String> bucketKeys, @Nullable Integer bucketCount) {
            return new TableDistribution(Kind.HASH, bucketCount, bucketKeys);
        } 
        
        /** Range distribution over the given keys among the declared number of buckets. */
        public static TableDistribution ofRange(List<String> bucketKeys, @Nullable Integer bucketCount) {
            return new TableDistribution(Kind.RANGE, bucketCount, bucketKeys);
        } 

        public enum Kind {
            UNKNOWN,
            HASH,
            RANGE
        }

        public Kind getKind() {
            return kind;
        }

        public List<String> getBucketKeys() {
            return bucketKeys;
        }

        public Optional<Integer> getBucketCount() {
            return Optional.ofNullable(bucketCount);
        }
    }
}
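
To show how the three fields of TableDistribution relate to the DDL, the following standalone sketch renders them back into a DISTRIBUTED clause. It does not use the Flink classes: DistributionClauseSketch, its local Kind enum, and toClause are hypothetical helpers, and treating UNKNOWN with keys as a plain BY (...) is an assumption.

```java
import java.util.List;

final class DistributionClauseSketch {

    /** Local stand-in for TableDistribution.Kind. */
    enum Kind { UNKNOWN, HASH, RANGE }

    /** Renders a DISTRIBUTED clause from a kind, an optional bucket count, and bucket keys. */
    static String toClause(Kind kind, Integer bucketCount, List<String> bucketKeys) {
        if (bucketKeys.isEmpty() && bucketCount == null) {
            throw new IllegalArgumentException("Either bucket keys or a bucket count is required");
        }
        StringBuilder sb = new StringBuilder("DISTRIBUTED");
        if (!bucketKeys.isEmpty()) {
            sb.append(" BY ");
            if (kind != Kind.UNKNOWN) {
                sb.append(kind.name()); // explicit algorithm: HASH or RANGE
            }
            sb.append("(").append(String.join(", ", bucketKeys)).append(")");
        }
        if (bucketCount != null) {
            sb.append(" INTO ").append(bucketCount).append(" BUCKETS");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(toClause(Kind.HASH, 6, List.of("uid")));
        System.out.println(toClause(Kind.UNKNOWN, 6, List.of()));
        System.out.println(toClause(Kind.RANGE, null, List.of("uid")));
    }
}
```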

...

Code Block
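package org.apache.flink.table.connector.sink.abilities;

import java.util.Set;

import org.apache.flink.table.catalog.CatalogTable.TableDistribution;

public interface SupportsBucketing {

  /** Returns the distribution algorithms supported by the sink. */
  Set<TableDistribution.Kind> listAlgorithms();

  /** Returns whether the sink requires the number of buckets to be declared. */
  boolean requiresBucketCount();

}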

Notes:

  • Currently, the interface mostly serves for helpful error messages.
  • All information is provided by ResolvedCatalogTable.
  • The interface is checked in DynamicSinkUtils, similar to SupportsPartitioning.
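
The kind of check that could produce such error messages is sketched below. This is not the DynamicSinkUtils implementation: BucketingCheckSketch, the local Kind enum, the nested stand-in interface, and the message texts are hypothetical, and skipping the algorithm check for UNKNOWN is an assumption.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

final class BucketingCheckSketch {

    /** Local stand-in for TableDistribution.Kind. */
    enum Kind { UNKNOWN, HASH, RANGE }

    /** Local stand-in for the proposed SupportsBucketing ability. */
    interface SupportsBucketing {
        Set<Kind> listAlgorithms();
        boolean requiresBucketCount();
    }

    /** Small helper to build a sink stand-in for the example. */
    static SupportsBucketing sink(Set<Kind> algorithms, boolean requiresCount) {
        return new SupportsBucketing() {
            @Override public Set<Kind> listAlgorithms() { return algorithms; }
            @Override public boolean requiresBucketCount() { return requiresCount; }
        };
    }

    /** Returns error messages for a declared distribution; empty means the sink accepts it. */
    static List<String> check(SupportsBucketing sink, Kind kind, Optional<Integer> bucketCount) {
        List<String> errors = new ArrayList<>();
        // Assumption: UNKNOWN leaves the algorithm to the connector, so it is not checked.
        if (kind != Kind.UNKNOWN && !sink.listAlgorithms().contains(kind)) {
            errors.add("Sink does not support algorithm " + kind
                    + "; supported: " + sink.listAlgorithms());
        }
        if (sink.requiresBucketCount() && !bucketCount.isPresent()) {
            errors.add("Sink requires a bucket count; add INTO n BUCKETS.");
        }
        return errors;
    }

    public static void main(String[] args) {
        SupportsBucketing hashOnly = sink(EnumSet.of(Kind.HASH), true);
        System.out.println(check(hashOnly, Kind.HASH, Optional.of(6)));
        System.out.println(check(hashOnly, Kind.RANGE, Optional.empty()));
    }
}
```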

...