Status

Current state: "Under Discussion"

Discussion thread

JIRA

Released: 1.12.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Confluent Schema Registry provides a RESTful interface for developers to define standard schemas for their events, share them across the organization and safely evolve them in a way that is backward compatible and future proof. Schema Registry stores a versioned history of all schemas and allows the evolution of schemas according to the configured compatibility settings. It also provides a plugin to clients that handles schema storage and retrieval for messages that are sent in Avro format.

A Confluent Schema Registry Catalog makes access to Flink SQL tables extremely convenient: all that needs to be configured is a single schema registry URL, and then all the Kafka topics registered in the schema registry service can be accessed in Flink SQL and the Table API. Here is a code snippet to illustrate how to access tables by registering such a catalog:

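		StreamTableEnvironment tEnv = ...

		// properties: common options shared by all topics of this catalog
		ConfluentSchemaRegistryCatalog catalog = new ConfluentSchemaRegistryCatalog(
			properties,
			schemaRegistryURL,
			"catalog1",
			"db1");
		tEnv.registerCatalog("catalog1", catalog);

		// ---------- Consume stream from Kafka -------------------

		// The topic "transactions1" is addressed as <catalog>.<database>.<topic>
		String query = "SELECT\n" +
			"  id, amount\n" +
			"FROM catalog1.db1.transactions1";
		Table result = tEnv.sqlQuery(query);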

Design Proposal

ConfluentSchemaRegistryCatalog

The ConfluentSchemaRegistryCatalog interacts with the Confluent Schema Registry Service directly through its RESTful API. When it is opened (initialized), it fetches the list of all topics of the current Kafka cluster and caches it in memory. When the SQL context requests a specific catalog table, it fetches the Avro schema string of the latest version of the topic's subject. A catalog table instance with the following attributes is then cached in memory:

  • TableSchema: inferred from the Avro schema string
  • Confluent schema registry URL
  • The Avro schema string
  • The schema registry subject name (used for write)
  • The Kafka topic name
  • Format named "confluent-registry-avro", which is used to look up the serialization/deserialization (SerDe) formats for the Confluent Schema Registry
  • Common properties for all the topics (as a parameter of the Catalog)
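
The lookup described above can be sketched as follows. This is a minimal illustration, not the proposed implementation: the class RegistrySubjects and its methods are hypothetical names, and it assumes that the topic list is derived from the subjects returned by the registry's GET /subjects endpoint. It relies on two documented facts: under TopicNameStrategy the value schema of a topic is registered under the subject "<topic>-value", and the latest schema version of a subject is served at /subjects/<subject>/versions/latest.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not part of the proposal) for subject handling under TopicNameStrategy. */
final class RegistrySubjects {
    private static final String VALUE_SUFFIX = "-value";

    /** Subject that holds the value schema of a topic under TopicNameStrategy. */
    static String valueSubject(String topic) {
        return topic + VALUE_SUFFIX;
    }

    /** Derives topic names from a subject list, keeping only "<topic>-value" subjects. */
    static List<String> topicsFromSubjects(List<String> subjects) {
        List<String> topics = new ArrayList<>();
        for (String subject : subjects) {
            if (subject.endsWith(VALUE_SUFFIX) && subject.length() > VALUE_SUFFIX.length()) {
                topics.add(subject.substring(0, subject.length() - VALUE_SUFFIX.length()));
            }
        }
        return topics;
    }

    /** REST URL of the latest value schema of a topic (subject is assumed to need no URL encoding). */
    static String latestSchemaUrl(String registryUrl, String topic) {
        String base = registryUrl.endsWith("/")
            ? registryUrl.substring(0, registryUrl.length() - 1)
            : registryUrl;
        return base + "/subjects/" + valueSubject(topic) + "/versions/latest";
    }

    public static void main(String[] args) {
        // The registry URL below is a placeholder, not a value from the proposal.
        List<String> subjects = Arrays.asList("transactions1-value", "transactions1-key");
        System.out.println(topicsFromSubjects(subjects));
        System.out.println(latestSchemaUrl("http://localhost:8081", "transactions1"));
    }
}
```

A real catalog would issue the HTTP request against this URL and parse the "schema" field of the JSON response; that part is left out to keep the sketch free of dependencies.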

The graph below illustrates how this catalog works with the Confluent Schema Registry Service:

ConfluentRegistryAvroRowFormatFactory

ConfluentRegistryAvroRowFormatFactory is a factory for the Confluent Schema Registry Avro serialization and deserialization formats. It has the following attributes:

  • format.type = confluent-registry-avro: factory ID, required
  • format.schema-registry.url: schema registry URL, required
  • format.schema-registry.subject: subject to write to, required only for sink table
  • format.avro-schema: Avro schema string, required
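
To make the required/optional rules above concrete, here is a small sketch of how the property set could be validated. The property keys are taken from the list above; the class ConfluentAvroFormatProps, the method missingKeys, and the sample values in main are hypothetical and only serve as illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical validation helper for the format properties listed above. */
final class ConfluentAvroFormatProps {
    static final String TYPE = "format.type";
    static final String URL = "format.schema-registry.url";
    static final String SUBJECT = "format.schema-registry.subject";
    static final String AVRO_SCHEMA = "format.avro-schema";

    /** Returns the required keys that are absent or empty; the subject is required only for sinks. */
    static List<String> missingKeys(Map<String, String> props, boolean isSink) {
        List<String> required = new ArrayList<>(Arrays.asList(TYPE, URL, AVRO_SCHEMA));
        if (isSink) {
            required.add(SUBJECT);
        }
        List<String> missing = new ArrayList<>();
        for (String key : required) {
            String value = props.get(key);
            if (value == null || value.isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Placeholder values, not taken from the proposal.
        Map<String, String> props = new LinkedHashMap<>();
        props.put(TYPE, "confluent-registry-avro");
        props.put(URL, "http://localhost:8081");
        props.put(AVRO_SCHEMA, "{\"type\":\"string\"}");
        System.out.println("missing for source: " + missingKeys(props, false));
        System.out.println("missing for sink:   " + missingKeys(props, true));
    }
}
```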

Basic Auth Security for Producers and Consumers

See https://docs.confluent.io/current/schema-registry/serdes-develop/index.html#basic-auth-security-for-producers-and-consumers.
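
The client-side settings are described on the linked page. At the HTTP level, Basic authentication boils down to an Authorization header that carries the Base64-encoded "user:password" pair (RFC 7617). The sketch below only shows that encoding; the class BasicAuthHeader is a hypothetical name and is not part of this proposal or of the Confluent client.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper that builds the value of an HTTP Basic "Authorization" header. */
final class BasicAuthHeader {
    /** Encodes "user:password" as Base64 and prefixes it with the "Basic " scheme. */
    static String of(String user, String password) {
        String userInfo = user + ":" + password;
        String encoded = Base64.getEncoder()
            .encodeToString(userInfo.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }

    public static void main(String[] args) {
        // Classic example credentials from RFC 7617, not real ones.
        System.out.println(of("Aladdin", "open sesame")); // prints "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ=="
    }
}
```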

Public Interfaces

ConfluentSchemaRegistryCatalog
/**
 * Catalog for
 * <a href="https://docs.confluent.io/current/schema-registry/schema_registry_tutorial.html">Confluent Schema Registry</a>.
 * It gives access to all the topics of the current Confluent Schema Registry Service
 * through SQL or the Table API, without the need to create any table explicitly.
 *
 * <p>The code snippet below illustrates how to use this catalog:
 * <pre>
 *     ConfluentSchemaRegistryCatalog catalog = new ConfluentSchemaRegistryCatalog(
 * 			properties,
 * 			schemaRegistryURL,
 * 			"catalog1",
 * 			"db1");
 * 		tEnv.registerCatalog("catalog1", catalog);
 *
 * 		// ---------- Consume stream from Kafka -------------------
 *
 * 		String query = "SELECT\n" +
 * 			"  id, amount\n" +
 * 			"FROM catalog1.db1.transactions1";
 * </pre>
 *
 * <p>Only TopicNameStrategy is supported as the subject naming strategy,
 * which means all the records in one topic have the same schema, see
 * <a href="https://docs.confluent.io/current/schema-registry/serializer-formatter.html#how-the-naming-strategies-work">How the Naming Strategies Work</a>
 * for details.
 *
 * <p>You can specify common options for these topics. All the tables from this catalog
 * take the same options. If this is not what you want, use dynamic table options
 * to override them within per-table scope.
 */
public class ConfluentSchemaRegistryCatalog extends AbstractCatalog {
}


ConfluentRegistryAvroRowFormatFactory
/** Format factory for Confluent Schema Registry Avro to Row. */
public class ConfluentRegistryAvroRowFormatFactory extends TableFormatFactoryBase<Row>
		implements SerializationSchemaFactory<Row>, DeserializationSchemaFactory<Row> {
}


ConfluentRegistryAvroRowSerializationSchema
/** Confluent Schema Registry Avro row serialization schema. */
public class ConfluentRegistryAvroRowSerializationSchema extends AvroRowSerializationSchema {
}


ConfluentRegistryAvroRowDeserializationSchema
/** Confluent registry Avro row deserialization schema. */
public class ConfluentRegistryAvroRowDeserializationSchema
		extends AvroRowDeserializationSchema {
}
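
For context, records written by Confluent serializers follow the Confluent wire format: one magic byte (0), a 4-byte big-endian schema ID, then the Avro binary payload. The two schema classes above would need to produce and consume this framing. The sketch below shows only the framing; the class ConfluentWireFormat and its methods are hypothetical and do not reflect how the proposed classes are actually implemented.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical helper illustrating the Confluent wire format framing. */
final class ConfluentWireFormat {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 5; // 1 magic byte + 4-byte schema ID

    /** Prepends the magic byte and the big-endian schema ID to an Avro payload. */
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(HEADER_SIZE + avroPayload.length)
            .put(MAGIC_BYTE)
            .putInt(schemaId) // ByteBuffer is big-endian by default
            .put(avroPayload)
            .array();
    }

    /** Reads the schema ID from a framed message, rejecting unknown framing. */
    static int schemaId(byte[] message) {
        if (message.length < HEADER_SIZE || message[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("Not a Confluent-framed message");
        }
        return ByteBuffer.wrap(message, 1, 4).getInt();
    }

    /** Returns the Avro payload that follows the 5-byte header. */
    static byte[] payload(byte[] message) {
        schemaId(message); // validates the header
        return Arrays.copyOfRange(message, HEADER_SIZE, message.length);
    }

    public static void main(String[] args) {
        byte[] framed = frame(42, new byte[] {1, 2, 3});
        System.out.println(Arrays.toString(framed));
        System.out.println(schemaId(framed));
    }
}
```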

Compatibility, Deprecation, and Migration Plan

This is a new feature, so there are no compatibility concerns.

Implementation Plan

  • Add a new Catalog named ConfluentSchemaRegistryCatalog

  • Add a format factory ConfluentRegistryAvroRowFormatFactory

  • Add two formats: ConfluentRegistryAvroRowDeserializationSchema and ConfluentRegistryAvroRowSerializationSchema

Test Plan

The Confluent Schema Registry is a service that runs alongside the Kafka cluster, so we need an end-to-end (e2e) test for both reading from and writing to Kafka topics.
