...
To incrementally ingest the changelogs from the Debezium connector, we propose to implement a few classes. DebeziumAvroSource.java implements the source class that reads the Kafka changelog events, reusing KafkaOffsetGen.java, which helps read events from Kafka incrementally. DebeziumAvroSource.java pulls the latest schema from the schema registry, applies that schema to a batch of incoming Avro changelog records, and transforms each record to extract the actual fields of the row in the database. For insert and update records (identified by the op field), the field values are picked from the after field of the incoming Debezium record. For delete records (also identified by the op field), the values are picked from the before field, since the after field is null. In addition, we add meta fields from both the database and Debezium; meta fields such as the LSN for a Postgres DB help us identify the order of the events.
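The flattening logic above can be sketched as follows. This is an illustrative simplification, not the actual Hudi implementation: change events are modeled as plain maps rather than Avro GenericRecords, and the class and meta-field names (DebeziumFlattenSketch, _change_operation_type, _source_lsn) are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a Debezium change event is modeled as a Map with
// "op", "before", "after", and "source" entries; the real source operates
// on Avro GenericRecords deserialized against the registry schema.
public class DebeziumFlattenSketch {

    /** Flattens one Debezium change event into a row-level record. */
    public static Map<String, Object> flatten(Map<String, Object> changeEvent) {
        String op = (String) changeEvent.get("op");
        // Deletes ("d") carry the row image in "before" (since "after" is null);
        // inserts ("c") and updates ("u") carry it in "after".
        @SuppressWarnings("unchecked")
        Map<String, Object> row = (Map<String, Object>)
            ("d".equals(op) ? changeEvent.get("before") : changeEvent.get("after"));

        Map<String, Object> flattened = new HashMap<>(row);
        // Carry over meta fields; for Postgres, the LSN orders the events.
        @SuppressWarnings("unchecked")
        Map<String, Object> source = (Map<String, Object>) changeEvent.get("source");
        flattened.put("_change_operation_type", op);
        flattened.put("_source_lsn", source.get("lsn"));
        return flattened;
    }
}
```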
Since we change the schema of the incoming record in the source class, we have to provide a schema for the target record. One option is to implement DebeziumAvroSource.java as a RowSource and let Spark infer the schema of the transformed record. An alternative approach is to implement a DebeziumSchemaRegistryProvider.java class that extends the current SchemaRegistryProvider.java and implements the method getTargetSchema. It constructs the target schema from the original schema by including only the fields nested within the after field of the original record, along with the meta fields that were actually ingested.
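A minimal sketch of that getTargetSchema construction is shown below. It is an assumption-laden simplification: schemas are modeled as lists of field names rather than Avro Schema objects, and the class name and meta-field names are illustrative, not the actual Hudi API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of deriving the target schema: keep only the fields
// nested under "after", plus the meta fields the source actually ingests.
public class TargetSchemaSketch {

    // Assumed meta fields added to each flattened record.
    private static final List<String> META_FIELDS =
        List.of("_change_operation_type", "_source_lsn");

    /**
     * @param sourceSchema top-level Debezium schema modeled as
     *                     field name -> nested field names
     *                     ("before"/"after" map to the row's columns)
     */
    public static List<String> getTargetSchema(Map<String, List<String>> sourceSchema) {
        // The target record has the shape of the "after" payload plus meta fields.
        List<String> target = new ArrayList<>(sourceSchema.get("after"));
        target.addAll(META_FIELDS);
        return target;
    }
}
```

In the real provider, the same selection would be done on the Avro schema fetched from the registry, unwrapping the nullable union around the after field before copying its subfields.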
...