...

Code Block
languagejava
titleConfluentRegistryAvroRowDeserializationSchema
/** Confluent registry Avro row deserialization schema. */
public class ConfluentRegistryAvroRowDeserializationSchema
		extends AvroRowDeserializationSchema {
}

The Table Schema and Watermark Definition

A table read from the ConfluentSchemaRegistryCatalog contains only the fields of the value part of the Kafka record, which is in Avro format. For example, the Avro schema string

Code Block
languagexml
{"namespace": "io.confluent.examples.clients.basicavro",
 "type": "record",
 "name": "Payment",
 "fields": [
     {"name": "id", "type": "string"},
     {"name": "amount", "type": "double"}
 ]
}


would yield the table schema <id: STRING, amount: DOUBLE>. No fields come from the record key part, and there is no watermark strategy definition.
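
The mapping from the Avro record fields to the table schema can be sketched as below. This is only an illustration under stated assumptions: the class AvroToTableSchemaSketch and its methods are hypothetical names that are not part of this proposal, the sketch handles Avro primitive types only, and it takes the field list as plain name/type pairs instead of parsing the schema string with the Avro library.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper (not part of the proposal): maps Avro primitive types to table types. */
public class AvroToTableSchemaSketch {

    /** Assumed mapping from an Avro primitive type name to a SQL type name. */
    static String toSqlType(String avroType) {
        switch (avroType) {
            case "string":  return "STRING";
            case "boolean": return "BOOLEAN";
            case "int":     return "INT";
            case "long":    return "BIGINT";
            case "float":   return "FLOAT";
            case "double":  return "DOUBLE";
            case "bytes":   return "BYTES";
            default:
                throw new IllegalArgumentException(
                        "Unsupported Avro type in this sketch: " + avroType);
        }
    }

    /** Renders (field name, Avro type) pairs as "<name: TYPE, ...>", keeping field order. */
    static String toTableSchema(Map<String, String> avroFields) {
        StringJoiner joiner = new StringJoiner(", ", "<", ">");
        for (Map.Entry<String, String> field : avroFields.entrySet()) {
            joiner.add(field.getKey() + ": " + toSqlType(field.getValue()));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // The two value fields of the Payment record from the Avro schema above.
        Map<String, String> payment = new LinkedHashMap<>();
        payment.put("id", "string");
        payment.put("amount", "double");
        System.out.println(toTableSchema(payment)); // prints <id: STRING, amount: DOUBLE>
    }
}
```

Only the value fields appear in the result; key columns and watermarks have to be added separately, as described in the following sections.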

The Watermark Definition

Based on FLIP-110, users can use the LIKE clause to append a watermark definition to a table read from the Catalog, for example:

Code Block
languagesql
CREATE [TEMPORARY] TABLE derived_table (
    WATERMARK FOR tstmp AS tstmp - INTERVAL '5' SECOND
)
LIKE base_table; -- base_table comes from the ConfluentSchemaRegistryCatalog
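
To make the effect of a watermark expression of the form "timestamp column minus 5 seconds" concrete, the sketch below tracks the largest timestamp seen so far and reports a watermark that trails it by a fixed delay. It is a simplified illustration, not Flink's implementation: the class BoundedDelayWatermarkSketch is a hypothetical name, and timestamps are assumed to be epoch milliseconds.

```java
/** Simplified illustration of a bounded-delay watermark; not Flink's implementation. */
public class BoundedDelayWatermarkSketch {

    private final long delayMillis;
    private long maxTimestampMillis = Long.MIN_VALUE;

    BoundedDelayWatermarkSketch(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    /** Records the timestamp of one row (epoch millis) and returns the current watermark. */
    long onEvent(long tstmpMillis) {
        // The watermark never moves backwards, so only the maximum timestamp matters.
        maxTimestampMillis = Math.max(maxTimestampMillis, tstmpMillis);
        return maxTimestampMillis - delayMillis;
    }

    public static void main(String[] args) {
        // A delay of 5 seconds, matching INTERVAL '5' SECOND in the example above.
        BoundedDelayWatermarkSketch wm = new BoundedDelayWatermarkSketch(5_000L);
        System.out.println(wm.onEvent(10_000L)); // prints 5000
        System.out.println(wm.onEvent(8_000L));  // prints 5000 (late row, watermark unchanged)
        System.out.println(wm.onEvent(12_000L)); // prints 7000
    }
}
```

Rows that arrive up to 5 seconds out of order are still ahead of the watermark and are therefore not treated as late.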

The Key Fields as Part of the Schema

Based on FLIP-110 and FLIP-107, users can use the LIKE clause to append key columns to a table read from the Catalog, for example:

Code Block
languagesql
CREATE [TEMPORARY] TABLE derived_table (
  id BIGINT,
  name STRING
) WITH (
  'key.fields' = 'id, name',
  'key.format.type' = 'csv'
)
LIKE base_table; -- base_table comes from the ConfluentSchemaRegistryCatalog
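
As a rough illustration of how a 'key.fields' value such as 'id, name' could be read, the sketch below splits the option into column names and checks each against the declared columns. The class KeyFieldsOptionSketch and its method are hypothetical, and the comma-separated format with optional whitespace is an assumption based on the example above; the actual option semantics are defined by FLIP-107, not by this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: splits a 'key.fields' option value into column names. */
public class KeyFieldsOptionSketch {

    /** Returns the key column names in order, rejecting names that are not declared columns. */
    static List<String> parseKeyFields(String optionValue, List<String> declaredColumns) {
        List<String> keyFields = new ArrayList<>();
        for (String part : optionValue.split(",")) {
            String name = part.trim();
            if (name.isEmpty()) {
                continue; // tolerate stray commas in this sketch
            }
            if (!declaredColumns.contains(name)) {
                throw new IllegalArgumentException("Unknown key field: " + name);
            }
            keyFields.add(name);
        }
        return keyFields;
    }

    public static void main(String[] args) {
        // Columns declared in the derived table of the example above.
        List<String> declared = Arrays.asList("id", "name");
        System.out.println(parseKeyFields("id, name", declared)); // prints [id, name]
    }
}
```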

Compatibility, Deprecation, and Migration Plan

This is a new feature so there is no compatibility problem.

Implementation Plan

  • Add a new Catalog named ConfluentSchemaRegistryCatalog

  • Add a format factory ConfluentRegistryAvroRowFormatFactory

  • Add two formats: ConfluentRegistryAvroRowDeserializationSchema and ConfluentRegistryAvroRowSerializationSchema

...