Status
Current state: ["Under Discussion"]
Discussion thread:
JIRA:
Released: 1.11.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Confluent Schema Registry provides a RESTful interface for developers to define standard schemas for their events, share them across the organization and safely evolve them in a way that is backward compatible and future proof. Schema Registry stores a versioned history of all schemas and allows the evolution of schemas according to the configured compatibility settings. It also provides a plugin to clients that handles schema storage and retrieval for messages that are sent in Avro format.
A Confluent Schema Registry catalog makes Flink SQL table access extremely convenient: the only required configuration is a single schema registry URL, after which all the Kafka topics registered in the schema registry service can be accessed in Flink SQL and the Table API. Here is a code snippet that illustrates how to access tables by registering such a catalog:
StreamTableEnvironment tEnv = ...

ConfluentSchemaRegistryCatalog catalog = new ConfluentSchemaRegistryCatalog(
    properties,
    schemaRegistryURL,
    "catalog1",
    "db1");
tEnv.registerCatalog("catalog1", catalog);

// ---------- Consume stream from Kafka -------------------

String query = "SELECT\n" +
    "  id, amount\n" +
    "FROM catalog1.db1.transactions1";
Design Proposal
ConfluentSchemaRegistryCatalog
The ConfluentSchemaRegistryCatalog interacts with the Confluent Schema Registry service directly through its RESTful API. When opened/initialized, it fetches the full topic list of the current Kafka cluster and caches it in memory. When a specific catalog table is requested from the SQL context, it fetches the Avro schema string of the topic's latest subject version. A catalog table instance with the following attributes is then cached in memory:
- TableSchema: inferred from the Avro schema string
- Confluent schema registry URL
- The Avro schema string
- The schema registry subject name (used for write)
- The Kafka topic name
- A format named "confluent-registry-avro", which helps to find the formats for Confluent Schema Registry Se/De
- Common properties for all the topics (as a parameter of the Catalog)
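Since only TopicNameStrategy is supported, the mapping between a Kafka topic and its schema registry subject is a fixed naming rule: the value subject is the topic name with a "-value" suffix. A minimal sketch of that mapping (the class and method names here are illustrative, not part of the proposal):

```java
public final class TopicNameStrategyUtil {

    /** Derives the schema registry subject that stores the value schema of a topic. */
    public static String valueSubject(String topic) {
        return topic + "-value";
    }

    /** Recovers the topic name from a value subject, or returns null if it is not one. */
    public static String topicOfValueSubject(String subject) {
        return subject.endsWith("-value")
            ? subject.substring(0, subject.length() - "-value".length())
            : null;
    }

    public static void main(String[] args) {
        System.out.println(valueSubject("transactions1"));              // transactions1-value
        System.out.println(topicOfValueSubject("transactions1-value")); // transactions1
    }
}
```

This is the rule the catalog would apply both when listing tables (subject to topic) and when resolving a table's latest schema (topic to subject).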
The graph below illustrates how this catalog works with the Confluent Schema Registry Service:
ConfluentRegistryAvroRowFormatFactory
ConfluentRegistryAvroRowFormatFactory is a factory for Confluent Schema Registry Avro Se/De formats. It has the following attributes:
- format.type = confluent-registry-avro: factory ID, required
- format.schema-registry.url: schema registry URL, required
- format.schema-registry.subject: subject to write to, required only for sink table
- format.avro-schema: avro schema string, required
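Put together, the format properties for a table backed by the registry would look roughly like this (the URL, subject, and schema below are illustrative values, not defaults):

```properties
format.type = confluent-registry-avro
format.schema-registry.url = http://localhost:8081
# required only when the table is used as a sink
format.schema-registry.subject = transactions1-value
format.avro-schema = {"type":"record","name":"Transaction","fields":[{"name":"id","type":"long"},{"name":"amount","type":"double"}]}
```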
Public Interfaces
/**
 * Catalog for
 * <a href="https://docs.confluent.io/current/schema-registry/schema_registry_tutorial.html">Confluent Schema Registry</a>.
 * It allows accessing all the topics of the current Confluent Schema Registry service
 * through SQL or the Table API; there is no need to create any table explicitly.
 *
 * <p>The code snippet below illustrates how to use this catalog:
 * <pre>
 * ConfluentSchemaRegistryCatalog catalog = new ConfluentSchemaRegistryCatalog(
 *     properties,
 *     schemaRegistryURL,
 *     "catalog1",
 *     "db1");
 * tEnv.registerCatalog("catalog1", catalog);
 *
 * // ---------- Consume stream from Kafka -------------------
 *
 * String query = "SELECT\n" +
 *     "  id, amount\n" +
 *     "FROM catalog1.db1.transactions1";
 * </pre>
 *
 * <p>We only support TopicNameStrategy as the subject naming strategy,
 * under which all the records in one topic have the same schema; see
 * <a href="https://docs.confluent.io/current/schema-registry/serializer-formatter.html#how-the-naming-strategies-work">How the Naming Strategies Work</a>
 * for details.
 *
 * <p>You can specify some common options for these topics. All the tables from this
 * catalog take the same options. If that is not what you want, use dynamic table
 * options set up within per-table scope.
 */
public class ConfluentSchemaRegistryCatalog extends AbstractCatalog {
}
/** Format factory for Confluent Schema Registry Avro to Row. */
public class ConfluentRegistryAvroRowFormatFactory
        extends TableFormatFactoryBase<Row>
        implements SerializationSchemaFactory<Row>, DeserializationSchemaFactory<Row> {
}
/** Confluent Schema Registry Avro row serialization schema. */
public class ConfluentRegistryAvroRowSerializationSchema extends AvroRowSerializationSchema {
}

/** Confluent Schema Registry Avro row deserialization schema. */
public class ConfluentRegistryAvroRowDeserializationSchema extends AvroRowDeserializationSchema {
}
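These Se/De schemas follow the Confluent wire format, which frames every Kafka message as a magic byte (0), a 4-byte big-endian schema id, and then the Avro-encoded payload. A self-contained sketch of just the framing step (the real classes delegate the Avro encoding itself to the Avro runtime, and look the schema id up from the registry):

```java
import java.nio.ByteBuffer;

public final class ConfluentWireFormat {

    private static final byte MAGIC_BYTE = 0x0;

    /** Prefixes an Avro-encoded payload with the Confluent magic byte and schema id. */
    public static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(1 + 4 + avroPayload.length)
            .put(MAGIC_BYTE)
            .putInt(schemaId)   // 4-byte big-endian schema id
            .put(avroPayload)
            .array();
    }

    /** Reads the schema id back from a framed message, validating the magic byte. */
    public static int schemaId(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }
}
```

On deserialization, the schema id extracted here is what lets the reader fetch the exact writer schema from the registry before decoding the Avro payload.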
Compatibility, Deprecation, and Migration Plan
This is a new feature, so there are no compatibility concerns.
Implementation Plan
- Add a new catalog named ConfluentSchemaRegistryCatalog
- Add a format factory ConfluentRegistryAvroRowFormatFactory
- Add two formats: ConfluentRegistryAvroRowDeserializationSchema and ConfluentRegistryAvroRowSerializationSchema
Test Plan
The Confluent Schema Registry is a service that runs alongside the Kafka cluster, so we need an end-to-end test that covers both reading from and writing to Kafka topics.