Status
Current state: "Under Discussion"
Discussion thread:
JIRA:
Released: 1.11.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Confluent Schema Registry provides a RESTful interface for developers to define standard schemas for their events, share them across the organization, and safely evolve them in a way that is backward compatible and future-proof. Schema Registry stores a versioned history of all schemas and allows schemas to evolve according to the configured compatibility settings. It also provides a plugin for clients that handles schema storage and retrieval for messages that are sent in Avro format.
A Confluent Schema Registry catalog makes Flink SQL table access extremely convenient: all that needs to be configured is a single schema registry URL, and then all the Kafka topics registered in the schema registry service can be accessed in Flink SQL and the Table API. Here is a code snippet that illustrates how to access tables by registering such a catalog:
StreamTableEnvironment tEnv = ...
ConfluentSchemaRegistryCatalog catalog = new ConfluentSchemaRegistryCatalog(
    properties,
    schemaRegistryURL,
    "catalog1",
    "db1");
tEnv.registerCatalog("catalog1", catalog);

// ---------- Consume stream from Kafka -------------------
String query = "SELECT\n" +
    "  id, amount\n" +
    "FROM catalog1.db1.transactions1";
Design Proposal
ConfluentSchemaRegistryCatalog
The ConfluentSchemaRegistryCatalog interacts with the Confluent Schema Registry Service directly through its RESTful API. When it is opened/initialized, it fetches the list of all topics of the current Kafka cluster and caches it in memory. When a specific catalog table's info is requested from the SQL context, it fetches the Avro schema string of the topic's latest subject. A catalog table instance with the following attributes is then cached in memory:
- TableSchema: inferred from the Avro schema string
- Confluent schema registry URL
- The Avro schema string
- The schema registry subject name (used for writes)
- The Kafka topic name
- A format named "confluent-registry-avro", which helps locate the formats for Confluent Schema Registry Se/De
- Common properties for all the topics (as a parameter of the Catalog)
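To make the cached attributes concrete, here is a minimal, self-contained sketch of the kind of properties map a catalog table instance could carry. The key names (e.g. "connector.topic") and the subject naming (topic name plus "-value", Confluent's default TopicNameStrategy) are illustrative assumptions, not part of this proposal's defined interface; only "format.type", "format.schema-registry.url", "format.schema-registry.subject", and "format.avro-schema" come from this document.

```java
import java.util.HashMap;
import java.util.Map;

public class CatalogTableProperties {
    // Builds an illustrative properties map for one Kafka topic.
    // commonProps are the common properties passed to the catalog as a parameter.
    public static Map<String, String> forTopic(String topic,
                                               String registryUrl,
                                               String avroSchema,
                                               Map<String, String> commonProps) {
        Map<String, String> props = new HashMap<>(commonProps);
        props.put("connector.topic", topic);                            // Kafka topic name (key is hypothetical)
        props.put("format.type", "confluent-registry-avro");            // format ID from this proposal
        props.put("format.schema-registry.url", registryUrl);           // schema registry URL
        props.put("format.schema-registry.subject", topic + "-value");  // subject used for writes (naming assumed)
        props.put("format.avro-schema", avroSchema);                    // Avro schema string of the latest subject
        return props;
    }
}
```

The point of caching this map per topic is that every piece of information the planner needs later (schema, format, subject) is resolved once, up front, from the registry.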
The graph below illustrates how this catalog works with the Confluent Schema Registry Service:
ConfluentRegistryAvroRowFormatFactory
ConfluentRegistryAvroRowFormatFactory is a factory for Confluent Schema Registry Avro Se/De formats. It has the following attributes:
- format.type = confluent-registry-avro: factory ID, required
- format.schema-registry.url: schema registry URL, required
- format.schema-registry.subject: subject to write to, required only for sink tables
- format.avro-schema: Avro schema string, required
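Since the subject is only required for sinks, the factory's property validation is asymmetric between sources and sinks. The following is a hedged sketch of that validation logic (not the actual factory code; the validate method and its isSink parameter are hypothetical), using only the property keys defined above:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class FormatPropertiesValidator {
    // Checks the properties listed in this proposal; isSink distinguishes
    // source tables from sink tables, since the subject is only needed for writes.
    public static void validate(Map<String, String> props, boolean isSink) {
        List<String> required = Arrays.asList(
            "format.type", "format.schema-registry.url", "format.avro-schema");
        for (String key : required) {
            if (!props.containsKey(key)) {
                throw new IllegalArgumentException("Missing required property: " + key);
            }
        }
        if (!"confluent-registry-avro".equals(props.get("format.type"))) {
            throw new IllegalArgumentException(
                "format.type must be confluent-registry-avro");
        }
        if (isSink && !props.containsKey("format.schema-registry.subject")) {
            throw new IllegalArgumentException(
                "format.schema-registry.subject is required for sink tables");
        }
    }
}
```

A source table without a subject passes, while the same properties used for a sink fail fast with a clear error, which is the behavior the attribute table above implies.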
Public Interfaces
Compatibility, Deprecation, and Migration Plan
This is a new feature, so there are no compatibility concerns.
Implementation Plan
- Add a new catalog named ConfluentSchemaRegistryCatalog
- Add a format factory ConfluentRegistryAvroRowFormatFactory
- Add two formats: ConfluentRegistryAvroRowDeserializationSchema and ConfluentRegistryAvroRowSerializationSchema
Test Plan
The Confluent Schema Registry is a service running alongside the Kafka cluster, so we need an end-to-end test covering both reading from and writing to Kafka topics.