Status
Current state: "Under discussion"
Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-107-Reading-table-columns-from-different-parts-of-source-records-td38277.html
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation:
Besides the main payload majority (if not all of the sources) expose additional information. It can be simply a read-only metadata such as offset, ingestion time or a read and write only parts of the record that contain data but additionally serve different purposes (partitioning, compaction etc.)
We should make it possible to read and write data from all of those locations.
Kafka is the source with the most intricacies as it allows storing data in multiple different places of the records. Each of those places is/can be serialized differently. Moreover some of them might serve different purposes (
- all of them can be just a data container,
- key for partitioning (hash on the key),
- key for compacting (if topic is compacted records with same key within a partition are merged),
- for log retention (timestamp)
Because of the reasons above Kafka will be used for a majority of the examples.
Examples:
- Kafka: ETL: read, transform and write back with key, value. All fields of the key are present in the value as well.
CREATE TABLE kafka_table (
|
An insert statement:
INSERT INTO kafka_table VALUES( |
will result in a ProducerRecord as follows:
new ProducerRecord<>( |
A slightly different use case is to manually control where to store the columns data:
CREATE TABLE kafka_table ( 'key.format.type' = 'csv', 'value.format.type' = 'avro', |
An insert statement:
INSERT INTO kafka_table VALUES( |
will result in a ProducerRecord as follows:
new ProducerRecord<>( |
- Kafka: Generate Kafka’s timestamp (it does not have to be a time attribute)
CREATE TABLE csv_table ( 'timestamp' = 'timestamp' |
- Access read-only metadata e.g. partition
- Kafka: represents partitions ids as longs
CREATE TABLE kafka_table ( |
- Kinesis: kinesis represents partitions keys as strings
CREATE TABLE kinesis_table ( |
- Kafka: Push down offset filter
CREATE TABLE kafka_table ( |
- Kinesis: Specify custom partitioning
CREATE TABLE kinesis_table ( |
Both records will end up in the same partition.
- Kafka: Manually specify target partition, overwrite partitioning by key.
CREATE TABLE kinses_table ( |
The two records end up in different partitions even though the resulting Kafka’s keys will be equal for both records.
Details:
Reading and writing from key, value, timestamp
I suggest defining the places where the particular columns come from in the source options sections.
I suggest introducing connector specific properties that allow specifying which fields should end up in which parts of the record. The reason why I am not suggesting having a key-value-metadata format is that there are most often differences across records of different systems, e.g.
- kinesis, pravega do not have a key where users can store data
- hbase does not have a concept of timestamp
As described previously Kafka has the most complex record structure. It is also the most important source for streaming use cases therefore I will discuss how to support its record below.
I suggest introducing additional properties for controlling different parts of ConsumerRecord:
- key.fields, key.format.type, key.format.(<properties-required-by-format>) - this controls which fields should end up in Kafka’s key and what should be the serialization format
- value.fields-include,
- value.fields-include - controls which fields should end up in the value as well, possible values
- ALL (all fields of the schema, even if they are part of e.g. the key)
- EXCEPT_KEY (all fields of the schema - fields of the key)
- EXCEPT_KEY_TIMESTAMP (all fields of the schema - fields of the key - field of timestamp)
- value.format.type, value.format.(<properties-required-by-format>) - I suggest having the “value” prefix optional, this way we could be backwards compatible with previous declarations
If a user has an old DDL statement that deserialized fields only from the value, but wants to also read data from the key.
CREATE TABLE kafka_table (...) WITH (
|
In that case a user needs to add only the description of the key:
CREATE TABLE kafka_table (...) WITH ( |
- timestamp.field - property which tells which field to store as Kafka’s timestamp
- (optional support) fields.verify-integrity - controls if we should perform an equality check if a field is contained in different parts of consumer record
Summary:
CREATE TABLE kafka_table ( --optional: (false by default) if true and a field originates from more than a single location (e.g. both in key and value), a check will be performed both values are equal |
Rejected alternatives:
Marking columns in the schema section:
CREATE TABLE kafka_table ( |
There is a couple of issues with this approach:
- mixes logical schema definition with physical representation of format and/or source (kafka the only source that has a meaningful key, all other sources like pulsar/kinesis/pravega use key only as a partitioning hash and support string based keys only). Moreover it makes it harder to reason about when implementing CREATE TABLE … AS SELECT …
For CREATE TABLE … AS SELECT … there is no place to put the modifiers, as the schema is derived from the query.
- cumbersome or impossible to handle columns that are stored in multiple places (both in key and value)
Accessing read only data
Sources often provide metadata information that can be accessed in a read only manner. Such metadata include:
- partitionId
- offset
- shardId
- ingestion time
Those kind of properties we could access using computed columns:
Option 1:
Generic SYSTEM_METADATA(“property-key”)
CREATE TABLE kafka_table ( |
How to derive type of a column
Both options have the issue where to get the type of a computed column from. In the second option partitionId between different sources might be expressed with different types (e.g. with long or String). The returned type depends on the type of source.
In option 1 the problem is even more visible, as the type will differ depending on the requested property. This could be alleviated by having a return type strategy based on a value of constant, but it would require that the function lists all properties for all sources. Moreover the problem described for option 1 remains.
Solution:
Enable declaring type for computed column. In that case the return type strategy could use that type as return type. It was part of the initial proposal of FLIP-70, but was removed from it. Reintroducing declaring result type explicitly will also benefit regular computed columns. Currently it is not possible to use functions that infer result type based on expected type.
Option 2:
Specific per property functions
CREATE TABLE kafka_table ( |
I think option 1 is more appealing as it is more future proof. It makes it easier to add new properties in the future.
Option 3:
Similar to option 1, but instead of using the computed columns we could introduce a special syntax. This would prohibit using
Writing to a specific partition/partitioning of source
A common requirement is to create a custom partitioning of the data. We should have a way to specify/compute target partition/shard for Kinesis/Pravega/Pulsar. In those cases it would be the only way to control partitioning.
In the case of Kafka we could use the key for data partitioning as described in the first section. However, it does not have to always correspond to the partition that a record originates from/is written to. The default partitioner uses the hash value of the key and the total number of partitions on a topic to determine the partition number. If you increase a partition number, then the default partitioner will return different numbers even if you provide the same key. Users might want to implement a custom Kafka’s partitioner to be able to rescale the topic.
Use PARTITIONED BY clause with different partitioning strategies (similar to ORACLE):
Partition by hash:
CREATE TABLE kafka_table ( |
If necessary we can introduce more partitioning strategies E.g. explicit partitioning:
-- partitioning based on an existing column in data |
Relation to FLIP-63
Flip-63 introduced initial support for PARTITIONED BY clause to an extent that let us support Hive's partitioning. In majority of situations it is in line with the proposal above.
The only significant difference is that this proposal suggests adding partitioning strategies e.g. PARTITIONED BY HASH/EXPLICIT. Currently implemented strategy works as an EXPLICIT strategy that requires a list of any type as a partition identifier. We could make the EXPLICIT strategy default, which could let us support current PARTITION BY syntax.
The current implementation requires the column in PARTITIONED BY clause to be also defined in the schema part of the table. It is the same as in this proposal.
FLIP-63 discusses syntax as INSERT INTO ... PARTITION(...) → It does not interfere with this proposal. It just implies additional check that the row matches the partition requirements(columns in rows match the partition specification)
FLIP-63 discusses more in-depth filesystem specific partition pruning. This proposal leaves out that part to specific source implementations. Therefore it does not contradict with FLIP-63.