...
```java
String schemaRegistryURL = ...;
Map<String, String> kafkaProps = ...;
SchemaRegistryCatalog catalog = SchemaRegistryCatalog.builder()
    .schemaRegistryURL(schemaRegistryURL)
    .kafkaOptions(kafkaProps)
    .catalogName("myCatalog")
    .dbName("myDB")
    .build();
tEnv.registerCatalog("myCatalog", catalog);

// ---------- Consume stream from Kafka -------------------

// Assumes there is a topic named 'transactions'
String query = "SELECT\n" +
    "  id, amount\n" +
    "FROM myCatalog.myDB.transactions";
```
...
```java
/**
 * Catalog for
 * <a href="https://docs.confluent.io/current/schema-registry/schema_registry_tutorial.html">Confluent Schema Registry</a>.
 * It allows access to all the topics of the current Confluent Schema Registry Service
 * through SQL or the Table API; there is no need to create any table explicitly.
 *
 * <p>The code snippet below illustrates how to use this catalog:
 * <pre>
 * String schemaRegistryURL = ...;
 * Map<String, String> kafkaProps = ...;
 * SchemaRegistryCatalog catalog = SchemaRegistryCatalog.builder()
 *     .schemaRegistryURL(schemaRegistryURL)
 *     .kafkaOptions(kafkaProps)
 *     .catalogName("myCatalog")
 *     .dbName("myDB")
 *     .build();
 * tEnv.registerCatalog("myCatalog", catalog);
 *
 * // ---------- Consume stream from Kafka -------------------
 *
 * // Assumes there is a topic named 'transactions'
 * String query = "SELECT\n" +
 *     "  id, amount\n" +
 *     "FROM myCatalog.myDB.transactions";
 * </pre>
 *
 * <p>Only the TopicNameStrategy subject naming strategy is supported,
 * under which all the records in one topic have the same schema; see
 * <a href="https://docs.confluent.io/current/schema-registry/serializer-formatter.html#how-the-naming-strategies-work">How the Naming Strategies Work</a>
 * for details.
 *
 * <p>You can specify common options for these topics. All the tables from this catalog
 * take the same options. If that does not fit your use case, use dynamic table options
 * to override them within a per-table scope.
 *
 * <p>Limitations:
 * <ul>
 *   <li>The catalog only supports reading messages with the latest enabled schema for any given
 *   Kafka topic at the time when the SQL query was compiled.</li>
 *   <li>No time-column and watermark support.</li>
 *   <li>The catalog is read-only. It does not support table creations,
 *   deletions, or modifications.</li>
 *   <li>The catalog only supports Kafka message values prefixed with the schema id;
 *   this is also the default behaviour for the SchemaRegistry Kafka producer format.</li>
 * </ul>
 */
public class SchemaRegistryCatalog extends TableCatalog {}
```
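Since every table from this catalog shares the catalog-wide Kafka options, per-table overrides go through Flink's dynamic table options (SQL `OPTIONS` hints). A minimal sketch of building such a query: the table path `myCatalog.myDB.transactions` is taken from the example above, while the `'properties.group.id'` key and its value are illustrative assumptions, not options prescribed by this design.

```java
public class DynamicOptionsExample {

    // Builds a query that overrides one Kafka option for a single table via
    // a /*+ OPTIONS(...) */ hint placed after the table reference, instead
    // of relying on the catalog-wide kafkaOptions.
    static String perTableQuery() {
        return "SELECT id, amount\n"
            + "FROM myCatalog.myDB.transactions\n"
            + "/*+ OPTIONS('properties.group.id'='per-table-consumer') */";
    }

    public static void main(String[] args) {
        // Prints the query; pass it to tEnv.sqlQuery(...) in a real job.
        System.out.println(perTableQuery());
    }
}
```

The hinted options apply only to that one table reference for that one query, which keeps the catalog configuration as the shared default.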
...