...

MultiClusterKafkaSource relies on metadata to determine which clusters and topics to subscribe to. Since the metadata can change over time, the source polls for new metadata and dynamically reconciles changes on a configurable interval.
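At its core, the poll-and-reconcile loop amounts to diffing the previous cluster/topic snapshot against the newly fetched one and acting only on what changed. The following is an illustrative sketch of that diff step (class and field names are assumptions for illustration, not the actual source code):

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative sketch: compute which clusters were added, removed, or had
// their topic set change between two metadata polls. Each snapshot maps a
// cluster id to the set of topics to subscribe to on that cluster.
public class MetadataDiff {
    public final Set<String> addedClusters = new HashSet<>();
    public final Set<String> removedClusters = new HashSet<>();
    public final Set<String> changedClusters = new HashSet<>();

    public static MetadataDiff of(Map<String, Set<String>> previous,
                                  Map<String, Set<String>> current) {
        MetadataDiff diff = new MetadataDiff();
        for (String cluster : current.keySet()) {
            if (!previous.containsKey(cluster)) {
                diff.addedClusters.add(cluster);
            } else if (!previous.get(cluster).equals(current.get(cluster))) {
                diff.changedClusters.add(cluster); // topic set changed
            }
        }
        for (String cluster : previous.keySet()) {
            if (!current.containsKey(cluster)) {
                diff.removedClusters.add(cluster);
            }
        }
        return diff;
    }
}
```

An empty diff means no reconciliation is needed for that interval, which keeps the steady-state cost of polling low.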

...

  1. Read from multiple clusters, topics, and partitions.
  2. Assign splits from multiple clusters.
  3. Checkpoint and commit split progress from multiple clusters.
  4. Report Kafka source metrics for multiple clusters.
  5. Communicate and reconcile metadata changes via source events from enumerator to readers.
  6. Clean up resources (e.g. clients, thread pools, metrics) related to old clusters.

The KafkaSource solves requirements 1-4 for a single Kafka cluster, and its low level components can be composed in order to provide the functionality for multiple Kafka clusters. For example, an underlying KafkaSourceEnumerator will be used to discover splits, checkpoint assigned splits, and do periodic partition discovery. Likewise, an underlying KafkaSourceReader will be used to poll and deserialize records, checkpoint split state, and commit offsets back to Kafka.

To provide the ability to dynamically change the underlying source components without job restart, there needs to exist a coordination mechanism to manage how the underlying KafkaSourceEnumerators and KafkaSourceReaders interact with multiple clusters and multiple topics. A KafkaMetadataService is the discovery mechanism by which the source components will reconcile metadata changes, and only the MultiClusterKafkaSourceEnumerator interacts with the KafkaMetadataService. Periodic metadata discovery will be supported via source configuration, just like topic partition discovery is supported via an interval in KafkaSource.
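A minimal sketch of what such a metadata service contract might look like is below; the method names and the in-memory implementation are illustrative assumptions, not the final API:

```java
import java.util.Map;
import java.util.Set;

// Illustrative contract for a Kafka metadata service: resolve logical
// streams to physical clusters/topics and report cluster liveness so the
// source can clean up clients, thread pools, and metrics for dead clusters.
interface KafkaMetadataService extends AutoCloseable {
    Set<String> getAllStreamIds();

    // Resolve a logical stream to its physical cluster -> topics mapping.
    Map<String, Set<String>> describeStream(String streamId);

    boolean isClusterActive(String clusterId);
}

// Simple in-memory implementation, e.g. for tests or static configuration.
class InMemoryKafkaMetadataService implements KafkaMetadataService {
    private final Map<String, Map<String, Set<String>>> streams;

    InMemoryKafkaMetadataService(Map<String, Map<String, Set<String>>> streams) {
        this.streams = streams;
    }

    @Override public Set<String> getAllStreamIds() { return streams.keySet(); }

    @Override public Map<String, Set<String>> describeStream(String streamId) {
        return streams.getOrDefault(streamId, Map.of());
    }

    @Override public boolean isClusterActive(String clusterId) {
        return streams.values().stream().anyMatch(m -> m.containsKey(clusterId));
    }

    @Override public void close() {}
}
```

Because only the enumerator talks to the service, readers never need direct access to the metadata backend; they learn about changes through source events.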

...

In addition, KafkaStream is introduced as an abstraction that is part of the metadata and contains a logical mapping to physical Kafka clusters and Kafka topics, which can be transient and change dynamically. Changes in metadata are detected by the source enumerator and propagated to source readers via source events to reconcile the changes.
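The KafkaStream abstraction could be modeled roughly as follows (field and class names are illustrative assumptions); value equality is what lets the enumerator detect that a stream's physical mapping has changed between polls:

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Illustrative model of the KafkaStream abstraction: a stable logical
// stream id plus its current physical mapping of cluster -> topics.
// The mapping is transient and may change between metadata polls.
public final class KafkaStream {
    private final String streamId;
    private final Map<String, Set<String>> clusterToTopics;

    public KafkaStream(String streamId, Map<String, Set<String>> clusterToTopics) {
        this.streamId = Objects.requireNonNull(streamId);
        this.clusterToTopics = Map.copyOf(clusterToTopics);
    }

    public String getStreamId() { return streamId; }

    public Map<String, Set<String>> getClusterToTopics() { return clusterToTopics; }

    @Override public boolean equals(Object o) {
        if (!(o instanceof KafkaStream)) return false;
        KafkaStream other = (KafkaStream) o;
        return streamId.equals(other.streamId)
                && clusterToTopics.equals(other.clusterToTopics);
    }

    @Override public int hashCode() { return Objects.hash(streamId, clusterToTopics); }
}
```

Consumers keep referring to the logical stream id while the physical clusters and topics behind it move, which is what makes migrations transparent to the job.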

Reconciliation is designed as restarts of the underlying KafkaSourceEnumerators and KafkaSourceReaders, which enables us to "remove" splits. There is careful consideration for resource cleanup and error handling, for example thread pools, metrics, and KafkaConsumer errors.
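The restart-style reconciliation with resource cleanup can be sketched as below, with a generic AutoCloseable standing in for a per-cluster sub-reader (the class is a hypothetical illustration, not the actual implementation):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Illustrative reconciliation-by-restart: sub-readers for clusters that
// disappeared from metadata are closed (releasing clients, thread pools,
// and metrics), and sub-readers for new clusters are created fresh.
public class MultiClusterReconciler<R extends AutoCloseable> {
    private final Map<String, R> readersByCluster = new HashMap<>();
    private final Function<String, R> readerFactory;

    public MultiClusterReconciler(Function<String, R> readerFactory) {
        this.readerFactory = readerFactory;
    }

    public void reconcile(Set<String> activeClusters) throws Exception {
        // Close and drop readers whose cluster is gone; this is how splits
        // for a removed cluster are effectively "removed".
        for (String cluster : new HashSet<>(readersByCluster.keySet())) {
            if (!activeClusters.contains(cluster)) {
                readersByCluster.remove(cluster).close();
            }
        }
        // Create readers for newly discovered clusters; existing ones stay.
        for (String cluster : activeClusters) {
            readersByCluster.computeIfAbsent(cluster, readerFactory);
        }
    }

    public Set<String> currentClusters() { return readersByCluster.keySet(); }
}
```

Treating removal as a close-and-forget of the per-cluster sub-reader keeps error handling local: a failing KafkaConsumer for a dead cluster cannot poison readers for the clusters that remain.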

Other required functionality leverages and composes the existing KafkaSource implementation for discovering Kafka topic partition offsets, committing offsets, doing the actual KafkaConsumer polling, snapshotting state, and split coordination and round robin split assignment per Kafka cluster. The diagram below depicts how the source will reuse the code of the KafkaSource in order to achieve this.

...