Status
Current state: "Under Discussion"
Discussion thread:
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Code Block

StreamTableEnvironment tEnv = ...;
String schemaRegistryURL = ...;
Map<String, String> kafkaProps = ...;

SchemaRegistryCatalog catalog = SchemaRegistryCatalog.builder()
    .schemaRegistryURL(schemaRegistryURL)
    .kafkaOptions(kafkaProps)
    .dbName("myDB")
    .build();
tEnv.registerCatalog("myCatalog", catalog);

// ---------- Consume stream from Kafka -------------------

// Assumes there is a topic named 'transactions'
String query = "SELECT\n"
    + "  id, amount\n"
    + "FROM myCatalog.myDB.transactions";
Introduction to Confluent Schema Registry
Terminology
What is a topic versus a schema versus a subject?
- Subject: Schema Registry defines a scope in which schemas can evolve, and that scope is the subject. The name of the subject depends on the configured subject name strategy, which by default derives the subject name from the topic name.
- Topic: a Kafka topic contains messages, and each message is a key-value pair. Either the message key or the message value, or both, can be serialized as Avro, JSON, or Protobuf.
- Schema name: for Avro it is the record name; for JSON, it is the title.
See terminology-review for details.
Subject Naming Strategy
There are three kinds of naming strategies in the current 5.5.1 version:
- TopicNameStrategy: <topic name>-key | <topic name>-value
- RecordNameStrategy: <fully-qualified record name>-key | <fully-qualified record name>-value
- TopicRecordNameStrategy: <topic name>-<fully-qualified record name>-key | <topic name>-<fully-qualified record name>-value
The RecordNameStrategy allows different schemas (record types) within one Kafka topic; the schema compatibility check for a given record name spans all topics, while TopicRecordNameStrategy checks the compatibility of schemas with the same record name within one Kafka topic.
See sr-schemas-subject-name-strategy for details.
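For illustration, the sketch below spells out the value subjects each strategy would produce, following the rules above, for the record type io.confluent.examples.clients.basicavro.Payment in a topic named 'transactions' (the topic and record names are just examples):

Code Block

String topic = "transactions";
String recordFqn = "io.confluent.examples.clients.basicavro.Payment";

// TopicNameStrategy: subject derives from the topic name only.
String topicNameSubject = topic + "-value";                          // "transactions-value"
// RecordNameStrategy: subject derives from the fully-qualified record name.
String recordNameSubject = recordFqn + "-value";                     // "...Payment-value"
// TopicRecordNameStrategy: subject combines topic and record name.
String topicRecordSubject = topic + "-" + recordFqn + "-value";      // "transactions-...Payment-value"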
The RESTful API
See schemaregistry-api for details.
Security
- https://docs.confluent.io/current/security/basic-auth.html#schema-registry
- https://docs.confluent.io/current/schema-registry/security/index.html#sr-security-overview
- https://docs.confluent.io/current/schema-registry/serdes-develop/index.html#basic-auth-security-for-producers-and-consumers
Schema Compatibility
Since 5.5.0, Schema Registry allows a different compatibility level per subject or globally; the Schema Registry service enforces the compatibility check when a schema evolves (see the sketch after the reference links below).
- https://docs.confluent.io/current/schema-registry/avro.html#compatibility-types
- http://avro.apache.org/docs/1.7.7/spec.html#Schema+Resolution
- https://docs.confluent.io/current/schema-registry/serdes-develop/serdes-json.html#json-schema-compatibility-rules
- https://docs.confluent.io/current/schema-registry/develop/api.html#id1
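A minimal sketch of setting the per-subject compatibility level with the Confluent Java client (SchemaRegistryClient#updateCompatibility; the URL and subject are placeholders):

Code Block

SchemaRegistryClient client =
    new CachedSchemaRegistryClient("http://localhost:8081", 1000);
// The service rejects any new schema version under this subject that breaks
// BACKWARD compatibility with the latest registered version.
client.updateCompatibility("transactions-value", "BACKWARD");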
Schema & Format
Schema and Format Binding
Since 5.5.0, users can get the format of a schema, i.e. Avro, JSON, or Protobuf, through the Confluent SchemaRegistryClient#getLatestSchemaMetadata.
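A minimal sketch of that lookup (Confluent 5.5+ Java client; the URL and subject are placeholders):

Code Block

SchemaRegistryClient client =
    new CachedSchemaRegistryClient("http://localhost:8081", 1000);
SchemaMetadata metadata = client.getLatestSchemaMetadata("transactions-value");
String schemaType = metadata.getSchemaType();  // "AVRO", "JSON" or "PROTOBUF"
String schemaString = metadata.getSchema();    // the raw schema string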
Format Compatibility
The format also has its own compatibility rules, for example, for Avro: http://avro.apache.org/docs/1.7.7/spec.html#Schema+Resolution
Design Proposal
Note: we only support the Avro format in this FLIP!
SchemaRegistryCatalog
The SchemaRegistryCatalog interacts with the Confluent Schema Registry Service directly through its RESTful API, using the Java SDK client, e.g. the SchemaRegistryClient.
- when opened/initialized, it fetches the topic list of the current Kafka cluster and caches it in memory.
...
- when a specific catalog table's info is requested from the SQL context, it fetches the Avro schema string and format type of the topic's latest subject schema, then generates a catalog table instance.
We dropped the idea of a table cache because the cache would introduce inconsistency with the latest Schema Registry state. A catalog table instance with the following attributes is generated each time #getTable is invoked (a sketch of this flow follows the list below):
- TableSchema: inferred from the Avro schema string
- Confluent schema registry URL
- The Avro schema string
- The schema registry subject name (with the format {topic-name}-value, used for writes)
- The Kafka topic name
- Format identifier "avro-sr", which helps to find the formats for Confluent Schema Registry Se/De
- Common properties for all the topics (as a parameter of the Catalog)
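A hypothetical sketch of the #getTable flow assembling the attributes above; the helper names (client, commonKafkaOptions, schemaRegistryURL) and the exact option keys are illustrative assumptions, not the final design:

Code Block

// Fetch the latest schema registered for the topic's value subject.
SchemaMetadata metadata = client.getLatestSchemaMetadata("transactions-value");
String avroSchemaString = metadata.getSchema();

// Infer the Flink TableSchema from the Avro schema string.
TableSchema tableSchema = TableSchema.fromTypeInfo(
    AvroSchemaConverter.convertToTypeInfo(avroSchemaString));

// Assemble table options: the shared Kafka options plus per-topic format options.
Map<String, String> options = new HashMap<>(commonKafkaOptions);
options.put("topic", "transactions");
options.put("format.type", "avro-sr");
options.put("format.schema-registry.url", schemaRegistryURL);
options.put("format.schema-registry.subject", "transactions-value");
options.put("schema-string", avroSchemaString);

CatalogTable table = new CatalogTableImpl(tableSchema, options, "");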
The graph below illustrates how this catalog works with the Confluent Schema Registry Service:
Parameters
The SchemaRegistryCatalog.Builder can be used to configure the following options:
name | isOptional | default | remark
---|---|---|---
connectorOptions | false | (null) | Connector options shared by all the tables read from this catalog. If you want to tweak or override options per table, use dynamic table options or the CREATE TABLE LIKE syntax.
schemaRegistryURL | false | (null) | Schema Registry URL to connect to the registry service.
dbName | true | kafka | Database name.
schemaRegistryClient | true | CachedSchemaRegistryClient with default identityMapCapacity of 1000 | Sets up the SchemaRegistryClient used to connect to the registry service. By default, the catalog holds a CachedSchemaRegistryClient with 1000 as identityMapCapacity. Use this option for custom client configuration, e.g. SSL configuration, or to change the default identityMapCapacity.
Basic Auth Security for Producers and Consumers
If you want to configure the client to enable SSL, use a custom SchemaRegistryClient when constructing the catalog.
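A minimal sketch, assuming the builder options from the table above; the basic-auth property keys come from the Confluent docs linked in the Security section, and the credential values are placeholders:

Code Block

Map<String, Object> clientConfigs = new HashMap<>();
// Basic auth credentials passed to the registry client.
clientConfigs.put("basic.auth.credentials.source", "USER_INFO");
clientConfigs.put("basic.auth.user.info", "myUser:myPassword");

SchemaRegistryClient client =
    new CachedSchemaRegistryClient(schemaRegistryURL, 1000, clientConfigs);

SchemaRegistryCatalog catalog = SchemaRegistryCatalog.builder()
    .schemaRegistryURL(schemaRegistryURL)
    .kafkaOptions(kafkaProps)
    .dbName("myDB")
    .schemaRegistryClient(client)
    .build();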
RegistryAvroFormatFactory
RegistryAvroFormatFactory is a factory for Confluent Schema Registry Avro Se/De formats. It has the following attributes:
- format.type = avro-sr: factory ID, required
- format.schema-registry.url: schema registry URL, required
- schema-string: avro schema string, required
- format.schema-registry.subject: subject to write to, required only for sink table
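For illustration, a hypothetical DDL equivalent to what the catalog would generate for a topic named 'transactions'; the connector keys and the schema string are placeholder assumptions, only the format options listed above come from this design:

Code Block

tEnv.executeSql(
    "CREATE TABLE transactions (\n"
    + "  id STRING,\n"
    + "  amount DOUBLE\n"
    + ") WITH (\n"
    + "  'connector.type' = 'kafka',\n"
    + "  'topic' = 'transactions',\n"
    + "  'format.type' = 'avro-sr',\n"
    + "  'format.schema-registry.url' = 'http://localhost:8081',\n"
    + "  'format.schema-registry.subject' = 'transactions-value',\n"
    + "  'schema-string' = '...'\n"
    + ")");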
Public Interfaces
Code Block

/**
 * Catalog for
 * <a href="https://docs.confluent.io/current/schema-registry/schema_registry_tutorial.html">Confluent Schema Registry</a>.
 * It allows to access all the topics of the current Confluent Schema Registry Service
 * through SQL or Table API; there is no need to create any table explicitly.
 *
 * <p>The code snippet below illustrates how to use this catalog:
 * <pre>
 *     String schemaRegistryURL = ...;
 *     Map<String, String> kafkaProps = ...;
 *     SchemaRegistryCatalog catalog = SchemaRegistryCatalog.builder()
 *         .schemaRegistryURL(schemaRegistryURL)
 *         .kafkaOptions(kafkaProps)
 *         .catalogName("myCatalog")
 *         .dbName("myDB")
 *         .build();
 *     tEnv.registerCatalog("myCatalog", catalog);
 *
 *     // ---------- Consume stream from Kafka -------------------
 *
 *     // Assumes there is a topic named 'transactions'
 *     String query = "SELECT\n"
 *         + "  id, amount\n"
 *         + "FROM myCatalog.myDB.transactions";
 * </pre>
 *
 * <p>We only support TopicNameStrategy as the subject naming strategy,
 * for which all the records in one topic have the same schema; see
 * <a href="https://docs.confluent.io/current/schema-registry/serializer-formatter.html#how-the-naming-strategies-work">How the Naming Strategies Work</a>
 * for details.
 *
 * <p>You can specify some common options for these topics. All the tables from this
 * catalog would take the same options. If this is not what you want, use dynamic
 * table options within per-table scope.
 *
 * <p>The behaviors:
 * <ul>
 *     <li>The catalog only supports reading messages with the latest enabled schema
 *     for any given Kafka topic at the time when the SQL query was compiled.</li>
 *     <li>No time-column and watermark support.</li>
 *     <li>The catalog is read-only. It does not support table creations,
 *     deletions or modifications.</li>
 *     <li>The catalog only supports Kafka message values prefixed with schema id,
 *     which is also the default behaviour of the Schema Registry Kafka producer format.</li>
 * </ul>
 */
public class SchemaRegistryCatalog extends TableCatalog {
}
Code Block

/**
 * Table format factory for providing configured instances of Schema Registry Avro
 * to RowData {@link SerializationSchema} and {@link DeserializationSchema}.
 */
public class RegistryAvroFormatFactory
    implements DeserializationFormatFactory, SerializationFormatFactory {
}
The Expected Behaviors
- The catalog only supports reading messages with the latest enabled schema for any given Kafka topic at the time when the SQL query was compiled
- No time-column and watermark support
- The catalog is read-only. It does not support table creations or deletions or modifications
- The catalog only supports Kafka message values prefixed with schema id; this is also the default behavior for the Schema Registry Kafka producer format (see the wire-format sketch below)
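A minimal sketch of that wire format, as publicly documented by Confluent: a message value carries one magic byte, a 4-byte big-endian schema id, and then the Avro payload (messageValue is an assumed byte[] holding a Kafka record value):

Code Block

ByteBuffer buffer = ByteBuffer.wrap(messageValue);
byte magicByte = buffer.get();      // always 0x0 for this format
int schemaId = buffer.getInt();     // id used to look up the writer schema
byte[] avroPayload = new byte[buffer.remaining()];
buffer.get(avroPayload);            // Avro-encoded record bytes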
The Table Schema and Watermark Definition
A table read from the SchemaRegistryCatalog only has the fields of the value part of the Kafka record, which is in Avro format. For example, the Avro schema string
Code Block
{"namespace": "io.confluent.examples.clients.basicavro",
"type": "record",
"name": "Payment",
"fields": [
{"name": "id", "type": "string"},
{"name": "amount", "type": "double"}
]
} |
would yield the table schema <id: STRING, amount: DOUBLE>. There are no fields that come from the record key part and no watermark strategy definition.
The Watermark Definition
Based on FLIP-110, users can use the LIKE clause to append a watermark definition to a table read from the catalog, for example:
Code Block

CREATE [TEMPORARY] TABLE derived_table (
  WATERMARK FOR tstmp AS tstmp - INTERVAL '5' SECOND
)
LIKE base_table; -- base_table comes from the SchemaRegistryCatalog
The Key Fields as Part of the Schema
Based on FLIP-110 and FLIP-107, users can use the LIKE clause to append key columns to a table read from the catalog, for example:
Code Block

CREATE [TEMPORARY] TABLE derived_table (
  id BIGINT,
  name STRING
) WITH (
  'key.fields' = 'id, name',
  'key.format.type' = 'csv'
)
LIKE base_table; -- base_table comes from the SchemaRegistryCatalog
Code Block

/** Confluent schema registry Avro row serialization schema. */
public class ConfluentRegistryAvroRowSerializationSchema
    extends AvroRowSerializationSchema {
}
Code Block

/** Confluent schema registry Avro row deserialization schema. */
public class ConfluentRegistryAvroRowDeserializationSchema
    extends AvroRowDeserializationSchema {
}
Compatibility, Deprecation, and Migration Plan
This is a new feature so there is no compatibility problem.
Implementation Plan
- Add a new catalog named SchemaRegistryCatalog
- Add a format factory RegistryAvroFormatFactory
- Add two formats: ConfluentRegistryAvroRowDeserializationSchema and ConfluentRegistryAvroRowSerializationSchema
...