...

Design Proposal

ConfluentSchemaRegistryCatalog

Schema Compatibility

From 5.5.0, Schema Registry allows a different compatibility level per subject or globally; the Schema Registry service enforces a compatibility check when a schema evolves.
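The per-subject/global resolution can be sketched as a simple fallback lookup. This is a toy model for illustration only; the class and method names are hypothetical and not part of the Schema Registry API:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model of how a per-subject compatibility level overrides the
 * global one. Illustrative only, not Schema Registry API.
 */
public class CompatibilityLevels {
    private final Map<String, String> perSubject = new HashMap<>();
    private String global = "BACKWARD"; // Schema Registry's default level

    public void setGlobal(String level) {
        global = level;
    }

    public void setSubject(String subject, String level) {
        perSubject.put(subject, level);
    }

    /** The effective level: the subject override if present, else the global one. */
    public String effectiveLevel(String subject) {
        return perSubject.getOrDefault(subject, global);
    }

    public static void main(String[] args) {
        CompatibilityLevels levels = new CompatibilityLevels();
        levels.setSubject("transactions-value", "FULL");
        System.out.println(levels.effectiveLevel("transactions-value")); // FULL
        System.out.println(levels.effectiveLevel("orders-value"));       // BACKWARD
    }
}
```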

Schema & Format

Schema and Format Binding

From 5.5.0, users can get the format of a schema (i.e. Avro, JSON, or Protobuf) through the Confluent SchemaRegistryClient#getLatestSchemaMetadata.
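The schema type returned by the client is a string such as "AVRO", "JSON", or "PROTOBUF"; registries older than 5.5.0 only stored Avro schemas, so an absent type conventionally means Avro. A minimal sketch of that mapping (the helper below is an assumption for illustration, not part of the client API):

```java
/**
 * Sketch: deriving the format from the schema type string reported by the
 * registry. A null value (pre-5.5.0 registries) conventionally means Avro.
 * Illustrative helper, not part of the Confluent client API.
 */
public class FormatResolver {
    public static String resolveFormat(String schemaType) {
        if (schemaType == null) {
            return "AVRO"; // pre-5.5.0 registries only stored Avro schemas
        }
        switch (schemaType) {
            case "AVRO":
            case "JSON":
            case "PROTOBUF":
                return schemaType;
            default:
                throw new IllegalArgumentException("Unknown schema type: " + schemaType);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolveFormat(null));   // AVRO
        System.out.println(resolveFormat("JSON")); // JSON
    }
}
```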

Format Compatibility

The format also has its own compatibility rules; for Avro, for example, see the schema resolution rules: http://avro.apache.org/docs/1.7.7/spec.html#Schema+Resolution
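One of Avro's schema resolution rules says that when the reader's record has a field the writer's record lacks, that reader field must declare a default value. The toy sketch below illustrates just that one rule, modeling record fields as a map from field name to "has a default?"; it is a deliberate simplification of the full spec:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy illustration of one Avro schema-resolution rule: a reader field that
 * is missing from the writer's schema must have a default value, otherwise
 * the schemas cannot be resolved. Simplified for illustration only.
 */
public class AvroResolutionSketch {
    /** Maps model record fields -> "has default?". */
    public static boolean canRead(Map<String, Boolean> readerFields,
                                  Map<String, Boolean> writerFields) {
        for (Map.Entry<String, Boolean> field : readerFields.entrySet()) {
            boolean writerHasIt = writerFields.containsKey(field.getKey());
            boolean hasDefault = field.getValue();
            if (!writerHasIt && !hasDefault) {
                return false; // no written value and no default: unresolvable
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Boolean> writer = new HashMap<>();
        writer.put("id", false);

        Map<String, Boolean> reader = new HashMap<>();
        reader.put("id", false);
        reader.put("amount", true); // new field WITH a default: resolvable

        System.out.println(canRead(reader, writer)); // true
    }
}
```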

Design Proposal

SchemaRegistryCatalog

The SchemaRegistryCatalog interacts with the Confluent Schema Registry Service directly through its Java SDK client, e.g. the SchemaRegistryClient, which wraps the service's RESTful API.

  • When opened/initialized, it fetches the topic list of the current Kafka cluster and caches it in memory.

...

  • When specific catalog table info is requested from the SQL context, it fetches the Avro schema string and format type of the topic's latest subject schema, then generates a catalog table instance and puts it into the local cache.


A catalog table instance with the following attributes is cached in memory:

  • TableSchema: inferred from the Avro schema string
  • Confluent schema registry URL
  • The Avro schema string
  • The schema registry subject name (in the format {topic-name}-value, used for writes)
  • The Kafka topic name
  • Format identifier "avro-sr", which helps to find formats for Confluent Schema Registry Se/De
  • Common properties for all the topics (as a parameter of the Catalog)
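The cache entry described above can be sketched as a small value class keyed by topic name, with the write subject derived per TopicNameStrategy. All names below are illustrative assumptions, not the proposal's actual implementation, and the loader fabricates an entry instead of calling the registry so the sketch stays self-contained:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Sketch of the catalog's in-memory table cache. Class, field, and
 * placeholder values are illustrative only.
 */
public class CatalogTableCache {
    static final class Entry {
        final String topic;
        final String avroSchema;       // the Avro schema string
        final String registryURL;      // Confluent schema registry URL
        final String formatIdentifier; // format identifier, e.g. "avro-sr"

        Entry(String topic, String avroSchema, String registryURL, String formatIdentifier) {
            this.topic = topic;
            this.avroSchema = avroSchema;
            this.registryURL = registryURL;
            this.formatIdentifier = formatIdentifier;
        }

        /** TopicNameStrategy: the write subject is "{topic-name}-value". */
        String subject() {
            return topic + "-value";
        }
    }

    private final Map<String, Entry> cache = new ConcurrentHashMap<>();

    public Entry getOrLoad(String topic) {
        // The real catalog would fetch the latest subject schema from the
        // registry here; this sketch fabricates a placeholder entry.
        return cache.computeIfAbsent(topic,
            t -> new Entry(t, "{\"type\":\"record\"}", "http://localhost:8081", "avro-sr"));
    }

    public static void main(String[] args) {
        CatalogTableCache cache = new CatalogTableCache();
        System.out.println(cache.getOrLoad("transactions").subject()); // transactions-value
    }
}
```

Repeated lookups for the same topic return the cached entry, matching the "fetch once, then serve from the local cache" behavior described above.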

The graph below illustrates how this catalog works with the Confluent Schema Registry Service:

...

SchemaRegistryAvroFormatFactory

SchemaRegistryAvroFormatFactory is a factory for Confluent Schema Registry Avro Se/De formats. It has the following attributes:

...

Code Block
languagejava
titleSchemaRegistryCatalog
/**
 * Catalog for
 * <a href="https://docs.confluent.io/current/schema-registry/schema_registry_tutorial.html">Confluent Schema Registry</a>.
 * It allows to access all the topics of the current Confluent Schema Registry Service
 * through SQL or TableAPI; there is no need to create any table explicitly.
 *
 * <p>The code snippet below illustrates how to use this catalog:
 * <pre>
 * 		String schemaRegistryURL = ...;
 * 		Map<String, String> kafkaProps = ...;
 * 		SchemaRegistryCatalog catalog = SchemaRegistryCatalog.builder()
 * 			.schemaRegistryURL(schemaRegistryURL)
 * 			.kafkaOptions(kafkaProps)
 * 			.catalogName("myCatalog")
 * 			.dbName("myDB")
 * 			.build();
 * 		tEnv.registerCatalog("myCatalog", catalog);
 *
 * 		// ---------- Consume stream from Kafka -------------------
 *
 * 		// Assumes there is a topic named 'transactions'
 * 		String query = "SELECT\n" +
 * 			"  id, amount\n" +
 * 			"FROM myCatalog.myDB.transactions";
 * </pre>
 *
 * <p>We only support TopicNameStrategy for subject naming strategy,
 * for which all the records in one topic have the same schema, see
 * <a href="https://docs.confluent.io/current/schema-registry/serializer-formatter.html#how-the-naming-strategies-work">How the Naming Strategies Work</a>
 * for details.
 *
 * <p>You can specify some common options for these topics. All the tables from this catalog
 * would take the same options. If this is not what you want, use dynamic table options set up
 * within per-table scope.
 *
 * <p>The limitations:
 * <ul>
 *     <li>The catalog only supports reading messages with the latest enabled schema for any given
 *     Kafka topic at the time when the SQL query was compiled.</li>
 *     <li>No time-column and watermark support.</li>
 *     <li>The catalog is read-only. It does not support table creations
 *     or deletions or modifications.</li>
 *     <li>The catalog only supports Kafka message values prefixed with schema id,
 *     this is also the default behaviour for the SchemaRegistry Kafka producer format.</li>
 * </ul>
 */
public class SchemaRegistryCatalog extends TableCatalog {
}


Code Block
languagejava
titleSchemaRegistryAvroFormatFactory
/**
 * Table format factory for providing configured instances of Schema Registry Avro to RowData
 * {@link SerializationSchema} and {@link DeserializationSchema}.
 */
public class SchemaRegistryAvroFormatFactory implements
		DeserializationFormatFactory,
		SerializationFormatFactory {
}

The Limitations

  • The catalog only supports reading messages with the latest enabled schema for any given Kafka topic at the time when the SQL query was compiled
  • No time-column and watermark support
  • The catalog is read-only. It does not support table creations or deletions or modifications
  • The catalog only supports Kafka message values prefixed with schema id; this is also the default behavior for the SchemaRegistry Kafka producer format

The Table Schema and Watermark Definition

...