...
```java
/** Confluent registry Avro row deserialization schema. */
public class ConfluentRegistryAvroRowDeserializationSchema extends AvroRowDeserializationSchema {
}
```
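The proposal shows only the class declaration, not its body. A deserializer for records written by Confluent's Avro serializer has to handle their framing: a magic byte `0`, a 4-byte big-endian schema ID, and then the Avro binary payload. The minimal sketch below assumes that standard framing and covers only the header handling. The class name `ConfluentWireFormat` and its methods are hypothetical and belong to neither Flink nor the Confluent client. In a full implementation, the schema ID would be used to fetch the writer schema from the registry before the payload is decoded with Avro.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/**
 * Hypothetical helper (not part of the proposal): splits a Confluent-framed
 * Kafka message into its schema ID and the raw Avro payload.
 * Assumed framing: 1 magic byte (0), a 4-byte big-endian schema ID, then Avro binary data.
 */
final class ConfluentWireFormat {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5;

    /** Schema ID plus the Avro-encoded bytes that follow the header. */
    static final class Framed {
        final int schemaId;
        final byte[] avroPayload;

        Framed(int schemaId, byte[] avroPayload) {
            this.schemaId = schemaId;
            this.avroPayload = avroPayload;
        }
    }

    private ConfluentWireFormat() {}

    /** Reads the header and returns the schema ID with a copy of the payload. */
    static Framed parse(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Message too short for Confluent framing");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        byte magic = buffer.get();
        if (magic != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        int schemaId = buffer.getInt();
        byte[] payload = Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
        return new Framed(schemaId, payload);
    }

    /** Prepends the header to an Avro payload (the inverse of parse). */
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(HEADER_LENGTH + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)
                .put(avroPayload)
                .array();
    }
}
```

For example, framing a payload with schema ID 258 produces the header bytes `0, 0, 0, 1, 2`, and `parse` recovers the ID and the untouched payload.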
The Table Schema and Watermark Definition
A table read from the ConfluentSchemaRegistryCatalog contains only the fields of the Kafka record's value part, which is in Avro format. For example, the Avro schema string
```json
{
  "namespace": "io.confluent.examples.clients.basicavro",
  "type": "record",
  "name": "Payment",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "amount", "type": "double"}
  ]
}
```
would yield the table schema <id: STRING, amount: DOUBLE>. This schema contains no fields from the record key and no watermark strategy definition.
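The mapping from the Avro value schema to table columns can be illustrated with a simplified sketch. It handles only Avro primitive types, using the primitive mapping from Flink's Avro format documentation. The class name `AvroValueSchemaMapper` and the `{name, type}` input shape are hypothetical. A real catalog would presumably rely on flink-avro's schema conversion, which also covers nullable unions, logical types, and nested records.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

/**
 * Hypothetical sketch: derives a table schema string from the fields of an
 * Avro record (the Kafka record value). Only value fields are mapped;
 * no key columns and no watermark are produced.
 */
final class AvroValueSchemaMapper {
    // Avro primitive type -> Flink SQL type name
    private static final Map<String, String> PRIMITIVES = new LinkedHashMap<>();

    static {
        PRIMITIVES.put("string", "STRING");
        PRIMITIVES.put("boolean", "BOOLEAN");
        PRIMITIVES.put("int", "INT");
        PRIMITIVES.put("long", "BIGINT");
        PRIMITIVES.put("float", "FLOAT");
        PRIMITIVES.put("double", "DOUBLE");
        PRIMITIVES.put("bytes", "BYTES");
    }

    private AvroValueSchemaMapper() {}

    /** Each entry is {fieldName, avroPrimitiveType}, in declaration order. */
    static String toTableSchema(List<String[]> avroFields) {
        StringJoiner columns = new StringJoiner(", ", "<", ">");
        for (String[] field : avroFields) {
            String sqlType = PRIMITIVES.get(field[1]);
            if (sqlType == null) {
                throw new IllegalArgumentException("Unsupported Avro type in this sketch: " + field[1]);
            }
            columns.add(field[0] + ": " + sqlType);
        }
        return columns.toString();
    }
}
```

Applied to the `Payment` fields `{id, string}` and `{amount, double}`, the sketch returns `<id: STRING, amount: DOUBLE>`, matching the schema described above.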
The Watermark Definition
Based on FLIP-110, users can use the LIKE clause to append a watermark definition to a table read from the catalog, for example:
```sql
CREATE [TEMPORARY] TABLE derived_table (
  WATERMARK FOR tstmp AS tstmp - INTERVAL '5' SECOND
)
LIKE base_table; -- base_table comes from the ConfluentSchemaRegistryCatalog
```
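The watermark expression is evaluated for each row. A simplified model of the resulting behaviour is sketched below, assuming epoch-millisecond timestamps and a watermark equal to the largest value of the expression seen so far, so it never moves backwards. The class `BoundedDelayWatermark` is hypothetical and ignores details such as periodic emission; it is not Flink's watermark operator.

```java
/**
 * Hypothetical model of a watermark strategy of the form
 * "WATERMARK FOR ts AS ts - <delay>". Timestamps are epoch milliseconds.
 */
final class BoundedDelayWatermark {
    private final long delayMillis;
    private long currentWatermark = Long.MIN_VALUE;

    BoundedDelayWatermark(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    /** Evaluates "ts - delay" for one row and returns the (non-decreasing) watermark. */
    long onRow(long rowTimestampMillis) {
        long candidate = rowTimestampMillis - delayMillis;
        currentWatermark = Math.max(currentWatermark, candidate);
        return currentWatermark;
    }

    /** A row is late if its timestamp is not greater than the current watermark. */
    boolean isLate(long rowTimestampMillis) {
        return rowTimestampMillis <= currentWatermark;
    }
}
```

With a 5-second delay, rows at 10 s, 8 s, and 12 s yield watermarks of 5 s, 5 s, and 7 s. The out-of-order row at 8 s is tolerated, while a later row stamped 7 s or earlier would count as late.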
The Key Fields as Part of the Schema
Based on FLIP-110 and FLIP-107, users can use the LIKE clause to add key columns to a table read from the catalog, for example:
```sql
CREATE [TEMPORARY] TABLE derived_table (
  id BIGINT,
  name STRING
) WITH (
  'key.fields' = 'id, name',
  'key.format.type' = 'csv'
)
LIKE base_table; -- base_table comes from the ConfluentSchemaRegistryCatalog
```
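The `'key.fields'` option names the declared columns that are read from the record key rather than the Avro value. The sketch below shows one way such an option value might be parsed and validated: names are trimmed, and each must be a declared column listed only once. The class `KeyFieldsOption` and these rules are hypothetical; the option semantics finally adopted by FLIP-107 may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical helper: parses a comma-separated 'key.fields' option value
 * and checks that every key field is a declared column, listed once.
 */
final class KeyFieldsOption {
    private KeyFieldsOption() {}

    static List<String> parse(String optionValue, Set<String> declaredColumns) {
        List<String> keyFields = new ArrayList<>();
        // limit -1 keeps trailing empty entries, so "id," is rejected
        for (String raw : optionValue.split(",", -1)) {
            String name = raw.trim();
            if (name.isEmpty()) {
                throw new IllegalArgumentException("Empty key field in: " + optionValue);
            }
            if (!declaredColumns.contains(name)) {
                throw new IllegalArgumentException("Key field is not a declared column: " + name);
            }
            if (keyFields.contains(name)) {
                throw new IllegalArgumentException("Duplicate key field: " + name);
            }
            keyFields.add(name);
        }
        return keyFields;
    }
}
```

For the example above, parsing `'id, name'` against the declared columns returns `[id, name]`, while a name that is not a declared column raises an error.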
Compatibility, Deprecation, and Migration Plan
This is a new feature, so there are no compatibility concerns.
Implementation Plan
Add a new Catalog named ConfluentSchemaRegistryCatalog
Add a format factory ConfluentRegistryAvroRowFormatFactory
Add two formats: ConfluentRegistryAvroRowDeserializationSchema and ConfluentRegistryAvroRowSerializationSchema
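One plausible way for the catalog to expose tables is to map each registry subject that holds a value schema to a table named after its Kafka topic. The sketch below assumes Confluent's default TopicNameStrategy, under which value schemas are registered as `<topic>-value` and key schemas as `<topic>-key`. The class `SubjectTableNames` is hypothetical, and the actual catalog may resolve names differently, for example to support other subject name strategies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/**
 * Hypothetical sketch: derives catalog table names from Schema Registry subjects,
 * assuming the default TopicNameStrategy ("<topic>-value" / "<topic>-key").
 */
final class SubjectTableNames {
    static final String VALUE_SUFFIX = "-value";

    private SubjectTableNames() {}

    /** Returns the sorted, de-duplicated topic names that have a value schema registered. */
    static List<String> tableNames(List<String> subjects) {
        TreeSet<String> topics = new TreeSet<>();
        for (String subject : subjects) {
            // Key subjects and subjects not following the strategy are skipped
            if (subject.endsWith(VALUE_SUFFIX) && subject.length() > VALUE_SUFFIX.length()) {
                topics.add(subject.substring(0, subject.length() - VALUE_SUFFIX.length()));
            }
        }
        return new ArrayList<>(topics);
    }
}
```

Given the subjects `transactions-value`, `transactions-key`, `payments-value`, and `orphan`, the sketch lists the tables `payments` and `transactions`.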
...