...
The diagram below illustrates how this catalog works with the Confluent Schema Registry Service:
SchemaRegistryAvroFormatFactory
ConfluentRegistryAvroRowFormatFactory is a factory for Confluent Schema Registry Avro Se/De formats. It has the following attributes:
...
Parameters
The SchemaRegistryCatalog.Builder can be used to configure these options:
| name | isOptional | default | remark |
|---|---|---|---|
| kafkaOptions | false | (null) | Kafka connector options shared by all the tables read from this catalog. To tweak or override options at per-table scope, use dynamic table options or the CREATE TABLE LIKE syntax. |
| schemaRegistryURL | false | (null) | Schema Registry URL to connect to the registry service. |
| catalogName | true | kafka | Catalog name. |
| dbName | true | kafka | Database name. |
| schemaRegistryClient | true | CachedSchemaRegistryClient with default identityMapCapacity of 1000 | Sets up the SchemaRegistryClient used to connect to the registry service. By default, the catalog holds a CachedSchemaRegistryClient with 1000 as identityMapCapacity. Use this option for custom client configuration, e.g. SSL settings, or to change the default identityMapCapacity. |
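For illustration, a minimal sketch of wiring these builder options into a TableEnvironment. The registry URL, Kafka option keys, and catalog/database names are placeholder assumptions; only the builder methods listed above are taken from the proposal:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SchemaRegistryCatalogExample {

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());

        // Shared Kafka connector options for all tables in this catalog.
        // The option keys below are illustrative and depend on the Kafka
        // connector version in use.
        Map<String, String> kafkaOptions = new HashMap<>();
        kafkaOptions.put("properties.bootstrap.servers", "localhost:9092");
        kafkaOptions.put("properties.group.id", "demo-group");

        SchemaRegistryCatalog catalog = SchemaRegistryCatalog.builder()
                .schemaRegistryURL("http://localhost:8081") // required
                .kafkaOptions(kafkaOptions)                 // required
                .catalogName("myCatalog")                   // optional, defaults to "kafka"
                .dbName("myDB")                             // optional, defaults to "kafka"
                .build();

        tEnv.registerCatalog("myCatalog", catalog);

        // Every topic known to the registry is now queryable, e.g.:
        // tEnv.executeSql("SELECT id, amount FROM myCatalog.myDB.transactions");
    }
}
```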
...
Basic Auth Security for Producers and Consumers
If you want to enable SSL or basic auth for the registry client, pass a custom SchemaRegistryClient when constructing the catalog, as sketched below.
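A hedged sketch of passing a pre-configured CachedSchemaRegistryClient with basic auth and SSL settings. The exact client configuration keys and constructor overload depend on the Confluent client version, and the schemaRegistryClient builder option is the one listed in the parameters table above:

```java
import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class SecureCatalogExample {

    public static void main(String[] args) {
        // Client-side security settings; the exact keys depend on the
        // Confluent client version (see the Schema Registry security docs).
        Map<String, Object> clientConfig = new HashMap<>();
        clientConfig.put("basic.auth.credentials.source", "USER_INFO");
        clientConfig.put("basic.auth.user.info", "registryUser:registryPassword");
        clientConfig.put("schema.registry.ssl.truststore.location", "/path/to/truststore.jks");
        clientConfig.put("schema.registry.ssl.truststore.password", "changeit");

        SchemaRegistryClient client = new CachedSchemaRegistryClient(
                "https://localhost:8081", // registry URL
                1000,                     // identityMapCapacity
                clientConfig);

        // Hand the pre-configured client to the catalog builder
        // (the schemaRegistryClient option from the parameters table).
        SchemaRegistryCatalog catalog = SchemaRegistryCatalog.builder()
                .schemaRegistryURL("https://localhost:8081")
                .kafkaOptions(new HashMap<>())
                .schemaRegistryClient(client)
                .build();
    }
}
```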
RegistryAvroFormatFactory
RegistryAvroFormatFactory is a factory for Confluent Schema Registry Avro Se/De formats. It has the following attributes (see the sketch after this list):
- format: avro-sr: factory ID, required
- schema-registry.url: schema registry URL, required
- schema-string: avro schema string, required
- schema-registry.subject: subject to write to, required only for sink table
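To make the option list concrete, a hedged sketch of a sink table DDL using this format, registered through the Table API. The table name, columns, topic, connector options, and whether the format keys ultimately need an 'avro-sr.' prefix are assumptions that depend on the final implementation:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AvroSrSinkExample {

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());

        // Sink table using the avro-sr format options listed above.
        // Table name, columns, topic and connector options are placeholders.
        tEnv.executeSql(
                "CREATE TABLE transactions_sink (\n"
                + "  id BIGINT,\n"
                + "  amount DOUBLE\n"
                + ") WITH (\n"
                + "  'connector' = 'kafka',\n"
                + "  'topic' = 'transactions',\n"
                + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                + "  'format' = 'avro-sr',\n"
                + "  'schema-registry.url' = 'http://localhost:8081',\n"
                + "  'schema-registry.subject' = 'transactions-value',\n"
                + "  'schema-string' = '{\"type\":\"record\",\"name\":\"Transaction\","
                + "\"fields\":[{\"name\":\"id\",\"type\":\"long\"},"
                + "{\"name\":\"amount\",\"type\":\"double\"}]}'\n"
                + ")");
    }
}
```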
Public Interfaces
```java
/**
 * Catalog for
 * <a href="https://docs.confluent.io/current/schema-registry/schema_registry_tutorial.html">Confluent Schema Registry</a>.
 * It allows access to all the topics of the current Confluent Schema Registry Service
 * through SQL or Table API; there is no need to create any table explicitly.
 *
 * <p>The code snippet below illustrates how to use this catalog:
 * <pre>
 *     String schemaRegistryURL = ...;
 *     Map<String, String> kafkaProps = ...;
 *     SchemaRegistryCatalog catalog = SchemaRegistryCatalog.builder()
 *         .schemaRegistryURL(schemaRegistryURL)
 *         .kafkaOptions(kafkaProps)
 *         .catalogName("myCatalog")
 *         .dbName("myDB")
 *         .build();
 *     tEnv.registerCatalog("catalog1", catalog);
 *
 *     // ---------- Consume stream from Kafka -------------------
 *
 *     // Assumes there is a topic named 'transactions'
 *     String query = "SELECT\n" +
 *         "  id, amount\n" +
 *         "FROM myCatalog.myDB.transactions";
 * </pre>
 *
 * <p>Only the TopicNameStrategy subject naming strategy is supported,
 * for which all the records in one topic have the same schema; see
 * <a href="https://docs.confluent.io/current/schema-registry/serializer-formatter.html#how-the-naming-strategies-work">How the Naming Strategies Work</a>
 * for details.
 *
 * <p>You can specify common options for these topics. All the tables from this catalog
 * take the same options. If that is not what you want, use dynamic table options to set
 * options at per-table scope.
 *
 * <p>Limitations:
 * <ul>
 *     <li>The catalog only supports reading messages with the latest enabled schema for any given
 *     Kafka topic at the time when the SQL query was compiled.</li>
 *     <li>No time-column and watermark support.</li>
 *     <li>The catalog is read-only. It does not support table creations,
 *     deletions or modifications.</li>
 *     <li>The catalog only supports Kafka message values prefixed with the schema id,
 *     which is also the default behaviour of the Schema Registry Kafka producer format.</li>
 * </ul>
 */
public class SchemaRegistryCatalog extends TableCatalog {}
```
...