...

  • key.fields, key.format, key.(<format-identifier>).(<properties-required-by-format>) - these control which fields end up in Kafka’s key and which serialization format is used for it
  • value.fields-include - this controls which fields end up in the value. Possible values:
    • ALL (all fields of the schema, even if they are also part of e.g. the key)
    • EXCEPT_KEY (all fields of the schema minus the fields of the key)
  • value.format, value.(<format-identifier>).(<properties-required-by-format>) - these control the serialization format of the value. The “value” prefix is optional
  • (optional support) fields.verify-integrity - controls whether an equality check is performed when the same field is contained in more than one part of the consumer record (e.g. in both the key and the value)

Note: key.format, value.format, key.fields etc. apply only to the Kafka connector. They are Kafka connector specific properties.
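
To make the effect of value.fields-include concrete, here is a minimal sketch of how the properties listed above might be combined. The table name, the column names, and the choice of 'json' as the format are hypothetical and only serve as illustration. The property names are the ones proposed above, and other required connector options are left out.

```sql
-- Sketch only: kafka_orders, its columns and the 'json' formats are assumptions.
CREATE TABLE kafka_orders (
  id BIGINT,
  name STRING,
  amount DOUBLE
) WITH (
  'connector.type' = 'kafka',
  -- other required connector options are omitted here
  'key.fields' = 'id',                    -- id is written to the Kafka key
  'key.format' = 'json',
  'value.fields-include' = 'EXCEPT_KEY',  -- value carries name and amount only
  'value.format' = 'json'
)
```

With 'value.fields-include' = 'ALL' instead, id would be written to both the key and the value, which is the case that fields.verify-integrity is meant to check.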

A full example looks like:

...

Use the PARTITIONED BY clause with different partitioning strategies (similar to Oracle):

Partition by hash:

CREATE TABLE kafka_table (
  id STRING,
  name STRING,
  date DATE,
  key BINARY
  ...
) WITH (
  'connector.type' = 'kafka',
  'key.fields' = 'key' -- the key will not be used for partitioning
)
PARTITIONED BY HASH(id, name)


If necessary, we can introduce more partitioning strategies, e.g. explicit partitioning:

-- partitioning based on an existing column in data
CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  date DATE,
  key BINARY
  ...
) WITH (
  'connector.type' = 'kafka',
  'key.fields' = 'key' -- the key will not be used for partitioning
)
PARTITIONED BY EXPLICIT(id)

-- or on a computed column
CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  targetPartition AS CAST(name AS BIGINT), -- computed column, type is derived from the expression
  date DATE,
  key BINARY
  ...
) WITH (
  'connector.type' = 'kafka',
  'key.fields' = 'key' -- the key will not be used for partitioning
)
PARTITIONED BY EXPLICIT(targetPartition)

-- or with a user-defined function in the PARTITIONED BY clause
CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  date DATE,
  key BINARY
  ...
) WITH (
  'connector.type' = 'kafka',
  'key.fields' = 'key' -- the key will not be used for partitioning
)
PARTITIONED BY EXPLICIT(USER_SCALAR_FUNCTION(name, date, SYSTEM_METADATA('numberOfPartitions')))

Relation to FLIP-63

FLIP-63 introduced initial support for the PARTITIONED BY clause, to an extent that lets us support Hive's partitioning. In the majority of situations it is in line with the proposal above.

...