...
- format.type = confluent-registry-avro: factory ID, required
- format.schema-registry.url: Schema Registry URL, required
- format.schema-registry.subject: subject to write to, required only for sink tables
- format.avro-schema: Avro schema string, required
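Assuming the options above, a sink table could be declared roughly as follows. This is only a sketch: the table name, topic, registry URL, and Avro schema are hypothetical, and the exact connector keys depend on the Flink version in use.

```sql
CREATE TABLE transactions_sink (
  id BIGINT,
  amount DOUBLE
) WITH (
  -- Kafka connector options (hypothetical topic name)
  'connector.type' = 'kafka',
  'connector.topic' = 'transactions1',
  -- format options from the list above
  'format.type' = 'confluent-registry-avro',
  'format.schema-registry.url' = 'http://localhost:8081',
  'format.schema-registry.subject' = 'transactions1-value',
  'format.avro-schema' = '{"type":"record","name":"Transaction","fields":[{"name":"id","type":"long"},{"name":"amount","type":"double"}]}'
);
```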
Public Interfaces
/**
* Catalog for
* <a href="https://docs.confluent.io/current/schema-registry/schema_registry_tutorial.html">Confluent Schema Registry</a>.
 * It allows access to all the topics of the current Confluent Schema Registry service
 * through SQL or the Table API; there is no need to create any table explicitly.
*
* <p>The code snippet below illustrates how to use this catalog:
* <pre>
* ConfluentSchemaRegistryCatalog catalog = new ConfluentSchemaRegistryCatalog(
* properties,
* schemaRegistryURL,
* "catalog1",
* "db1");
* tEnv.registerCatalog("catalog1", catalog);
*
* // ---------- Consume stream from Kafka -------------------
*
* String query = "SELECT\n" +
* " id, amount\n" +
* "FROM catalog1.db1.transactions1";
* </pre>
*
 * <p>Only TopicNameStrategy is supported as the subject naming strategy,
 * under which all the records in one topic have the same schema; see
 * <a href="https://docs.confluent.io/current/schema-registry/serializer-formatter.html#how-the-naming-strategies-work">How the Naming Strategies Work</a>
 * for details.
*
 * <p>You can specify common options for these topics. All the tables from this catalog
 * share the same options. If that is not what you want, use dynamic table options to
 * override them within a per-table scope.
*/
public class ConfluentSchemaRegistryCatalog extends AbstractCatalog {
}
/** Format factory for Confluent Schema Registry Avro to Row. */
public class ConfluentRegistryAvroRowFormatFactory extends TableFormatFactoryBase<Row>
implements SerializationSchemaFactory<Row>, DeserializationSchemaFactory<Row> {
}
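For illustration, the format properties this factory would match on can be assembled as a plain map. The keys come from the option list above; the URL, subject, and schema values are hypothetical placeholders, and the actual factory matching logic lives in Flink's table factory discovery.

```java
import java.util.HashMap;
import java.util.Map;

public class FormatPropertiesExample {

    /** Builds the format properties the factory would be selected by. */
    public static Map<String, String> formatProperties() {
        Map<String, String> props = new HashMap<>();
        // Factory identifier: selects ConfluentRegistryAvroRowFormatFactory.
        props.put("format.type", "confluent-registry-avro");
        // Schema Registry endpoint (hypothetical URL).
        props.put("format.schema-registry.url", "http://localhost:8081");
        // Subject is required only for sink tables (hypothetical name).
        props.put("format.schema-registry.subject", "transactions1-value");
        // Avro schema string describing the row type (hypothetical schema).
        props.put("format.avro-schema",
            "{\"type\":\"record\",\"name\":\"Transaction\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"long\"},"
            + "{\"name\":\"amount\",\"type\":\"double\"}]}");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(formatProperties().get("format.type"));
    }
}
```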
Compatibility, Deprecation, and Migration Plan
This is a new feature, so there are no compatibility concerns.
Implementation Plan
- Add a new catalog named ConfluentSchemaRegistryCatalog
- Add a format factory ConfluentRegistryAvroRowFormatFactory
- Add two formats: ConfluentRegistryAvroRowDeserializationSchema and ConfluentRegistryAvroRowSerializationSchema
Test Plan
The Confluent Schema Registry is a service that runs alongside the Kafka cluster, so we need an end-to-end test covering both reading from and writing to Kafka topics.
...