Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
linenumberstrue
        String schemaRegistryURL = ...;
        Map<String, String> kafkaProps = ...;
        SchemaRegistryCatalog catalog = SchemaRegistryCatalog.builder()
                .schemaRegistryURL(schemaRegistryURL)
                .kafkaOptions(kafkaProps)
                .catalogName("myCatalog")
                .dbName("myDB")
                .build();
        tEnv.registerCatalog("catalog1myCatalog", catalog);

        // ---------- Consume stream from Kafka -------------------

        // Assumes there is a topic named 'transactions'
        String query = "SELECT\n" +
            " id, amount\n" +
            "FROM myCatalog.myDB.transactions";

...

Code Block
languagejava
titleConfluentSchemaRegistryCatalog
/**
 * Catalog for
 * <a href="https://docs.confluent.io/current/schema-registry/schema_registry_tutorial.html">Confluent Schema Registry</a>.
 * It allows accessing all the topics of the current Confluent Schema Registry Service
 * through SQL or the Table API; there is no need to create any table explicitly.
 *
 * <p>The code snippet below illustrates how to use this catalog:
 * <pre>{@code
 *     String schemaRegistryURL = ...;
 *     Map<String, String> kafkaProps = ...;
 *     SchemaRegistryCatalog catalog = SchemaRegistryCatalog.builder()
 *             .schemaRegistryURL(schemaRegistryURL)
 *             .kafkaOptions(kafkaProps)
 *             .catalogName("myCatalog")
 *             .dbName("myDB")
 *             .build();
 *     tEnv.registerCatalog("myCatalog", catalog);
 *
 *     // ---------- Consume stream from Kafka -------------------
 *
 *     // Assumes there is a topic named 'transactions'
 *     String query = "SELECT\n" +
 *         "  id, amount\n" +
 *         "FROM myCatalog.myDB.transactions";
 * }</pre>
 *
 * <p>Only TopicNameStrategy is supported as the subject naming strategy,
 * under which all the records in one topic have the same schema; see
 * <a href="https://docs.confluent.io/current/schema-registry/serializer-formatter.html#how-the-naming-strategies-work">How the Naming Strategies Work</a>
 * for details.
 *
 * <p>You can specify some common options for these topics. All the tables from this catalog
 * take the same options. If this is not what you want, use dynamic table options to set
 * them up within a per-table scope.
 *
 * <p>The limitations:
 * <ul>
 *     <li>The catalog only supports reading messages with the latest enabled schema for any given
 *     Kafka topic at the time when the SQL query was compiled.</li>
 *     <li>No time-column or watermark support.</li>
 *     <li>The catalog is read-only. It does not support table creations,
 *     deletions or modifications.</li>
 *     <li>The catalog only supports Kafka message values prefixed with the schema id;
 *     this is also the default behaviour for the SchemaRegistry Kafka producer format.</li>
 * </ul>
 */
public class SchemaRegistryCatalog extends TableCatalog {}

...