
...

This logical abstraction is introduced because bootstrap servers may change even though the "cluster" stays the same. A name is therefore used as the unique identifier, which has the added benefit of providing a short name for connector-related metrics. In simple use cases, the bootstrap server string can be used as the name.
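Here is a minimal sketch of the idea. It assumes a hypothetical `ClusterRegistry` that maps each logical cluster name to its current bootstrap servers. Pointing a name at new brokers changes only the value, so anything keyed by the name (split assignments, metric names) is unaffected. The class and the metric-name format are illustrative assumptions, not part of the connector.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry: logical cluster name -> current bootstrap servers.
final class ClusterRegistry {
  private final Map<String, String> bootstrapServersByName = new HashMap<>();

  // Registers a cluster, or re-points an existing name at new bootstrap servers.
  void put(String clusterName, String bootstrapServers) {
    bootstrapServersByName.put(clusterName, bootstrapServers);
  }

  // Returns the current bootstrap servers for the name, or null if unknown.
  String bootstrapServers(String clusterName) {
    return bootstrapServersByName.get(clusterName);
  }

  // Illustrative metric name built from the short, stable cluster name (assumed format).
  static String metricName(String clusterName, String metric) {
    return "kafka." + clusterName + "." + metric;
  }
}
```

In the simple case mentioned above, the name and the bootstrap server string can be identical, e.g. `put("localhost:9092", "localhost:9092")`.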

...

languagejava
titleKafkaClusterIdentifier

...

The Kafka cluster identifier is a plain String.

KafkaStream

A Kafka stream may be composed of multiple topics on multiple Kafka clusters. This flexible, general abstraction does not require any topic naming conventions, although implementations may impose such conventions if desired. In the simplest case, a Kafka stream is a single topic on a single Kafka cluster.

Code Block
languagejava
titleKafkaStream
import java.io.Serializable;
import java.util.Map;

import org.apache.flink.annotation.PublicEvolving;

@PublicEvolving
public class KafkaStream implements Serializable {
  private final String streamId;
  // Keyed by Kafka cluster identifier; ClusterMetadata contains the topics and
  // Kafka properties (e.g. bootstrap servers) for that cluster
  private final Map<String, ClusterMetadata> clusterToClusterMetadata;

  public KafkaStream(String streamId, Map<String, ClusterMetadata> clusterToClusterMetadata) {
    this.streamId = streamId;
    this.clusterToClusterMetadata = clusterToClusterMetadata;
    // ...
  }

  // ...
}
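A plain-Java stand-in can illustrate the shape of a multi-cluster stream. This sketch assumes `ClusterMetadata` holds a topic set plus client `Properties`. `SimpleClusterMetadata` and `SimpleKafkaStream` are hypothetical simplifications without the Flink annotations. `bootstrap.servers` is the standard Kafka client property.

```java
import java.util.Map;
import java.util.Properties;
import java.util.Set;

// Assumed shape of ClusterMetadata: the topics to read plus client properties.
final class SimpleClusterMetadata {
  final Set<String> topics;
  final Properties properties;

  SimpleClusterMetadata(Set<String> topics, Properties properties) {
    this.topics = Set.copyOf(topics);
    this.properties = properties;
  }

  // Convenience factory: bootstrap servers plus a list of topics on that cluster.
  static SimpleClusterMetadata of(String bootstrapServers, String... topics) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", bootstrapServers);
    return new SimpleClusterMetadata(Set.of(topics), props);
  }
}

// Simplified KafkaStream: a stream id plus cluster identifier -> metadata.
final class SimpleKafkaStream {
  final String streamId;
  final Map<String, SimpleClusterMetadata> clusterToClusterMetadata;

  SimpleKafkaStream(String streamId, Map<String, SimpleClusterMetadata> clusterToClusterMetadata) {
    this.streamId = streamId;
    this.clusterToClusterMetadata = Map.copyOf(clusterToClusterMetadata);
  }

  // Number of (cluster, topic) pairs the stream covers.
  int topicCount() {
    return clusterToClusterMetadata.values().stream().mapToInt(m -> m.topics.size()).sum();
  }
}
```

A single-topic, single-cluster stream is just a one-entry map. A stream such as `orders` could read `orders-v1` and `orders-v2` from one cluster and `orders-v2` from another, with no naming convention tying the topics together.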

KafkaMetadataService

...

Code Block
languagejava
titleKafkaMetadataService
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import java.util.Set;

import org.apache.flink.annotation.PublicEvolving;

@PublicEvolving
public interface KafkaMetadataService extends AutoCloseable, Serializable {
  /**
   * Get current metadata for all streams.
   *
   * @return set of all streams
   */
  Set<KafkaStream> getAllStreams();

  /**
   * Get current metadata for queried streams.
   *
   * @param streamIds stream full names
   * @return map of stream name to metadata
   */
  Map<String, KafkaStream> describeStreams(Collection<String> streamIds);

  /**
   * Check if the cluster is active.
   *
   * @param kafkaClusterIdentifier Kafka cluster identifier
   * @return whether the cluster is active
   */
  boolean isClusterActive(String kafkaClusterIdentifier);
}
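To make the contract concrete, here is a hypothetical in-memory implementation of the three lookup methods. It simplifies `KafkaStream` to a stream id plus a cluster-to-topics map and omits `AutoCloseable` and `Serializable`. It also assumes a cluster counts as active once a registered stream references it, until it is explicitly deactivated. A real service would presumably query an external metadata store instead.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified stream: id plus cluster identifier -> topics (properties omitted).
final class StreamInfo {
  final String streamId;
  final Map<String, Set<String>> clusterTopics;

  StreamInfo(String streamId, Map<String, Set<String>> clusterTopics) {
    this.streamId = streamId;
    this.clusterTopics = Map.copyOf(clusterTopics);
  }
}

// Hypothetical in-memory service mirroring the KafkaMetadataService lookups.
final class InMemoryMetadataService {
  private final Map<String, StreamInfo> streams = new HashMap<>();
  private final Set<String> activeClusters = new HashSet<>();

  // Registers (or replaces) a stream and marks its clusters as active.
  void putStream(StreamInfo stream) {
    streams.put(stream.streamId, stream);
    activeClusters.addAll(stream.clusterTopics.keySet());
  }

  void deactivate(String kafkaClusterIdentifier) {
    activeClusters.remove(kafkaClusterIdentifier);
  }

  Set<StreamInfo> getAllStreams() {
    return new HashSet<>(streams.values());
  }

  // Unknown stream ids are skipped rather than mapped to null.
  Map<String, StreamInfo> describeStreams(Collection<String> streamIds) {
    Map<String, StreamInfo> result = new HashMap<>();
    for (String id : streamIds) {
      StreamInfo stream = streams.get(id);
      if (stream != null) {
        result.put(id, stream);
      }
    }
    return result;
  }

  boolean isClusterActive(String kafkaClusterIdentifier) {
    return activeClusters.contains(kafkaClusterIdentifier);
  }
}
```

Skipping unknown ids in `describeStreams` is a design choice of this sketch. The interface as written does not specify how missing streams are reported.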

...

Code Block
languagejava
titleMetadataUpdateEvent
import java.util.Map;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceEvent;

@Internal
public class MetadataUpdateEvent implements SourceEvent {
  private final Map<String, ClusterMetadata> currentMetadata;

  // ...
}
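The event carries the current cluster-to-metadata map. A receiver that remembers the previous map can therefore work out what changed. The hypothetical `MetadataDiff` below classifies clusters as added, removed, or changed, using each cluster's topic set as a stand-in for the full `ClusterMetadata`. It illustrates one possible approach, not the connector's actual logic.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: compare two cluster identifier -> topics snapshots.
final class MetadataDiff {
  final Set<String> added = new TreeSet<>();
  final Set<String> removed = new TreeSet<>();
  final Set<String> changed = new TreeSet<>();

  static MetadataDiff between(
      Map<String, Set<String>> previous, Map<String, Set<String>> current) {
    MetadataDiff diff = new MetadataDiff();
    for (Map.Entry<String, Set<String>> entry : current.entrySet()) {
      Set<String> before = previous.get(entry.getKey());
      if (before == null) {
        diff.added.add(entry.getKey());          // cluster appears for the first time
      } else if (!before.equals(entry.getValue())) {
        diff.changed.add(entry.getKey());        // same cluster, different topics
      }
    }
    for (String cluster : previous.keySet()) {
      if (!current.containsKey(cluster)) {
        diff.removed.add(cluster);               // cluster no longer in the metadata
      }
    }
    return diff;
  }
}
```

Under this approach, a changed or removed cluster would be a natural trigger for `restartReader` or for closing that cluster's sub-reader.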

MultiClusterKafkaSourceEnumerator

...

Code Block
languagejava
titleMultiClusterKafkaSourceEnumerator
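import java.util.Map;
import java.util.Set;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.kafka.common.TopicPartition;

@PublicEvolving
public class MultiClusterKafkaSourceEnumerator
    implements SplitEnumerator<MultiClusterKafkaSourceSplit, MultiClusterKafkaSourceEnumState> {

  // One sub-enumerator and one context proxy per Kafka cluster, keyed by cluster identifier
  private final Map<String, SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState>>
      clusterEnumeratorMap;
  private final Map<String, StoppableKafkaEnumContextProxy> clusterEnumContextMap;
  private final KafkaStreamSubscriber kafkaStreamSubscriber;
  private final KafkaMetadataService kafkaMetadataService;
  // Active cluster identifier -> topics to read from that cluster
  private Map<String, Set<String>> activeClusterTopicsMap;

  private void restartEnumerators(String kafkaClusterId, Set<TopicPartition> enumeratorState) {}

  // ...
}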
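The enumerator holds both a `KafkaStreamSubscriber` and a `KafkaMetadataService`. That suggests `activeClusterTopicsMap` is derived from the subscribed streams, filtered by `isClusterActive`. The sketch below works under that assumption and also assumes that streams sharing a cluster have their topic sets merged. `ActiveClusterTopics` is a hypothetical helper, each stream is reduced to its cluster-to-topics map, and the predicate stands in for the metadata service.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical helper: merge subscribed streams into cluster -> topics,
// keeping only clusters that the metadata service reports as active.
final class ActiveClusterTopics {
  static Map<String, Set<String>> compute(
      Collection<Map<String, Set<String>>> streams, Predicate<String> isClusterActive) {
    Map<String, Set<String>> result = new HashMap<>();
    for (Map<String, Set<String>> stream : streams) {
      for (Map.Entry<String, Set<String>> entry : stream.entrySet()) {
        if (isClusterActive.test(entry.getKey())) {
          // Two streams may read different topics from the same cluster: union them.
          result.computeIfAbsent(entry.getKey(), k -> new HashSet<>()).addAll(entry.getValue());
        }
      }
    }
    return result;
  }
}
```

Comparing the result with the previous map would then identify the clusters whose sub-enumerators need `restartEnumerators`.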

...

Code Block
languagejava
titleStoppableKafkaEnumContextProxy
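import java.util.concurrent.ScheduledExecutorService;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;

@Internal
public class StoppableKafkaEnumContextProxy
    implements SplitEnumeratorContext<KafkaPartitionSplit>, AutoCloseable {

  private final String kafkaClusterIdentifier;
  private final KafkaMetadataService kafkaMetadataService;
  private final SplitEnumeratorContext<MultiClusterKafkaSourceSplit> enumContext;
  private final ScheduledExecutorService subEnumeratorWorker;

  /** Wrap splits with cluster metadata. */
  @Override
  public void assignSplits(SplitsAssignment<KafkaPartitionSplit> newSplitAssignments) {}

  // ...
}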
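The proxy's `assignSplits` is documented only as wrapping splits with cluster metadata. Flink's `SplitsAssignment` exposes a subtask-to-splits map via `assignment()`, so the wrapping step can be sketched as a map transformation that tags each split with the proxy's cluster identifier. `TaggedSplit` and `SplitWrapper` are hypothetical stand-ins for `MultiClusterKafkaSourceSplit` and the proxy logic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for MultiClusterKafkaSourceSplit: a partition split tagged with its cluster.
final class TaggedSplit<S> {
  final String kafkaClusterId;
  final S split;

  TaggedSplit(String kafkaClusterId, S split) {
    this.kafkaClusterId = kafkaClusterId;
    this.split = split;
  }
}

// Hypothetical wrapping step: rewrite a sub-enumerator's subtask -> splits
// assignment so every split carries the proxy's cluster identifier.
final class SplitWrapper {
  static <S> Map<Integer, List<TaggedSplit<S>>> wrap(
      String kafkaClusterId, Map<Integer, List<S>> assignment) {
    Map<Integer, List<TaggedSplit<S>>> wrapped = new HashMap<>();
    for (Map.Entry<Integer, List<S>> entry : assignment.entrySet()) {
      List<TaggedSplit<S>> splits = new ArrayList<>();
      for (S split : entry.getValue()) {
        splits.add(new TaggedSplit<>(kafkaClusterId, split));
      }
      wrapped.put(entry.getKey(), splits);
    }
    return wrapped;
  }
}
```

Because each proxy knows its own cluster, the sub-enumerator can stay unaware of the multi-cluster setup and keep emitting plain partition splits.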

...

Code Block
languagejava
titleMultiClusterKafkaSourceReader
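import java.util.List;
import java.util.NavigableMap;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.connector.kafka.source.reader.KafkaSourceReader;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;

@PublicEvolving
public class MultiClusterKafkaSourceReader<T>
    implements SourceReader<T, MultiClusterKafkaSourceSplit> {

  // One KafkaSourceReader per cluster, keyed by cluster identifier
  @VisibleForTesting
  final NavigableMap<String, KafkaSourceReader<T>> clusterReaderMap;

  private void restartReader(String kafkaClusterId, List<KafkaPartitionSplit> readerState) {}

  // ...
}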
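The reader keeps one `KafkaSourceReader` per cluster in a `NavigableMap`. The sketch below assumes incoming splits are routed to their cluster's bucket by cluster identifier. It shows one thing a sorted map makes easy: rotating through clusters in a deterministic order with `higherKey` and `firstKey`. `ClusterRouter` is hypothetical and uses split-id strings in place of real sub-readers. The round-robin policy is an assumption, not documented behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: route splits to per-cluster buckets and rotate
// through clusters in sorted order.
final class ClusterRouter {
  private final NavigableMap<String, List<String>> splitsByCluster = new TreeMap<>();
  private String lastPolled;

  // Stand-in for handing a split to its cluster's sub-reader.
  void addSplit(String kafkaClusterId, String splitId) {
    splitsByCluster.computeIfAbsent(kafkaClusterId, k -> new ArrayList<>()).add(splitId);
  }

  List<String> splitsFor(String kafkaClusterId) {
    return splitsByCluster.getOrDefault(kafkaClusterId, List.of());
  }

  // Next cluster after the last polled one, wrapping to the first; null if empty.
  String nextClusterToPoll() {
    if (splitsByCluster.isEmpty()) {
      return null;
    }
    String next = lastPolled == null ? null : splitsByCluster.higherKey(lastPolled);
    lastPolled = next != null ? next : splitsByCluster.firstKey();
    return lastPolled;
  }
}
```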

...

Code Block
languagejava
titleMultiClusterKafkaSourceSplit
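import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;

@PublicEvolving
public class MultiClusterKafkaSourceSplit implements SourceSplit {

  private final String kafkaClusterId;
  private final KafkaPartitionSplit kafkaPartitionSplit;

  // ...
}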
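Pairing the cluster identifier with the partition split matters because the same topic-partition can exist on several clusters. The sketch assumes a split id of the form `<cluster>-<topic>-<partition>`. `ClusterPartitionSplit` is a hypothetical stand-in, and the actual id format of `MultiClusterKafkaSourceSplit` is not specified here.

```java
// Hypothetical stand-in for MultiClusterKafkaSourceSplit.
final class ClusterPartitionSplit {
  final String kafkaClusterId;
  final String topic;
  final int partition;

  ClusterPartitionSplit(String kafkaClusterId, String topic, int partition) {
    this.kafkaClusterId = kafkaClusterId;
    this.topic = topic;
    this.partition = partition;
  }

  // Partition-level id in "topic-partition" form.
  String partitionSplitId() {
    return topic + "-" + partition;
  }

  // Assumed scheme: prefix with the cluster id so ids stay unique across clusters.
  String splitId() {
    return kafkaClusterId + "-" + partitionSplitId();
  }
}
```

A plain `-` separator is ambiguous when cluster identifiers may contain dashes. Cluster `a-b` with topic `c` and cluster `a` with topic `b-c` both yield `a-b-c-0` for partition 0, so an implementation might prefer a separator that cluster identifiers cannot contain.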

...