...

Code Block
language: java
title: StoppableKafkaEnumContextProxy
@Internal
public class StoppableKafkaEnumContextProxy
    implements SplitEnumeratorContext<KafkaPartitionSplit>, AutoCloseable {

  private final KafkaClusterIdentifier kafkaClusterIdentifier;
  private final KafkaMetadataService kafkaMetadataService;
  private final SplitEnumeratorContext<MultiClusterKafkaSourceSplit> enumContext;
  private final ScheduledExecutorService subEnumeratorWorker;

  /** Wrap splits with cluster metadata. */
  public void assignSplits(SplitsAssignment<KafkaPartitionSplit> newSplitAssignments) {}

...
}
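
The "wrap splits with cluster metadata" step can be illustrated with a minimal, dependency-free sketch. The types `PartitionSplit`, `ClusterSplit`, and `SplitWrapper` are hypothetical stand-ins for `KafkaPartitionSplit`, `MultiClusterKafkaSourceSplit`, and the proxy's `assignSplits` logic; the real classes may carry more state and a different shape.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for KafkaPartitionSplit (assumption, not the Flink class).
record PartitionSplit(String topic, int partition) {}

// Hypothetical stand-in for MultiClusterKafkaSourceSplit: a split plus its cluster id.
record ClusterSplit(String clusterId, PartitionSplit split) {}

final class SplitWrapper {
  private SplitWrapper() {}

  /** Tags every split in a per-reader assignment with the owning cluster id. */
  static Map<Integer, List<ClusterSplit>> wrap(
      String clusterId, Map<Integer, List<PartitionSplit>> assignment) {
    Map<Integer, List<ClusterSplit>> wrapped = new HashMap<>();
    for (Map.Entry<Integer, List<PartitionSplit>> entry : assignment.entrySet()) {
      List<ClusterSplit> perReader = new ArrayList<>();
      for (PartitionSplit split : entry.getValue()) {
        perReader.add(new ClusterSplit(clusterId, split));
      }
      // Key is the reader (subtask) id, unchanged by the wrapping.
      wrapped.put(entry.getKey(), perReader);
    }
    return wrapped;
  }
}
```

In this sketch the reader ids are passed through untouched; only the split payload gains the cluster identifier, so the outer enumerator context can tell which cluster each split belongs to.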


GetMetadataUpdateEvent

This is a metadata update event requesting the current metadata, sent from reader to enumerator.

Code Block
language: java
title: GetMetadataUpdateEvent
@Internal
public class GetMetadataUpdateEvent implements SourceEvent {}

MultiClusterKafkaSourceReader

This reader is responsible for reading from one or more clusters. At startup, the reader first sends a source event to fetch the latest metadata from the enumerator before working on its splits (restored from state, if any). The reader requests the metadata itself because it is hard to reason about reader failure during split assignment; having the readers request metadata at startup is the most reliable protocol.

This enables us to filter splits and "remove" invalid splits (e.g. remove a topic partition from consumption). For example, at startup, checkpointed splits are stored in an internal data structure but not assigned, and only the splits that are valid according to the metadata are then assigned.
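
The startup filtering described above can be sketched as follows. This is only an illustration under assumptions: `RestoredSplit` and `StartupSplitFilter` are hypothetical names, and the metadata is reduced to a set of `"clusterId/topic"` keys rather than whatever structure the enumerator actually sends back.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a checkpointed split (assumption, not the Flink class).
record RestoredSplit(String clusterId, String topic, int partition) {}

final class StartupSplitFilter {
  // Splits restored from state are parked here until metadata arrives.
  private final List<RestoredSplit> pending = new ArrayList<>();
  // Splits that passed the metadata check and may be read.
  private final List<RestoredSplit> assigned = new ArrayList<>();

  /** Stores checkpointed splits without assigning them. */
  void restore(List<RestoredSplit> fromCheckpoint) {
    pending.addAll(fromCheckpoint);
  }

  /**
   * Called once the enumerator answers the metadata request.
   * activeTopics holds "clusterId/topic" keys that are still valid.
   */
  void onMetadata(Set<String> activeTopics) {
    for (RestoredSplit split : pending) {
      if (activeTopics.contains(split.clusterId() + "/" + split.topic())) {
        assigned.add(split);
      }
      // Splits for topics missing from the metadata are silently dropped.
    }
    pending.clear();
  }

  List<RestoredSplit> assigned() {
    return List.copyOf(assigned);
  }

  int pendingCount() {
    return pending.size();
  }
}
```

Parking the restored splits first means a reader never starts consuming a partition that the current metadata no longer lists, even if that partition was present in the last checkpoint.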

There will be error handling related to reconciliation exceptions (e.g. KafkaConsumer WakeupException if a KafkaSourceReader restarts in the middle of a poll). In addition, restarting enumerators involves releasing resources from the underlying thread pools. Furthermore, metadata reconciliation enables us to remove topics from KafkaSourceReader processing: it induces a KafkaSourceReader restart, during which splits can be filtered according to the current metadata.
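
Releasing a sub-enumerator's thread pool on restart might look like the sketch below. It uses only standard `java.util.concurrent` calls; the class name `WorkerShutdown` and the timeout parameter are assumptions for illustration, not part of the proposal.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class WorkerShutdown {
  private WorkerShutdown() {}

  /**
   * Stops the worker pool: first gracefully, then by force if it does not
   * terminate within the timeout. Returns true if the pool terminated.
   */
  static boolean stop(ScheduledExecutorService worker, long timeoutMillis) {
    worker.shutdown(); // reject new tasks, let already submitted ones proceed
    try {
      if (worker.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
        return true;
      }
      worker.shutdownNow(); // drop queued tasks and interrupt running ones
      return worker.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      worker.shutdownNow();
      Thread.currentThread().interrupt(); // preserve the caller's interrupt status
      return false;
    }
  }
}
```

A close path along these lines would let the enumerator for a removed or changed cluster be torn down without leaking threads before its replacement is started.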

...