Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Other required functionality leverages and composes the existing KafkaSource implementation for discovering Kafka topic partition offsets, committing offsets, doing the actual Kafka Consumer polling, snapshotting state, and split coordination per Kafka cluster.

To the source more user friendly, a MultiClusterKafkaSourceBuilder will be provided (e.g. batch mode should not turn on KafkaMetadataService discovery, should only be done at startup).

Proposed Changes

KafkaClusterIdentifier

...

This reader is responsible for reading from 1+ clusters. At startup, the reader will first send a source event to grab the latest metadata from the enumerator before working on the splits (from state if existing). There will be error handling related to reconciliation exceptions (e.g. KafkaConsumer WakeupException if KafkaSourceReader restarts in the middle of a poll). In addition, restarting enumerators involve releasing resources from underlying thread pools. Furthermore, this enables us to remove topics from KafkaSourceReader processing, since the metadata reconciliation will induce KafkaSourceReader restart in which splits can be filtered according to the current metadata.

Code Block
languagejava
titleHybridSource
@PublicEvolving 
public class MultiClusterKafkaSourceReader<T>
    implements SourceReader<T, MultiClusterKafkaSourceSplit> {

  @VisibleForTesting
  final NavigableMap<KafkaClusterIdentifier, KafkaSourceReader<T>> clusterReaderMap;
  
  private void restartReader(
      KafkaClusterIdentifier kafkaClusterId, List<KafkaPartitionSplit> readerState) {}
...
}

...