...
Proposed Changes
KafkaClusterIdentifier
This logical abstraction is introduced because bootstrap servers may change while the "cluster" remains the same. The name is therefore used as the unique identifier, which has the added benefit of providing a short name for connector-related metrics. In simple use cases, the bootstrap servers string can be used as the name.
Code Block |
---|
language | java |
---|
title | KafkaClusterIdentifier |
---|
|
@PublicEvolving
public class KafkaClusterIdentifier implements Comparable<KafkaClusterIdentifier>, Serializable {
    private final String name;
    private final String bootstrapServers;
    ...
} |
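Because the name is described as the unique identifier, one plausible reading is that equality and ordering depend on the name alone, so that a change of bootstrap servers does not produce a "new" cluster. The sketch below illustrates that reading. The class name `ClusterIdSketch`, its accessors, and the name-only `equals`/`hashCode`/`compareTo` are assumptions, since the proposal elides the class body.

```java
import java.io.Serializable;
import java.util.Objects;

// Hypothetical stand-in for KafkaClusterIdentifier. The name-only identity
// semantics are an assumption; the proposal does not show the class body.
final class ClusterIdSketch implements Comparable<ClusterIdSketch>, Serializable {
    private static final long serialVersionUID = 1L;

    private final String name;
    private final String bootstrapServers;

    ClusterIdSketch(String name, String bootstrapServers) {
        this.name = Objects.requireNonNull(name, "name");
        this.bootstrapServers = Objects.requireNonNull(bootstrapServers, "bootstrapServers");
    }

    String getName() {
        return name;
    }

    String getBootstrapServers() {
        return bootstrapServers;
    }

    // Identity is based on the logical name only, not on the bootstrap servers.
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof ClusterIdSketch)) {
            return false;
        }
        return name.equals(((ClusterIdSketch) other).name);
    }

    @Override
    public int hashCode() {
        return name.hashCode();
    }

    // Ordering follows the natural ordering of the name, consistent with equals.
    @Override
    public int compareTo(ClusterIdSketch other) {
        return name.compareTo(other.name);
    }
}
```

With this choice, two identifiers that share a name but differ in bootstrap servers land on the same map key, which matches the stated motivation for the abstraction.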
KafkaStream
A Kafka stream may be composed of multiple topics on multiple Kafka clusters. This abstraction is deliberately general: it does not impose any topic naming convention, although implementations are free to assume one if desired. In the simplest case, a Kafka stream is a single topic on a single Kafka cluster.
Code Block |
---|
language | java |
---|
title | KafkaStream |
---|
|
@PublicEvolving
public class KafkaStream implements Serializable {
    private final String streamId;
    private final Map<KafkaClusterIdentifier, Set<String>> kafkaClusterTopicMap;

    public KafkaStream(
            String streamId, Map<KafkaClusterIdentifier, Set<String>> kafkaClusterTopicMap) {
        this.streamId = streamId;
        this.kafkaClusterTopicMap = kafkaClusterTopicMap;
    }
    ...
} |
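To make the two cases above concrete (a single topic on a single cluster, and several topics spread over several clusters), here is a minimal sketch. `StreamSketch` is a hypothetical stand-in for `KafkaStream` that keys clusters by name (a `String`) rather than by `KafkaClusterIdentifier` so that it compiles on its own. The `singleTopic` factory, the `topicCount` helper, and the defensive copy in the constructor are illustrative assumptions, not part of the proposal.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for KafkaStream, keyed by cluster name for brevity.
final class StreamSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String streamId;
    private final Map<String, Set<String>> clusterTopicMap;

    StreamSketch(String streamId, Map<String, Set<String>> clusterTopicMap) {
        this.streamId = streamId;
        // Defensive copy (an assumption; the proposal stores the reference directly).
        Map<String, Set<String>> copy = new HashMap<>();
        clusterTopicMap.forEach(
                (cluster, topics) ->
                        copy.put(cluster, Collections.unmodifiableSet(new HashSet<>(topics))));
        this.clusterTopicMap = Collections.unmodifiableMap(copy);
    }

    // Simplest case: one topic on one cluster.
    static StreamSketch singleTopic(String streamId, String cluster, String topic) {
        return new StreamSketch(
                streamId, Collections.singletonMap(cluster, Collections.singleton(topic)));
    }

    String getStreamId() {
        return streamId;
    }

    Map<String, Set<String>> getClusterTopicMap() {
        return clusterTopicMap;
    }

    // Total number of (cluster, topic) pairs that make up this stream.
    int topicCount() {
        int count = 0;
        for (Set<String> topics : clusterTopicMap.values()) {
            count += topics.size();
        }
        return count;
    }
}
```

A stream that is being migrated between clusters could then be described by listing its topics under both cluster names at once.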
KafkaMetadataService
This service is responsible for resolving Kafka metadata from streams. It may be backed by an external service or by a simple logical mapping held in memory. A config-map, file-based implementation will also be provided for convenience.
Code Block |
---|
language | java |
---|
title | KafkaMetadataService |
---|
|
@PublicEvolving
public interface KafkaMetadataService extends AutoCloseable, Serializable {
    /**
     * Get current metadata for all streams.
     *
     * @return set of all streams
     */
    Set<KafkaStream> getAllStreams();

    /**
     * Get current metadata for queried streams.
     *
     * @param streamIds stream full names
     * @return map of stream name to metadata
     */
    Map<String, KafkaStream> describeStreams(Collection<String> streamIds);

    /**
     * Check if the cluster is active.
     *
     * @param kafkaClusterIdentifier Kafka cluster identifier
     * @return boolean whether the cluster is active
     */
    boolean isClusterActive(KafkaClusterIdentifier kafkaClusterIdentifier);
} |
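As an illustration of the "contained in memory" option, the sketch below keeps the metadata in a map. It is not the proposed implementation. `InMemoryMetadataSketch` is a hypothetical class that mirrors the three interface methods using plain strings for stream IDs and cluster names. Two behaviours are assumptions made here: unknown stream IDs are omitted from the `describeStreams` result, and a cluster counts as active when at least one stream references it and it has not been explicitly marked inactive.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory metadata holder: stream id -> (cluster name -> topics).
final class InMemoryMetadataSketch implements AutoCloseable {
    private final Map<String, Map<String, Set<String>>> streams = new ConcurrentHashMap<>();
    private final Set<String> inactiveClusters = ConcurrentHashMap.newKeySet();

    // Registers or replaces the metadata of one stream.
    void putStream(String streamId, Map<String, Set<String>> clusterTopics) {
        streams.put(streamId, clusterTopics);
    }

    // Flags a cluster as inactive, for example after a migration has finished.
    void markInactive(String clusterName) {
        inactiveClusters.add(clusterName);
    }

    // Counterpart of getAllStreams(), reduced to stream ids.
    Set<String> getAllStreamIds() {
        return new HashSet<>(streams.keySet());
    }

    // Counterpart of describeStreams(); unknown ids are skipped (assumption).
    Map<String, Map<String, Set<String>>> describeStreams(Collection<String> streamIds) {
        Map<String, Map<String, Set<String>>> result = new HashMap<>();
        for (String streamId : streamIds) {
            Map<String, Set<String>> metadata = streams.get(streamId);
            if (metadata != null) {
                result.put(streamId, metadata);
            }
        }
        return result;
    }

    // Counterpart of isClusterActive(); the activity rule is an assumption.
    boolean isClusterActive(String clusterName) {
        if (inactiveClusters.contains(clusterName)) {
            return false;
        }
        for (Map<String, Set<String>> clusterTopics : streams.values()) {
            if (clusterTopics.containsKey(clusterName)) {
                return true;
            }
        }
        return false;
    }

    @Override
    public void close() {
        // Nothing to release for an in-memory map.
    }
}
```

An implementation backed by an external service would presumably replace the map lookups with remote calls and release its client in `close()`.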
KafkaStreamSubscriber
This is similar to KafkaSource's KafkaSubscriber. A regex subscriber will be provided to match streams by a regex pattern.
Code Block |
---|
language | java |
---|
title | KafkaStreamSubscriber |
---|
|
@PublicEvolving
public interface KafkaStreamSubscriber extends Serializable {
    /** Get the set of subscribed streams. */
    Set<KafkaStream> getSubscribedStreams(KafkaMetadataService kafkaMetadataService);
} |
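The regex subscriber mentioned above could work roughly as follows: take every stream ID the metadata service knows about and keep those that match the pattern. The sketch below shows only that filtering step. `RegexStreamMatcherSketch` is a hypothetical helper operating on stream IDs as strings, and the use of a full match (`matches()`) rather than a partial match (`find()`) is an assumption.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical helper showing the filtering step of a regex-based subscriber.
final class RegexStreamMatcherSketch {
    private final Pattern streamPattern;

    RegexStreamMatcherSketch(String regex) {
        this.streamPattern = Pattern.compile(regex);
    }

    // Returns the stream ids that match the whole pattern, in sorted order.
    Set<String> match(Collection<String> allStreamIds) {
        Set<String> matched = new TreeSet<>();
        for (String streamId : allStreamIds) {
            if (streamPattern.matcher(streamId).matches()) {
                matched.add(streamId);
            }
        }
        return matched;
    }
}
```

In a real subscriber, the input collection would presumably come from `KafkaMetadataService#getAllStreams()`, and the result would be the corresponding `KafkaStream` objects rather than their IDs.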
MetadataUpdateEvent
This is a metadata update event containing the current metadata, sent from enumerator to reader.
Code Block |
---|
language | java |
---|
title | MetadataUpdateEvent |
---|
|
@Internal
public class MetadataUpdateEvent implements SourceEvent {
    private final Map<KafkaClusterIdentifier, Set<String>> currentClusterTopics;
    ...
} |
MultiClusterKafkaSourceEnumerator
This enumerator is responsible for discovering and assigning splits from one or more clusters. At startup, the enumerator invokes the KafkaStreamSubscriber and reconciles any changes against the restored state. Source events are sent to the source readers so that they can reconcile the metadata as well. The enumerator can also poll the KafkaMetadataService periodically for stream discovery. In addition, restarting enumerators involves clearing outdated metrics.
Code Block |
---|
language | java |
---|
title | MultiClusterKafkaSourceEnumerator |
---|
|
@PublicEvolving
public class MultiClusterKafkaSourceEnumerator
        implements SplitEnumerator<MultiClusterKafkaSourceSplit, MultiClusterKafkaSourceEnumState> {
    private final Map<
                    KafkaClusterIdentifier, SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState>>
            clusterEnumeratorMap;
    private final Map<KafkaClusterIdentifier, StoppableKafkaEnumContextProxy> clusterEnumContextMap;
    private final KafkaStreamSubscriber kafkaStreamSubscriber;
    private final KafkaMetadataService kafkaMetadataService;
    private Map<KafkaClusterIdentifier, Set<String>> activeClusterTopicsMap;

    private void restartEnumerators(KafkaClusterIdentifier kafkaClusterId, Set<TopicPartition> enumeratorState) {}
    ...
} |
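The reconciliation step can be pictured as a diff between the previously active cluster-to-topics map and the one just resolved from the metadata service. The sketch below is one possible way to compute that diff, not the proposed algorithm. `MetadataDiffSketch` and both of its methods are hypothetical, clusters are keyed by name for brevity, and the rule "restart when the cluster is new or its topic set changed" is an assumption.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: compares two snapshots of cluster name -> topics.
final class MetadataDiffSketch {
    private MetadataDiffSketch() {}

    // Clusters that are new, or whose topic set differs from the previous snapshot.
    static Set<String> clustersToRestart(
            Map<String, Set<String>> previous, Map<String, Set<String>> current) {
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, Set<String>> entry : current.entrySet()) {
            Set<String> before = previous.get(entry.getKey());
            if (before == null || !before.equals(entry.getValue())) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    // Clusters present in the previous snapshot that no longer appear.
    static Set<String> clustersToRemove(
            Map<String, Set<String>> previous, Map<String, Set<String>> current) {
        Set<String> result = new TreeSet<>(previous.keySet());
        result.removeAll(current.keySet());
        return result;
    }
}
```

In terms of the class above, the first set would correspond to clusters passed to `restartEnumerators`, and the second to entries dropped from `clusterEnumeratorMap` together with their outdated metrics.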
...