Status

Current state: "Under discussion"

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation:


Besides the main payload, the majority of sources (if not all of them) expose additional information. It can be simply read-only metadata, such as the offset or the ingestion time, or readable and writable parts of the record that contain data but additionally serve other purposes (partitioning, compaction, etc.).

...

For the reasons above, Kafka will be used for the majority of the examples.

Examples:


  1. Kafka: ETL: read, transform, and write back with key and value. All fields of the key are present in the value as well.

...

The two records end up in different partitions even though the resulting Kafka keys will be equal for both records.

Details:

Reading and writing from key, value, timestamp

I suggest defining where the particular columns come from in the source's options section.

...

CREATE TABLE kafka_table (
    id STRING,
    `timestamp` TIMESTAMP,
    col1 ...,
    col2 ...
    ....
) WITH (
  'connector.type' = 'kafka',
  ...
  'key.fields' = 'id, col1',
  'key.format.type' = 'csv',

  -- optional: controls which fields of the schema end up in the value (ALL by default)
  -- (options: ALL, EXCEPT_KEY, EXCEPT_KEY_TIMESTAMP)
  'value.fields-include' = 'ALL',
  'value.format.type' = 'avro',

  -- optional: (false by default) if true and a field originates from more than a single location
  -- (e.g. both key and value), a check will be performed that both values are equal
  'fields.verify-integrity' = 'false',

  'timestamp.field' = 'timestamp'
)
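
For illustration, with the options above an ETL job could write back to the table with a regular INSERT: 'id' and 'col1' are serialized into the Kafka key (csv), all schema columns into the Avro value ('value.fields-include' = 'ALL'), and 'timestamp' into the record timestamp. This is a minimal sketch; the source table name is hypothetical:

INSERT INTO kafka_table
SELECT id, `timestamp`, col1, col2, ...
FROM some_source_table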


Rejected alternatives:

Marking columns in the schema section:

...

  • cumbersome or impossible to handle columns that are stored in multiple places (both in key and value)
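
For reference, a minimal sketch of what such schema-level marking might have looked like (the per-column KEY marker syntax below is purely illustrative): because each column carries a single marker, a column that should appear in both the key and the value cannot be expressed naturally.

CREATE TABLE kafka_table (
  -- illustrative only: a per-column marker says where the column is stored
  id STRING KEY,
  col1 ... KEY,
  col2 ...
) WITH (
  'connector.type' = 'kafka',
  ...
)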


Accessing read only data

Sources often provide metadata that can be accessed in a read-only manner. Such metadata includes:

...

We could access these kinds of properties using computed columns:

Option 1:

Generic SYSTEM_METADATA('property-key')

CREATE TABLE kafka_table (
  id STRING,
  `offset` BIGINT AS SYSTEM_METADATA('offset'),
  partitionId BIGINT AS SYSTEM_METADATA('partition')
  ...
) WITH (
  'connector.type' = 'kafka'
)
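
Such computed metadata columns would then be queryable like any regular column, e.g.:

SELECT id, `offset`, partitionId
FROM kafka_table
WHERE partitionId = 0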

How to derive type of a column

Both options have the issue of where to get the type of a computed column from. In the second option, partitionId might be expressed with different types in different sources (e.g. as long or String). The returned type depends on the type of the source.

In option 1 the problem is even more visible, as the type will also differ depending on the requested property. This could be alleviated by having a return type strategy based on the value of the constant argument, but it would require that the function lists all properties of all sources. Moreover, the problem described for option 2 (the type depending on the source) remains.

Solution:

Enable declaring the type of a computed column. In that case the return type strategy could use the declared type as the return type. This was part of the initial proposal of FLIP-70, but was removed from it. Reintroducing an explicitly declared result type will also benefit regular computed columns: currently it is not possible to use functions that infer their result type based on the expected type.
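
A minimal sketch of how the declared type would drive the return type of SYSTEM_METADATA for the same property across sources (the Kinesis table and its 'connector.type' value are hypothetical here):

CREATE TABLE kafka_table (
  -- the declared BIGINT becomes the return type of SYSTEM_METADATA
  partitionId BIGINT AS SYSTEM_METADATA('partition')
) WITH (
  'connector.type' = 'kafka'
)

CREATE TABLE kinesis_table (
  -- the same property declared as STRING for a source with string shard ids
  shardId STRING AS SYSTEM_METADATA('partition')
) WITH (
  'connector.type' = 'kinesis'
)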

Option 2:

Specific per property functions

...
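
For instance, such per-property functions might look as follows (the function names are purely illustrative and not part of the proposal text):

CREATE TABLE kafka_table (
  id STRING,
  `offset` BIGINT AS SOURCE_OFFSET(),
  partitionId BIGINT AS SOURCE_PARTITION()
) WITH (
  'connector.type' = 'kafka'
)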

I think option 1 is more appealing as it is more future-proof: it makes it easier to add new properties in the future.

Option 3:

Similar to option 1, but instead of using the computed columns we could introduce a special syntax. This would prohibit using 

Writing to a specific partition/partitioning of source

A common requirement is to create a custom partitioning of the data. We should have a way to specify or compute the target partition/shard for Kinesis/Pravega/Pulsar; in those cases it would be the only way to control partitioning.
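
One conceivable shape, shown only as an illustration (the option name below is hypothetical and not part of this proposal), would be to point the sink at the fields that determine the target partition/shard:

CREATE TABLE kinesis_table (
  id STRING,
  name STRING
) WITH (
  'connector.type' = 'kinesis',
  ...
  -- hypothetical option: which field(s) determine the target shard on write
  'sink.partition.fields' = 'id'
)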

...