Table of Contents
Status
Current state: "Under Discussion"
Discussion thread:
JIRA:
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
```java
String schemaRegistryURL = ...;
Map<String, String> kafkaProps = ...;
SchemaRegistryCatalog catalog = SchemaRegistryCatalog.builder()
    .schemaRegistryURL(schemaRegistryURL)
    .kafkaOptions(kafkaProps)
    .catalogName("myCatalog")
    .dbName("myDB")
    .build();
tEnv.registerCatalog("myCatalog", catalog);

// ---------- Consume stream from Kafka -------------------

// Assumes there is a topic named 'transactions'
String query = "SELECT\n" +
    "  id, amount\n" +
    "FROM myCatalog.myDB.transactions";
```
...
The format also has its own compatibility rules, for example, for Avro: http://avro.apache.org/docs/1.7.7/spec.html#Schema+Resolution
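For illustration, Avro's own library can check these resolution rules. A minimal sketch, assuming `org.apache.avro` is on the classpath (the schemas are made up for this example): a reader schema that adds a field with a default value remains compatible with data written by the older schema.

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

// Writer schema: the schema the records were produced with.
Schema writer = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"Txn\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"long\"}]}");

// Reader schema: adds 'amount' with a default, which Avro's
// schema resolution rules accept as backward compatible.
Schema reader = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"Txn\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"long\"},"
        + "{\"name\":\"amount\",\"type\":\"double\",\"default\":0.0}]}");

SchemaCompatibility.SchemaPairCompatibility result =
    SchemaCompatibility.checkReaderWriterCompatibility(reader, writer);
// result.getType() is COMPATIBLE for this pair
```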
...
Design Proposal
Note: We only support the Avro format in this FLIP.
SchemaRegistryCatalog
The SchemaRegistryCatalog interacts with the Confluent Schema Registry service directly through its Java SDK client, the SchemaRegistryClient.
- When opened/initialized, it fetches the topic list of the current Kafka cluster and caches it in memory.
- When a specific catalog table is requested from the SQL context, it fetches the Avro schema string and format type of the topic's latest subject schema, then generates a catalog table instance.
We dropped the idea of a table cache because the cache would introduce inconsistency with the latest Schema Registry state.
A catalog table instance with the following attributes is generated each time #getTable is invoked:
- TableSchema: inferred from the Avro schema string
- The Confluent Schema Registry URL
- The Avro schema string
- The Schema Registry subject name (with format {topic-name}-value, used for writes)
- The Kafka topic name
- Format identifier "avro-sr", which is used to find the formats for Confluent Schema Registry SerDe
- Common properties for all the topics (passed as a parameter of the catalog)
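The attributes above can be pictured as a flat option map per table. The sketch below is illustrative only; the key names are assumptions for this example, not the FLIP's final option keys. It also shows the TopicNameStrategy rule that the value subject is always `{topic-name}-value`.

```java
import java.util.HashMap;
import java.util.Map;

public class GeneratedTableOptionsSketch {

    // Sketch of the options a generated catalog table could carry:
    // the shared Kafka properties plus per-topic attributes.
    static Map<String, String> tableOptions(
            String topic, Map<String, String> commonKafkaOptions) {
        Map<String, String> options = new HashMap<>(commonKafkaOptions);
        options.put("connector", "kafka");
        options.put("topic", topic);
        // TopicNameStrategy: the value subject is always "<topic>-value".
        options.put("subject", topic + "-value");
        options.put("format", "avro-sr");
        return options;
    }

    public static void main(String[] args) {
        Map<String, String> common = new HashMap<>();
        common.put("properties.bootstrap.servers", "localhost:9092");
        Map<String, String> options = tableOptions("transactions", common);
        System.out.println(options.get("subject")); // prints "transactions-value"
    }
}
```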
...
The SchemaRegistryCatalog.Builder can be used to configure the following options:
name | isOptional | default | remark
---|---|---|---
connectorOptions | false | (null) | Connector options for all the tables read from this catalog. The options are shared by all the tables; to tweak or override them in per-table scope, use dynamic table options or the CREATE TABLE LIKE syntax.
schemaRegistryURL | false | (null) | Schema Registry URL to connect to the registry service.
dbName | true | kafka | Database name.
schemaRegistryClient | true | CachedSchemaRegistryClient with a default identityMapCapacity of 1000 | Sets up the {@link SchemaRegistryClient} used to connect to the registry service. By default, the catalog holds a {@link CachedSchemaRegistryClient} with 1000 as {@code identityMapCapacity}. This option is for custom client configuration, e.g. SSL settings, or to change the default {@code identityMapCapacity}.
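The schemaRegistryClient option could be used, for example, to pass SSL settings to the registry client. A sketch, assuming the Confluent kafka-schema-registry-client artifact is on the classpath; the config keys, the capacity value, and the builder method name are illustrative:

```java
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import java.util.HashMap;
import java.util.Map;

// Illustrative SSL settings for the Schema Registry client.
Map<String, Object> clientConfigs = new HashMap<>();
clientConfigs.put("schema.registry.ssl.truststore.location", "/path/to/truststore.jks");
clientConfigs.put("schema.registry.ssl.truststore.password", "changeit");

// Custom client with a larger identity map capacity.
SchemaRegistryClient client =
    new CachedSchemaRegistryClient(schemaRegistryURL, 2000, clientConfigs);

SchemaRegistryCatalog catalog = SchemaRegistryCatalog.builder()
    .schemaRegistryURL(schemaRegistryURL)
    .schemaRegistryClient(client)
    .kafkaOptions(kafkaProps)
    .catalogName("myCatalog")
    .dbName("myDB")
    .build();
```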
Basic Auth Security for Producers and Consumers
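Presumably this covers the standard Schema Registry basic-auth client settings. A sketch of the relevant properties (the key names are the standard Confluent client settings; whether the catalog forwards them through the shared connector options unchanged is an assumption):

```java
import java.util.HashMap;
import java.util.Map;

public class BasicAuthOptionsSketch {

    // Standard Confluent Schema Registry client settings for basic auth.
    static Map<String, String> basicAuthOptions(String userInfo) {
        Map<String, String> options = new HashMap<>();
        options.put("basic.auth.credentials.source", "USER_INFO");
        options.put("basic.auth.user.info", userInfo); // format: "user:password"
        return options;
    }

    public static void main(String[] args) {
        Map<String, String> options = basicAuthOptions("alice:secret");
        System.out.println(options.get("basic.auth.credentials.source")); // prints "USER_INFO"
    }
}
```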
...
```java
/**
 * Catalog for
 * <a href="https://docs.confluent.io/current/schema-registry/schema_registry_tutorial.html">Confluent Schema Registry</a>.
 * It allows to access all the topics of the current Confluent Schema Registry service
 * through SQL or the Table API; there is no need to create any table explicitly.
 *
 * <p>The code snippet below illustrates how to use this catalog:
 * <pre>
 * String schemaRegistryURL = ...;
 * Map&lt;String, String&gt; kafkaProps = ...;
 * SchemaRegistryCatalog catalog = SchemaRegistryCatalog.builder()
 *     .schemaRegistryURL(schemaRegistryURL)
 *     .kafkaOptions(kafkaProps)
 *     .catalogName("myCatalog")
 *     .dbName("myDB")
 *     .build();
 * tEnv.registerCatalog("myCatalog", catalog);
 *
 * // ---------- Consume stream from Kafka -------------------
 *
 * // Assumes there is a topic named 'transactions'
 * String query = "SELECT\n" +
 *     "  id, amount\n" +
 *     "FROM myCatalog.myDB.transactions";
 * </pre>
 *
 * <p>We only support TopicNameStrategy for the subject naming strategy,
 * for which all the records in one topic have the same schema, see
 * <a href="https://docs.confluent.io/current/schema-registry/serializer-formatter.html#how-the-naming-strategies-work">How the Naming Strategies Work</a>
 * for details.
 *
 * <p>You can specify some common options for these topics. All the tables from this catalog
 * take the same options. If that is not what you want, use dynamic table options to override
 * them in per-table scope.
 *
 * <p>The behaviors:
 * <ul>
 *   <li>The catalog only supports reading messages with the latest enabled schema for any given
 *       Kafka topic at the time when the SQL query was compiled.</li>
 *   <li>No time-column and watermark support.</li>
 *   <li>The catalog is read-only. It does not support table creation, deletion or modification.</li>
 *   <li>The catalog only supports Kafka message values prefixed with the schema id,
 *       which is also the default behaviour of the Schema Registry Kafka producer format.</li>
 * </ul>
 */
public class SchemaRegistryCatalog extends TableCatalog {}
```
...
```java
/**
 * Table format factory for providing configured instances of Schema Registry Avro to RowData
 * {@link SerializationSchema} and {@link DeserializationSchema}.
 */
public class RegistryAvroFormatFactory
        implements DeserializationFormatFactory, SerializationFormatFactory {
}
```
...
Expected Behaviors
- The catalog only supports reading messages with the latest enabled schema for any given Kafka topic at the time when the SQL query was compiled
- No time-column and watermark support
- The catalog is read-only. It does not support table creation, deletion, or modification
- The catalog only supports Kafka message values prefixed with schema id, this is also the default behavior for the SchemaRegistry Kafka producer format
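Since the catalog's connector options apply to every table, a per-table override goes through Flink's dynamic table options (table hints). A sketch in the style of the earlier snippet, assuming hints are enabled via table.dynamic-table-options.enabled and that the Kafka connector option key below applies:

```java
// Per-table override of a shared connector option via a table hint.
String query = "SELECT id, amount\n" +
    "FROM myCatalog.myDB.transactions\n" +
    "/*+ OPTIONS('properties.group.id'='my-group') */";
```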
...