...
- https://docs.confluent.io/current/security/basic-auth.html#schema-registry
- https://docs.confluent.io/current/schema-registry/security/index.html#sr-security-overview
- https://docs.confluent.io/current/schema-registry/serdes-develop/index.html#basic-auth-security-for-producers-and-consumers
Schema Compatibility
From 5.5.0, Schema Registry allows a different compatibility level per subject or globally; the Schema Registry service enforces a compatibility check when a schema evolves.
- https://docs.confluent.io/current/schema-registry/avro.html#compatibility-types
- http://avro.apache.org/docs/1.7.7/spec.html#Schema+Resolution
- https://docs.confluent.io/current/schema-registry/serdes-develop/serdes-json.html#json-schema-compatibility-rules
- https://docs.confluent.io/current/schema-registry/develop/api.html#id1
Schema & Format
Schema and Format Binding
From 5.5.0, the user can get the format of a schema, i.e. Avro, JSON or Protobuf, through the Confluent SchemaRegistryClient#getLatestSchemaMetadata.
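As an illustrative sketch of this binding: the schema type string reported by the registry (via SchemaMetadata#getSchemaType() in the real client) can be mapped to a format choice. The `FormatBinding` class and `FormatKind` enum below are hypothetical stand-ins, not actual Flink or Confluent classes:

```java
/**
 * Sketch: map the schema type string reported by the registry
 * (SchemaMetadata#getSchemaType() returns "AVRO", "JSON" or "PROTOBUF"
 * from 5.5.0 on) to a table format choice. Illustrative names only.
 */
public class FormatBinding {
    public enum FormatKind { AVRO, JSON, PROTOBUF }

    public static FormatKind chooseFormat(String schemaType) {
        switch (schemaType) {
            case "AVRO": return FormatKind.AVRO;
            case "JSON": return FormatKind.JSON;
            case "PROTOBUF": return FormatKind.PROTOBUF;
            default:
                throw new IllegalArgumentException("Unknown schema type: " + schemaType);
        }
    }
}
```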
Format Compatibility
The format also has its own compatibility rules, for example, for Avro: http://avro.apache.org/docs/1.7.7/spec.html#Schema+Resolution
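To make one of those Avro resolution rules concrete: a field that exists in the reader schema but not in the writer schema can only be resolved if the reader declares a default value for it. The toy check below illustrates just this one rule over flat field maps; it is not the real Avro schema-resolution algorithm:

```java
import java.util.Map;

/**
 * Toy illustration of one Avro schema-resolution rule: a reader field
 * absent from the writer schema must carry a default value, otherwise
 * the reader cannot decode the writer's records.
 */
public class AvroResolutionSketch {
    /**
     * @param writerFields   writer field name -> type name
     * @param readerDefaults reader field name -> default value (null = no default)
     */
    public static boolean canRead(Map<String, String> writerFields,
                                  Map<String, String> readerDefaults) {
        for (Map.Entry<String, String> f : readerDefaults.entrySet()) {
            if (!writerFields.containsKey(f.getKey()) && f.getValue() == null) {
                return false; // reader-only field without a default: incompatible
            }
        }
        return true;
    }
}
```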
Design Proposal
SchemaRegistryCatalog
The SchemaRegistryCatalog interacts with the Confluent Schema Registry Service directly through its Java SDK client, e.g. the SchemaRegistryClient, which wraps the RESTful API.
- When opened/initialized, it fetches the topic list of the current Kafka cluster and caches it in memory.
...
- When a specific catalog table is requested from the SQL context, it fetches the Avro schema string and format type of the topic's latest subject schema, then generates a catalog table instance and puts it into the local cache.
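Since the catalog only supports TopicNameStrategy, the value subject of a topic is always `{topic-name}-value`, so topic names can be recovered from the registry's subject list. A minimal sketch of that mapping (`TopicDiscovery` is a hypothetical helper, not part of the proposal):

```java
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Sketch: derive topic names from Schema Registry subjects under
 * TopicNameStrategy, where the value subject is {topic-name}-value.
 */
public class TopicDiscovery {
    public static List<String> topicsFromSubjects(Collection<String> subjects) {
        return subjects.stream()
                // keep only value subjects; key subjects etc. are ignored
                .filter(s -> s.endsWith("-value"))
                // strip the "-value" suffix to recover the topic name
                .map(s -> s.substring(0, s.length() - "-value".length()))
                .collect(Collectors.toList());
    }
}
```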
A catalog table instance with the following attributes is cached in memory:
- TableSchema: inferred from the Avro schema string
- Confluent schema registry URL
- The Avro schema string
- The schema registry subject name (with the format {topic-name}-value; used for writes)
- The Kafka topic name
- Format identifier "avro-sr", which helps to find the formats for Confluent Schema Registry Se/De
- Common properties for all the topics (as a parameter of the Catalog)
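The caching behaviour above can be grouped roughly as follows. This is a simplified sketch with hypothetical names (`CatalogTableCache`, `CachedTable`, and the `"avro-sr"` identifier are illustrative); the real catalog table also carries the registry URL, the inferred TableSchema, and the common Kafka options:

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of the catalog's lazy per-topic table cache. Illustrative names only. */
public class CatalogTableCache {
    /** Minimal stand-in for the cached catalog table attributes. */
    public static final class CachedTable {
        public final String topic;
        public final String subject;    // TopicNameStrategy: {topic}-value
        public final String avroSchema; // raw Avro schema string from the registry
        public final String format;     // format identifier, e.g. "avro-sr"

        CachedTable(String topic, String avroSchema, String format) {
            this.topic = topic;
            this.subject = topic + "-value";
            this.avroSchema = avroSchema;
            this.format = format;
        }
    }

    private final Map<String, CachedTable> cache = new HashMap<>();

    /** Builds the table entry on first request and caches it for later lookups. */
    public CachedTable getOrCreate(String topic, String avroSchema) {
        return cache.computeIfAbsent(topic,
                t -> new CachedTable(t, avroSchema, "avro-sr"));
    }
}
```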
The graph below illustrates how this catalog works with the Confluent Schema Registry Service:
...
SchemaRegistryAvroFormatFactory
SchemaRegistryAvroFormatFactory is a factory for Confluent Schema Registry Avro Se/De formats. It has the following attributes:
...
```java
/**
 * Catalog for
 * <a href="https://docs.confluent.io/current/schema-registry/schema_registry_tutorial.html">Confluent Schema Registry</a>.
 * It allows to access all the topics of the current Confluent Schema Registry Service
 * through SQL or TableAPI, there is no need to create any table explicitly.
 *
 * <p>The code snippet below illustrates how to use this catalog:
 * <pre>
 * String schemaRegistryURL = ...;
 * Map<String, String> kafkaProps = ...;
 * SchemaRegistryCatalog catalog = SchemaRegistryCatalog.builder()
 *     .schemaRegistryURL(schemaRegistryURL)
 *     .kafkaOptions(kafkaProps)
 *     .catalogName("myCatalog")
 *     .dbName("myDB")
 *     .build();
 * tEnv.registerCatalog("myCatalog", catalog);
 *
 * // ---------- Consume stream from Kafka -------------------
 *
 * // Assumes there is a topic named 'transactions'
 * String query = "SELECT\n" +
 *     "  id, amount\n" +
 *     "FROM myCatalog.myDB.transactions";
 * </pre>
 *
 * <p>We only support TopicNameStrategy for subject naming strategy,
 * for which all the records in one topic have the same schema, see
 * <a href="https://docs.confluent.io/current/schema-registry/serializer-formatter.html#how-the-naming-strategies-work">How the Naming Strategies Work</a>
 * for details.
 *
 * <p>You can specify some common options for these topics. All the tables from this catalog
 * would take the same options. If this is not what you want, use dynamic table options
 * to set them up within per-table scope.
 *
 * <p>The limitations:
 * <ul>
 *   <li>The catalog only supports reading messages with the latest enabled schema for any given
 *   Kafka topic at the time when the SQL query was compiled.</li>
 *   <li>No time-column and watermark support.</li>
 *   <li>The catalog is read-only. It does not support table creations
 *   or deletions or modifications.</li>
 *   <li>The catalog only supports Kafka message values prefixed with schema id,
 *   this is also the default behaviour for the SchemaRegistry Kafka producer format.</li>
 * </ul>
 */
public class SchemaRegistryCatalog extends TableCatalog {
}
```

```java
/**
 * Table format factory for providing configured instances of Schema Registry Avro to RowData
 * {@link SerializationSchema} and {@link DeserializationSchema}.
 */
public class SchemaRegistryAvroFormatFactory
        implements DeserializationFormatFactory, SerializationFormatFactory {
}
```
The Limitations
- The catalog only supports reading messages with the latest enabled schema for any given Kafka topic at the time when the SQL query was compiled
- No time-column and watermark support
- The catalog is read-only. It does not support table creations or deletions or modifications
- The catalog only supports Kafka message values prefixed with schema id, this is also the default behavior for the SchemaRegistry Kafka producer format
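The schema-id prefix mentioned in the last limitation is the standard Confluent wire format: one magic byte 0x0, followed by a 4-byte big-endian schema id, then the serialized payload. A minimal sketch of encoding and decoding that framing (`WireFormat` is an illustrative helper, not a real library class):

```java
import java.nio.ByteBuffer;

/**
 * Sketch of the Confluent wire format the catalog expects:
 * magic byte 0x0 + 4-byte big-endian schema id + serialized payload.
 */
public class WireFormat {
    /** Prepends the 5-byte Confluent header to a serialized payload. */
    public static byte[] prefix(int schemaId, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(5 + payload.length);
        buf.put((byte) 0);    // magic byte
        buf.putInt(schemaId); // schema id, big-endian (ByteBuffer default)
        buf.put(payload);
        return buf.array();
    }

    /** Reads the schema id back from a framed message. */
    public static int schemaId(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (buf.get() != 0) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buf.getInt();
    }
}
```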
The Table Schema and Watermark Definition
...