...

Proposed Changes

KafkaClusterIdentifier

This logical abstraction is introduced because bootstrap servers may change even though the "cluster" stays the same. The name is therefore used as the unique identifier, which has the added benefit of giving connector-related metrics a short name. In simple use cases, the bootstrap servers string can serve as the name.

```java
import java.io.Serializable;
import org.apache.flink.annotation.PublicEvolving;

@PublicEvolving
public class KafkaClusterIdentifier implements Comparable<KafkaClusterIdentifier>, Serializable {
  private final String name;
  private final String bootstrapServers;

...
}
```
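The identity rule can be sketched as follows, assuming that `equals`, `hashCode`, and `compareTo` are all based on `name` alone so that a cluster keeps the same identifier when its bootstrap servers change. The section only states that the name is the unique identifier; whether `bootstrapServers` participates in equality is an assumption here, and the Flink annotation is omitted so the sketch runs on plain Java.

```java
import java.io.Serializable;
import java.util.Objects;

// Sketch: identity is assumed to be the logical name only; bootstrapServers is
// carried along for connecting but ignored by equals/hashCode/compareTo.
class KafkaClusterIdentifier implements Comparable<KafkaClusterIdentifier>, Serializable {
  private final String name;
  private final String bootstrapServers;

  KafkaClusterIdentifier(String name, String bootstrapServers) {
    this.name = Objects.requireNonNull(name);
    this.bootstrapServers = Objects.requireNonNull(bootstrapServers);
  }

  String getName() {
    return name;
  }

  String getBootstrapServers() {
    return bootstrapServers;
  }

  @Override
  public int compareTo(KafkaClusterIdentifier other) {
    return name.compareTo(other.name); // consistent with equals: both use name only
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof KafkaClusterIdentifier)) {
      return false;
    }
    return name.equals(((KafkaClusterIdentifier) o).name);
  }

  @Override
  public int hashCode() {
    return name.hashCode();
  }

  @Override
  public String toString() {
    return name; // short, stable label, e.g. for metric names
  }
}
```

Under this assumption, `new KafkaClusterIdentifier("cluster-a", "broker1:9092")` and `new KafkaClusterIdentifier("cluster-a", "broker2:9092")` are equal, so migrating a cluster to new brokers does not change its identity or its metric names.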

KafkaStream

A Kafka stream may be composed of multiple topics spread across multiple Kafka clusters. This flexible, general abstraction does not require any topic naming conventions, although implementations may adopt such conventions if desired. In the simplest case, a Kafka stream is a single topic on a single Kafka cluster.

```java
import java.io.Serializable;
import java.util.Map;
import java.util.Set;
import org.apache.flink.annotation.PublicEvolving;

@PublicEvolving
public class KafkaStream implements Serializable {
  private final String streamId;
  private final Map<KafkaClusterIdentifier, Set<String>> kafkaClusterTopicMap;

  public KafkaStream(
      String streamId, Map<KafkaClusterIdentifier, Set<String>> kafkaClusterTopicMap) {
    this.streamId = streamId;
    this.kafkaClusterTopicMap = kafkaClusterTopicMap;
  }

...
}
```
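To make the multi-cluster shape concrete, here is a plain-Java sketch that builds both the simplest single-topic stream and a stream spanning two clusters. `ClusterId` is a hypothetical stand-in for `KafkaClusterIdentifier` (a Java 16+ record, so its equality uses both fields), and the defensive copy and `topicCount()` helper are illustrative additions, not part of the proposal.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for KafkaClusterIdentifier.
record ClusterId(String name, String bootstrapServers) implements Serializable {}

// Simplified KafkaStream: one logical stream mapped to topics on one or more clusters.
final class KafkaStream implements Serializable {
  private final String streamId;
  private final Map<ClusterId, Set<String>> kafkaClusterTopicMap;

  KafkaStream(String streamId, Map<ClusterId, Set<String>> kafkaClusterTopicMap) {
    this.streamId = streamId;
    // Defensive, unmodifiable copy (an assumption; the proposal stores the map as given).
    Map<ClusterId, Set<String>> copy = new HashMap<>();
    kafkaClusterTopicMap.forEach(
        (cluster, topics) ->
            copy.put(cluster, Collections.unmodifiableSet(new HashSet<>(topics))));
    this.kafkaClusterTopicMap = Collections.unmodifiableMap(copy);
  }

  String getStreamId() {
    return streamId;
  }

  Map<ClusterId, Set<String>> getKafkaClusterTopicMap() {
    return kafkaClusterTopicMap;
  }

  // Total number of (cluster, topic) pairs that make up this stream.
  int topicCount() {
    return kafkaClusterTopicMap.values().stream().mapToInt(Set::size).sum();
  }
}
```

For example, `new KafkaStream("orders", Map.of(new ClusterId("cluster-a", "a:9092"), Set.of("orders")))` is the single-topic, single-cluster case, while adding a second `ClusterId` entry to the map extends the same stream across clusters without any naming convention on the topics.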


KafkaMetadataService

This is responsible for resolving Kafka metadata for streams. It may be backed by an external service or simply by logic contained in memory. An implementation based on a config map file will also be provided for convenience.

```java
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import org.apache.flink.annotation.PublicEvolving;

@PublicEvolving
public interface KafkaMetadataService extends AutoCloseable, Serializable {
  /**
   * Get current metadata for all streams.
   *
   * @return set of all streams
   */
  Set<KafkaStream> getAllStreams();

  /**
   * Get current metadata for queried streams.
   *
   * @param streamIds stream full names
   * @return map of stream name to metadata
   */
  Map<String, KafkaStream> describeStreams(Collection<String> streamIds);

  /**
   * Check if the cluster is active.
   *
   * @param kafkaClusterIdentifier Kafka cluster identifier
   * @return whether the cluster is active
   */
  boolean isClusterActive(KafkaClusterIdentifier kafkaClusterIdentifier);
}
```
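The in-memory option mentioned above might look like the following sketch. It uses simplified, hypothetical stand-ins for the proposal's types (Java 16+ records `ClusterId` and `KafkaStream`) and assumes two policies this section does not specify: `describeStreams` silently skips unknown ids, and a cluster counts as active when at least one registered stream references it.

```java
import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Simplified stand-ins for the proposal's types (assumed shapes, not the real classes).
record ClusterId(String name, String bootstrapServers) implements Serializable {}

record KafkaStream(String streamId, Map<ClusterId, Set<String>> clusterTopics)
    implements Serializable {}

interface KafkaMetadataService extends AutoCloseable, Serializable {
  Set<KafkaStream> getAllStreams();

  Map<String, KafkaStream> describeStreams(Collection<String> streamIds);

  boolean isClusterActive(ClusterId clusterId);
}

// Hypothetical in-memory implementation: streams are registered up front.
final class InMemoryKafkaMetadataService implements KafkaMetadataService {
  private final Map<String, KafkaStream> streams = new HashMap<>();

  InMemoryKafkaMetadataService(Collection<KafkaStream> initialStreams) {
    for (KafkaStream stream : initialStreams) {
      streams.put(stream.streamId(), stream);
    }
  }

  @Override
  public Set<KafkaStream> getAllStreams() {
    return new HashSet<>(streams.values());
  }

  @Override
  public Map<String, KafkaStream> describeStreams(Collection<String> streamIds) {
    // Unknown ids are skipped (assumption); distinct() avoids duplicate-key errors in toMap.
    return streamIds.stream()
        .filter(streams::containsKey)
        .distinct()
        .collect(Collectors.toMap(id -> id, streams::get));
  }

  @Override
  public boolean isClusterActive(ClusterId clusterId) {
    // Active if any registered stream still has topics on this cluster (assumption).
    return streams.values().stream().anyMatch(s -> s.clusterTopics().containsKey(clusterId));
  }

  @Override
  public void close() {
    // Nothing to release for an in-memory map.
  }
}
```

A service backed by an external system would implement the same three lookups with remote calls and release its client in `close()`, which is why the interface extends `AutoCloseable`.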

KafkaStreamSubscriber

This is similar to KafkaSource's KafkaSubscriber. A regex subscriber will be provided to match streams by a regex pattern.

```java
import java.io.Serializable;
import java.util.Set;
import org.apache.flink.annotation.PublicEvolving;

@PublicEvolving
public interface KafkaStreamSubscriber extends Serializable {

  /** Get the set of subscribed streams. */
  Set<KafkaStream> getSubscribedStreams(KafkaMetadataService kafkaMetadataService);
}
```
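A regex subscriber could look roughly like the sketch below. It assumes the pattern must match the whole stream id (`Matcher.matches()` rather than `find()`). The class name `StreamPatternSubscriber` and the trimmed-down `KafkaStream` record (clusters keyed by name) and single-method `KafkaMetadataService` are hypothetical simplifications of the proposal's types.

```java
import java.io.Serializable;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Simplified stand-ins (assumed shapes): only what the subscriber needs.
record KafkaStream(String streamId, Map<String, Set<String>> clusterTopics)
    implements Serializable {}

interface KafkaMetadataService extends Serializable {
  Set<KafkaStream> getAllStreams();
}

interface KafkaStreamSubscriber extends Serializable {
  Set<KafkaStream> getSubscribedStreams(KafkaMetadataService kafkaMetadataService);
}

// Hypothetical regex subscriber: keeps every stream whose id fully matches the pattern.
final class StreamPatternSubscriber implements KafkaStreamSubscriber {
  private final Pattern streamPattern; // java.util.regex.Pattern is Serializable

  StreamPatternSubscriber(Pattern streamPattern) {
    this.streamPattern = streamPattern;
  }

  @Override
  public Set<KafkaStream> getSubscribedStreams(KafkaMetadataService kafkaMetadataService) {
    // Re-evaluated on every call, so newly created matching streams are picked up.
    return kafkaMetadataService.getAllStreams().stream()
        .filter(s -> streamPattern.matcher(s.streamId()).matches())
        .collect(Collectors.toSet());
  }
}
```

Because the subscriber queries the metadata service on each call, periodic discovery in the enumerator can simply call `getSubscribedStreams` again.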

MetadataUpdateEvent

This is a metadata update event, sent from the enumerator to the readers, that contains the current metadata.

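```java
import java.util.Map;
import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceEvent;

@Internal
public class MetadataUpdateEvent implements SourceEvent {
  private final Map<KafkaClusterIdentifier, Set<String>> currentClusterTopics;

...
}
```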
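To illustrate how a reader might act on this event, the sketch below computes which assigned splits fall outside the latest metadata. The reader-side behavior is not specified in this section, so `SplitRef`, `MetadataReconciler`, and the rule "stop reading splits whose cluster or topic is no longer listed" are assumptions; clusters are keyed by name to keep it short.

```java
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Simplified event: the current cluster -> topics view (clusters keyed by name here).
record MetadataUpdateEvent(Map<String, Set<String>> currentClusterTopics)
    implements Serializable {}

// Hypothetical reader-side split descriptor.
record SplitRef(String cluster, String topic, int partition) {}

final class MetadataReconciler {
  private MetadataReconciler() {}

  // Returns the splits the reader should stop consuming: those whose cluster or
  // topic is absent from the latest metadata.
  static List<SplitRef> staleSplits(List<SplitRef> assigned, MetadataUpdateEvent event) {
    Map<String, Set<String>> current = event.currentClusterTopics();
    return assigned.stream()
        .filter(s -> !current.getOrDefault(s.cluster(), Set.of()).contains(s.topic()))
        .collect(Collectors.toList());
  }
}
```

Sending the full current view, rather than a delta, means a reader that missed an earlier event can still converge on the latest state from a single message.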

MultiClusterKafkaSourceEnumerator

This enumerator is responsible for discovering and assigning splits from one or more clusters. At startup, the enumerator invokes the KafkaStreamSubscriber and reconciles the result with the restored state. It then sends source events to the source readers so that they can reconcile their metadata. The enumerator can also poll the KafkaMetadataService periodically for stream discovery. In addition, restarting the per-cluster enumerators involves clearing outdated metrics.

```java
import java.util.Map;
import java.util.Set;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.kafka.common.TopicPartition;

@PublicEvolving
public class MultiClusterKafkaSourceEnumerator
    implements SplitEnumerator<MultiClusterKafkaSourceSplit, MultiClusterKafkaSourceEnumState> {

  private final Map<
          KafkaClusterIdentifier, SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState>>
      clusterEnumeratorMap;
  private final Map<KafkaClusterIdentifier, StoppableKafkaEnumContextProxy> clusterEnumContextMap;
  private final KafkaStreamSubscriber kafkaStreamSubscriber;
  private final KafkaMetadataService kafkaMetadataService;
  private Map<KafkaClusterIdentifier, Set<String>> activeClusterTopicsMap;

  private void restartEnumerators(
      KafkaClusterIdentifier kafkaClusterId, Set<TopicPartition> enumeratorState) {}

...
}
```
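The reconciliation step can be sketched as a diff between the previous and the latest cluster-to-topics snapshots (the role `activeClusterTopicsMap` appears to play above). This is a plain-Java sketch under stated assumptions: clusters are keyed by name, and mapping each outcome to an action (start, close, or restart a per-cluster enumerator) is one reading of the text, not the proposal's actual logic. `ClusterDiff` and `ClusterTopicsDiff` are hypothetical names.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical result of diffing two cluster -> topics snapshots (clusters keyed by name).
record ClusterDiff(Set<String> added, Set<String> removed, Set<String> changed) {}

final class ClusterTopicsDiff {
  private ClusterTopicsDiff() {}

  static ClusterDiff diff(Map<String, Set<String>> previous, Map<String, Set<String>> latest) {
    Set<String> added = new TreeSet<>();
    Set<String> removed = new TreeSet<>();
    Set<String> changed = new TreeSet<>();
    for (Map.Entry<String, Set<String>> entry : latest.entrySet()) {
      Set<String> before = previous.get(entry.getKey());
      if (before == null) {
        added.add(entry.getKey()); // new cluster: start a fresh per-cluster enumerator
      } else if (!before.equals(entry.getValue())) {
        changed.add(entry.getKey()); // topic set changed: restart that cluster's enumerator
      }
    }
    for (String cluster : previous.keySet()) {
      if (!latest.containsKey(cluster)) {
        removed.add(cluster); // cluster dropped: close its enumerator and clear its metrics
      }
    }
    return new ClusterDiff(added, removed, changed);
  }
}
```

Restarting only the clusters in `changed`, instead of every per-cluster enumerator, keeps unaffected clusters reading without interruption; the sketch does not cover how each restarted enumerator's `Set<TopicPartition>` state is carried over.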

...