...

The graph below illustrates how this catalog works with the Confluent Schema Registry Service:

SchemaRegistryAvroFormatFactory

ConfluentRegistryAvroRowFormatFactory is a factory for the Confluent Schema Registry Avro serialization and deserialization (Se/De) formats. It has the following attributes:

...

Parameters

The SchemaRegistryCatalog.Builder can be used to configure these options:

| name | isOptional | default | remark |
| --- | --- | --- | --- |
| kafkaOptions | false | (null) | Kafka connector options shared by all the tables read from this catalog. To tweak or override them for a single table, use dynamic table options or the CREATE TABLE LIKE syntax. |
| schemaRegistryURL | false | (null) | Schema Registry URL used to connect to the registry service. |
| catalogName | true | kafka | Catalog name. |
| dbName | true | kafka | Database name. |
| schemaRegistryClient | true | CachedSchemaRegistryClient with a default identityMapCapacity of 1000 | Sets up the SchemaRegistryClient used to connect to the registry service. By default, the catalog holds a CachedSchemaRegistryClient with an identityMapCapacity of 1000. Use this option for custom client configuration, e.g. SSL settings or a different identityMapCapacity. |
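The required and optional options above can be illustrated with a minimal, self-contained sketch. `CatalogConfigSketch` is a hypothetical stand-in written for this example only; it is not the real SchemaRegistryCatalog.Builder, and it leaves out the registry client option so that it compiles without the Confluent client library. It assumes that missing required options should fail at build time and that catalogName and dbName both default to "kafka", as the table states.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for the catalog configuration; NOT the real SchemaRegistryCatalog.
final class CatalogConfigSketch {
    final Map<String, String> kafkaOptions;
    final String schemaRegistryURL;
    final String catalogName;
    final String dbName;

    private CatalogConfigSketch(Builder b) {
        // Required options (isOptional = false): fail fast when they are missing.
        Objects.requireNonNull(b.kafkaOptions, "kafkaOptions is required");
        Objects.requireNonNull(b.schemaRegistryURL, "schemaRegistryURL is required");
        // Copy the map so later changes by the caller do not leak into the catalog.
        this.kafkaOptions = Collections.unmodifiableMap(new HashMap<>(b.kafkaOptions));
        this.schemaRegistryURL = b.schemaRegistryURL;
        this.catalogName = b.catalogName;
        this.dbName = b.dbName;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private Map<String, String> kafkaOptions;   // required, no default
        private String schemaRegistryURL;           // required, no default
        private String catalogName = "kafka";       // optional, default from the table
        private String dbName = "kafka";            // optional, default from the table

        Builder kafkaOptions(Map<String, String> v) { this.kafkaOptions = v; return this; }
        Builder schemaRegistryURL(String v) { this.schemaRegistryURL = v; return this; }
        Builder catalogName(String v) { this.catalogName = v; return this; }
        Builder dbName(String v) { this.dbName = v; return this; }

        CatalogConfigSketch build() {
            return new CatalogConfigSketch(this);
        }
    }
}
```

Calling `builder().build()` without the two required options throws a NullPointerException naming the missing option, while omitting catalogName and dbName leaves both at "kafka".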

...

Basic Auth Security for Producers and Consumers

See https://docs.confluent.io/current/schema-registry/serdes-develop/index.html#basic-auth-security-for-producers-and-consumers.

If you want to configure the client to enable SSL, use a custom SchemaRegistryClient when constructing the catalog.

RegistryAvroFormatFactory

RegistryAvroFormatFactory is a factory for the Confluent Schema Registry Avro serialization and deserialization (Se/De) formats. It has the following attributes:

  • format: avro-sr: factory ID, required
  • schema-registry.url: schema registry URL, required
  • schema-string: avro schema string, required
  • schema-registry.subject: subject to write to, required only for sink table
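The attribute list above implies a small validation rule: three options are always required, and the subject is required only when the table is used as a sink. The sketch below shows one way such a check could look. `AvroSrOptionsSketch` and `missingOrInvalid` are hypothetical names introduced for illustration; the actual factory may validate its options differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper; the option keys come from the attribute list, the class does not.
final class AvroSrOptionsSketch {

    /** Returns the option keys that are missing or invalid for a source or sink table. */
    static List<String> missingOrInvalid(Map<String, String> options, boolean isSink) {
        List<String> problems = new ArrayList<>();
        // 'format' must be present and equal to the factory ID.
        if (!"avro-sr".equals(options.get("format"))) {
            problems.add("format");
        }
        // Always required, for both source and sink tables.
        for (String key : new String[] {"schema-registry.url", "schema-string"}) {
            if (!options.containsKey(key)) {
                problems.add(key);
            }
        }
        // Required only when writing, because the subject names the write target.
        if (isSink && !options.containsKey("schema-registry.subject")) {
            problems.add("schema-registry.subject");
        }
        return problems;
    }
}
```

With only `format`, `schema-registry.url` and `schema-string` set, the check passes for a source table and reports `schema-registry.subject` for a sink table.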

Public Interfaces

Code Block (Java): ConfluentSchemaRegistryCatalog
/**
 * Catalog for
 * <a href="https://docs.confluent.io/current/schema-registry/schema_registry_tutorial.html">Confluent Schema Registry</a>.
 * It allows access to all the topics of the current Confluent Schema Registry Service
 * through SQL or the Table API; there is no need to create any table explicitly.
 *
 * <p>The code snippet below illustrates how to use this catalog:
 * <pre>
 *     String schemaRegistryURL = ...;
 *     Map<String, String> kafkaProps = ...;
 *     SchemaRegistryCatalog catalog = SchemaRegistryCatalog.builder()
 *             .schemaRegistryURL(schemaRegistryURL)
 *             .kafkaOptions(kafkaProps)
 *             .catalogName("myCatalog")
 *             .dbName("myDB")
 *             .build();
 *     // Register the catalog under the same name that the query below uses.
 *     tEnv.registerCatalog("myCatalog", catalog);
 *
 *     // ---------- Consume stream from Kafka -------------------
 *
 *     // Assumes there is a topic named 'transactions'
 *     String query = "SELECT\n" +
 *             "  id, amount\n" +
 *             "FROM myCatalog.myDB.transactions";
 * </pre>
 *
 * <p>Only TopicNameStrategy is supported as the subject naming strategy,
 * under which all the records in one topic have the same schema, see
 * <a href="https://docs.confluent.io/current/schema-registry/serializer-formatter.html#how-the-naming-strategies-work">How the Naming Strategies Work</a>
 * for details.
 *
 * <p>You can specify common options for these topics. All the tables from this catalog
 * take the same options. If that is not what you want, use dynamic table options to
 * override them within a per-table scope.
 *
 * <p>The limitations:
 * <ul>
 *     <li>The catalog only supports reading messages with the latest enabled schema for any given
 *     Kafka topic at the time the SQL query is compiled.</li>
 *     <li>No time-column and watermark support.</li>
 *     <li>The catalog is read-only. It does not support table creation,
 *     deletion, or modification.</li>
 *     <li>The catalog only supports Kafka message values prefixed with the schema id.
 *     This is also the default behaviour of the SchemaRegistry Kafka producer format.</li>
 * </ul>
 */
public class SchemaRegistryCatalog extends TableCatalog {}
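Two points in the Javadoc above lend themselves to a short illustration: the TopicNameStrategy subject name and the schema id prefix on message values. The sketch below assumes the Confluent conventions as publicly documented: the value subject for a topic is `<topic>-value`, and a serialized value starts with a magic byte 0 followed by a 4-byte big-endian schema id. `ConfluentWireFormatSketch` is a hypothetical helper written for this example; it is not part of the proposed catalog.

```java
import java.nio.ByteBuffer;

// Hypothetical helper illustrating the conventions the catalog relies on.
final class ConfluentWireFormatSketch {
    // First byte of every value written by the Confluent serializers.
    static final byte MAGIC_BYTE = 0x0;
    // Header length: 1 magic byte + 4 bytes of schema id.
    static final int HEADER_LENGTH = 5;

    /** Subject that TopicNameStrategy uses for the value schema of a topic. */
    static String valueSubject(String topic) {
        return topic + "-value";
    }

    /** Extracts the schema id from a message value prefixed with the Confluent header. */
    static int schemaId(byte[] messageValue) {
        if (messageValue == null || messageValue.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Message value is too short to carry a schema id");
        }
        // ByteBuffer reads multi-byte values in big-endian order by default.
        ByteBuffer buffer = ByteBuffer.wrap(messageValue);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte, value is not schema-id prefixed");
        }
        return buffer.getInt();
    }
}
```

Values that do not start with this header are rejected rather than guessed at, which mirrors the limitation that only schema-id-prefixed values are supported.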

...