Discussion thread: https://lists.apache.org/thread/z1p4f38x9ohv29y4nlq8g1wdxwfjwxd1
Vote thread: https://lists.apache.org/thread/cft2d9jc2lr8gv6dyyz7b62188mf07sj
JIRA

Release: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Many SQL vendors expose the concepts of Partitioning, Bucketing, and Clustering.

Terminology

Scope

FLIP-63: Rework table partition support introduced the concept of Partitioning to Flink.

This FLIP proposes to introduce the concept of Bucketing to Flink.

Clustering is out of scope for this FLIP.

Requirements

Apache Paimon (ex Flink Table Store) currently needs to define the number of buckets and bucket keys in the WITH clause:

CREATE TABLE PaimonTable (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING,
    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh) WITH (
    'bucket' = '2',
    'bucket-key' = 'user_id'
);

Flink's Kafka connector also needs to define the message key and the number of Kafka partitions (which are semantically closer to buckets) using options:

CREATE TABLE KafkaTable (
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'key.format' = 'json',
  'key.fields' = 'user_id;item_id',
  'value.format' = 'json',
  'properties.num.partitions' = '6'
);


→ Flink should offer a native way of declaring bucketing.

→ Whether this is supported or not during runtime should then be a connector characteristic - similar to partitioning.

→ The planner can leverage this property via ability interfaces in the future. However, this is out of scope for this FLIP. This FLIP focuses solely on the syntax and necessary API changes.

Public Interfaces

SQL syntax

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  (
    ...
  )
  [COMMENT table_comment]
  [
    {
      DISTRIBUTED BY [ { HASH | RANGE } ] (bucket_column_name1, bucket_column_name2, ...) [INTO n BUCKETS]
    |
      DISTRIBUTED INTO n BUCKETS
    }
  ]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (...)
  [ LIKE ... ]


ALTER TABLE [IF EXISTS] table_name {
    ADD { ... | <distribution_spec> }
  | MODIFY { ... | <distribution_spec> }
  | DROP { ... | <distribution> }
  | RENAME ...
  | RENAME TO ...
  | SET ...
  | RESET ...
}

<distribution_spec>:
  {
    DISTRIBUTION BY [ { HASH | RANGE } ] (bucket_column_name1, bucket_column_name2, ...) [INTO n BUCKETS]
  |
    DISTRIBUTION INTO n BUCKETS
  }

<distribution>: DISTRIBUTION

API Changes

  • CatalogTable.getDistribution()

  • CatalogTable.Builder and CatalogTable.newBuilder() instead of another overloaded CatalogTable.of() method.

  • org.apache.flink.table.connector.sink.abilities.SupportsBucketing

Proposed Changes

CREATE TABLE

We propose to add the DISTRIBUTED keyword to the CREATE TABLE DDL:

CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY (uid) INTO 6 BUCKETS

Notes:

  • By default, DISTRIBUTED BY takes a list of columns and implies a hash distribution.
  • The referenced columns must exist and must be physical columns.
  • The BY (...) syntax matches PARTITIONED BY (...) for consistency.
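
The column rule in the notes above could be checked roughly as follows. This is a minimal sketch and not Flink's actual validation code: the class BucketKeyValidator, its ColumnKind enum, and the error messages are hypothetical names introduced only for illustration.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the bucket key validation; not part of Flink's API. */
final class BucketKeyValidator {

    /** Simplified column kinds (assumption: physical, computed, and metadata columns). */
    enum ColumnKind { PHYSICAL, COMPUTED, METADATA }

    /** Throws if a bucket key is missing from the schema or is not a physical column. */
    static void validate(Map<String, ColumnKind> schema, List<String> bucketKeys) {
        for (String key : bucketKeys) {
            ColumnKind kind = schema.get(key);
            if (kind == null) {
                throw new IllegalArgumentException(
                        "Bucket key '" + key + "' does not exist in the table schema.");
            }
            if (kind != ColumnKind.PHYSICAL) {
                throw new IllegalArgumentException(
                        "Bucket key '" + key + "' must be a physical column but is " + kind + ".");
            }
        }
    }
}
```

With a schema containing a physical column uid and a metadata column ts, DISTRIBUTED BY (uid) would pass this check, while DISTRIBUTED BY (ts) or a reference to an undeclared column would be rejected.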


CREATE TABLE MyTable (uid BIGINT, name STRING)

Notes:

  • Omitting the DISTRIBUTED clause leaves the distribution up to the connector implementation.
  • This is the behavior prior to this FLIP.


CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS

Notes:

  • For advanced users, the algorithm can be defined explicitly.
  • Currently, either HASH() or RANGE() is supported.


CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED INTO 6 BUCKETS

Notes:

  • Omitting the BY (...) clause leaves the distribution up to the connector implementation.
  • The connector will most likely use a round robin or random distribution.
  • In this form, specifying the number of buckets is mandatory; without it, the DISTRIBUTED clause could be omitted entirely to achieve the same behavior.


CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY RANGE(uid)

Notes:

  • Omitting the INTO n BUCKETS leaves the number of buckets up to the connector implementation.

ALTER TABLE

We propose to add the DISTRIBUTION keyword to the ALTER TABLE DDL:

ALTER TABLE MyTable ADD DISTRIBUTION BY (uid) INTO 6 BUCKETS;
ALTER TABLE MyTable ADD DISTRIBUTION INTO 6 BUCKETS;
ALTER TABLE MyTable MODIFY DISTRIBUTION BY (uid) INTO 6 BUCKETS;
ALTER TABLE MyTable MODIFY DISTRIBUTION INTO 6 BUCKETS;

Notes:

  • The syntax is similar to CREATE TABLE but uses a noun instead of a verb, analogous to PARTITIONED BY and PARTITION.
  • Consistency with existing ALTER TABLE components is ensured.


ALTER TABLE MyTable DROP DISTRIBUTION;

Notes:

  • Similar to dropping PRIMARY KEY or WATERMARK, no spec is given.

CatalogTable

In order to store the new metadata, we add new methods to CatalogTable.


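import javax.annotation.Nullable;

import java.util.Collections;
import java.util.List;
import java.util.Optional;

public interface CatalogTable extends CatalogBaseTable {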
    
    /** Returns the distribution of the table if the {@code DISTRIBUTED} clause is defined. */
    Optional<TableDistribution> getDistribution();

    /** Distribution specification. */
    class TableDistribution {

        private final Kind kind;
        private final @Nullable Integer bucketCount;
        private final List<String> bucketKeys;

        public TableDistribution(
                Kind kind, @Nullable Integer bucketCount, List<String> bucketKeys) {
            this.kind = kind;
            this.bucketCount = bucketCount;
            this.bucketKeys = bucketKeys;
        }

        /** Connector-dependent distribution with a declared number of buckets. */
        public static TableDistribution ofUnknown(int bucketCount) {
            return new TableDistribution(Kind.UNKNOWN, bucketCount, Collections.emptyList());
        }
        
        /** Hash distribution over the given keys among the declared number of buckets. */
        public static TableDistribution ofHash(List<String> bucketKeys, @Nullable Integer bucketCount) {
            return new TableDistribution(Kind.HASH, bucketCount, bucketKeys);
        } 
        
        /** Range distribution over the given keys among the declared number of buckets. */
        public static TableDistribution ofRange(List<String> bucketKeys, @Nullable Integer bucketCount) {
            return new TableDistribution(Kind.RANGE, bucketCount, bucketKeys);
        } 

        public enum Kind {
            UNKNOWN,
            HASH,
            RANGE
        }

        public Kind getKind() {
            return kind;
        }

        public List<String> getBucketKeys() {
            return bucketKeys;
        }

        public Optional<Integer> getBucketCount() {
            return Optional.ofNullable(bucketCount);
        }
    }
}

Notes:

  • Transitively, this also applies to ResolvedCatalogTable.
  • The new attribute also requires an overloaded CatalogTable.of() method, this is a good chance to introduce CatalogTable.Builder and CatalogTable.newBuilder(). The builder design can be similar to Schema.
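
To illustrate how the explicit DDL forms map onto the factory methods, here is a small standalone sketch. DistributionSketch is a simplified local mirror of the proposed TableDistribution so that the example compiles without Flink on the classpath, and toClause() is a hypothetical helper that is not part of the proposal.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/** Simplified local mirror of the proposed TableDistribution; for illustration only. */
final class DistributionSketch {

    enum Kind { UNKNOWN, HASH, RANGE }

    private final Kind kind;
    private final Integer bucketCount; // null when INTO n BUCKETS is omitted
    private final List<String> bucketKeys;

    private DistributionSketch(Kind kind, Integer bucketCount, List<String> bucketKeys) {
        this.kind = kind;
        this.bucketCount = bucketCount;
        this.bucketKeys = bucketKeys;
    }

    static DistributionSketch ofUnknown(int bucketCount) {
        return new DistributionSketch(Kind.UNKNOWN, bucketCount, Collections.emptyList());
    }

    static DistributionSketch ofHash(List<String> bucketKeys, Integer bucketCount) {
        return new DistributionSketch(Kind.HASH, bucketCount, bucketKeys);
    }

    static DistributionSketch ofRange(List<String> bucketKeys, Integer bucketCount) {
        return new DistributionSketch(Kind.RANGE, bucketCount, bucketKeys);
    }

    Optional<Integer> getBucketCount() {
        return Optional.ofNullable(bucketCount);
    }

    /** Hypothetical helper (not in the proposal): renders the equivalent DISTRIBUTED clause. */
    String toClause() {
        StringBuilder sb = new StringBuilder("DISTRIBUTED");
        if (kind != Kind.UNKNOWN) {
            sb.append(" BY ").append(kind).append('(')
                    .append(String.join(", ", bucketKeys)).append(')');
        }
        getBucketCount().ifPresent(n -> sb.append(" INTO ").append(n).append(" BUCKETS"));
        return sb.toString();
    }

    public static void main(String[] args) {
        // prints "DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS"
        System.out.println(ofHash(Arrays.asList("uid"), 6).toClause());
        // prints "DISTRIBUTED INTO 6 BUCKETS"
        System.out.println(ofUnknown(6).toClause());
        // prints "DISTRIBUTED BY RANGE(uid)"
        System.out.println(ofRange(Arrays.asList("uid"), null).toClause());
    }
}
```

The nullable bucket count is what lets a single class represent both DISTRIBUTED BY RANGE(uid) and DISTRIBUTED BY RANGE(uid) INTO 6 BUCKETS.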

SupportsBucketing

For checking whether bucketing is supported, we introduce a new ability interface:

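package org.apache.flink.table.connector.sink.abilities;

import org.apache.flink.table.catalog.CatalogTable.TableDistribution;

import java.util.Set;

public interface SupportsBucketing {

  /** Returns the distribution algorithms that the sink supports. */
  Set<TableDistribution.Kind> listAlgorithms();

  /** Returns whether the sink requires the number of buckets to be declared. */
  boolean requiresBucketCount();

}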

Notes:

  • Currently, the interface mainly serves to produce helpful error messages.
  • All information is provided by ResolvedCatalogTable.
  • The interface is checked in DynamicSinkUtils, similar to SupportsPartitioning.
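
The kind of check that could produce those error messages might look like the sketch below. It is not the actual DynamicSinkUtils code: BucketingCheckSketch, its nested mirror of SupportsBucketing, and the message texts are hypothetical, and it assumes that an UNKNOWN distribution (DISTRIBUTED INTO n BUCKETS) is always acceptable because the connector picks the algorithm.

```java
import java.util.Optional;
import java.util.Set;

/** Hypothetical validation sketch; mirrors the proposed interface locally to stay standalone. */
final class BucketingCheckSketch {

    enum Kind { UNKNOWN, HASH, RANGE }

    /** Local mirror of the proposed SupportsBucketing ability. */
    interface SupportsBucketing {
        Set<Kind> listAlgorithms();

        boolean requiresBucketCount();
    }

    /** Returns an error message, or an empty Optional if the sink accepts the distribution. */
    static Optional<String> validate(Object sink, Kind kind, Integer bucketCount) {
        if (!(sink instanceof SupportsBucketing)) {
            return Optional.of(
                    "The table declares a DISTRIBUTED clause but the sink does not "
                            + "implement SupportsBucketing.");
        }
        SupportsBucketing bucketing = (SupportsBucketing) sink;
        // Assumption: UNKNOWN leaves the algorithm to the connector, so it is not checked.
        if (kind != Kind.UNKNOWN && !bucketing.listAlgorithms().contains(kind)) {
            return Optional.of(
                    "The sink does not support " + kind + " distribution. Supported: "
                            + bucketing.listAlgorithms());
        }
        if (bucketing.requiresBucketCount() && bucketCount == null) {
            return Optional.of("The sink requires the number of buckets (INTO n BUCKETS).");
        }
        return Optional.empty();
    }
}
```

A sink that lists only HASH and requires a bucket count would accept DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS, and would reject both DISTRIBUTED BY RANGE(uid) INTO 6 BUCKETS and DISTRIBUTED BY HASH(uid).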

For CompiledPlan, we add:

@JsonTypeName("Bucketing")
public final class BucketingSpec implements SinkAbilitySpec {
  // no properties; only check whether the interface is implemented again during deserialization
}


Kafka Connector

The DISTRIBUTED clause can replace two options in the Kafka connector:

  • key.fields → not required anymore
  • sink.partitioner → both 'default' and 'round-robin' are covered

For backward compatibility, we suggest keeping the options but making them optional and giving the DISTRIBUTED declaration precedence in KafkaTableFactory.

It is not allowed to specify both. Either a user goes with the option or with the clause. We won't implement merging logic.

The KafkaDynamicSink should implement SupportsBucketing.

The bucket number is translated to the Kafka property: properties.num.partitions.
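
The rules above could be sketched as follows. This is an assumption-laden illustration rather than the real KafkaTableFactory logic: KafkaDistributionSketch and its method names are hypothetical, and only the option keys key.fields and properties.num.partitions and the ';' separator are taken from this document.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of resolving the DISTRIBUTED clause against Kafka options. */
final class KafkaDistributionSketch {

    static final String KEY_FIELDS = "key.fields";
    static final String NUM_PARTITIONS = "properties.num.partitions";

    /** Returns the key columns from either the clause or the option; rejects both at once. */
    static List<String> resolveKeyFields(List<String> bucketKeys, Map<String, String> options) {
        String option = options.get(KEY_FIELDS);
        boolean hasClause = !bucketKeys.isEmpty();
        if (hasClause && option != null) {
            throw new IllegalArgumentException(
                    "Specify either DISTRIBUTED BY or '" + KEY_FIELDS + "', not both.");
        }
        if (hasClause) {
            return bucketKeys;
        }
        if (option == null) {
            return Collections.emptyList();
        }
        // 'key.fields' separates column names with ';'
        return Arrays.asList(option.split(";"));
    }

    /** Copies the options and adds the declared bucket count as the partition count, if any. */
    static Map<String, String> withBucketCount(Map<String, String> options, Integer bucketCount) {
        Map<String, String> result = new HashMap<>(options);
        if (bucketCount != null) {
            result.put(NUM_PARTITIONS, String.valueOf(bucketCount));
        }
        return result;
    }
}
```

Failing fast when both the clause and the option are present keeps the factory free of merging logic, which matches the decision stated above.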


Compatibility, Deprecation, and Migration Plan

No migration required.

Test Plan

We add tests to all layers: CompiledPlan tests, parser tests, and catalog tests.

Rejected Alternatives

None.