...
```java
StreamTableEnvironment tEnv = ...

ConfluentSchemaRegistryCatalog catalog = new ConfluentSchemaRegistryCatalog(
        properties, schemaRegistryURL, "catalog1", "db1");
tEnv.registerCatalog("catalog1", catalog);

// ---------- Consume stream from Kafka -------------------

String query = "SELECT\n" +
        " id, amount\n" +
        "FROM catalog1.db1.transactions1";
```
Design Proposal
ConfluentSchemaRegistryCatalog
...
```java
/** Format factory for Confluent Schema Registry Avro to Row. */
public class ConfluentRegistryAvroRowFormatFactory extends TableFormatFactoryBase<Row>
        implements SerializationSchemaFactory<Row>, DeserializationSchemaFactory<Row> {
}
```
```java
/** Confluent Schema Registry Avro row serialization schema. */
public class ConfluentRegistryAvroRowSerializationSchema extends AvroRowSerializationSchema {
}
```
```java
/** Confluent Schema Registry Avro row deserialization schema. */
public class ConfluentRegistryAvroRowDeserializationSchema
        extends AvroRowDeserializationSchema {
}
```
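The class bodies above are left empty in this proposal. As background for what the two schema classes would have to handle, the sketch below illustrates the framing that Confluent's Avro serializers put around each Kafka message: one magic byte (`0`), a 4-byte big-endian schema ID, then the Avro binary payload. The class name `ConfluentFraming` and its methods are hypothetical and are not part of the proposal or of Flink. The sketch covers only the header handling, and assumes the payload is already Avro-encoded.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical helper illustrating the Confluent wire-format header; not part of the proposal. */
public class ConfluentFraming {
    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_SIZE = 5; // 1 magic byte + 4-byte schema ID

    /** Prepends the magic byte and the schema ID to an already Avro-encoded payload. */
    public static byte[] frame(int schemaId, byte[] avroPayload) {
        ByteBuffer buffer = ByteBuffer.allocate(HEADER_SIZE + avroPayload.length);
        buffer.put(MAGIC_BYTE);
        buffer.putInt(schemaId); // ByteBuffer writes big-endian by default
        buffer.put(avroPayload);
        return buffer.array();
    }

    /** Validates the header and returns the schema ID embedded in the message. */
    public static int readSchemaId(byte[] message) {
        if (message.length < HEADER_SIZE) {
            throw new IllegalArgumentException("Message shorter than the 5-byte header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }

    /** Returns the Avro payload that follows the header. */
    public static byte[] payload(byte[] message) {
        readSchemaId(message); // reuse the header validation
        return Arrays.copyOfRange(message, HEADER_SIZE, message.length);
    }

    public static void main(String[] args) {
        byte[] framed = frame(42, new byte[] {1, 2, 3});
        System.out.println(readSchemaId(framed));             // prints 42
        System.out.println(Arrays.toString(payload(framed))); // prints [1, 2, 3]
    }
}
```

In a real deserialization schema, the schema ID read from the header would be used to look up the writer schema in the registry before decoding the payload. That lookup is omitted here because it depends on the registry client.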
Compatibility, Deprecation, and Migration Plan
...