...
This logical abstraction is introduced because bootstrap servers may change even though the "cluster" is still the same. Thus, the name is used as a unique identifier, which has the added benefit of providing a short name for connector-related metrics. The bootstrap server can be used as the name in simple use cases.
...
language | java |
---|
title | KafkaClusterIdentifier |
---|
...
This is a string.
KafkaStream
It is possible that a Kafka stream is composed of multiple topics on multiple Kafka clusters. In addition, this flexible and general abstraction does not require any topic naming conventions, although implementations may assume such conventions if desired. In the simplest case, a Kafka stream is a single topic on a single Kafka cluster.
Code Block |
---|
language | java |
---|
title | KafkaStream |
---|
|
/**
 * A Kafka stream: a stream id together with the metadata of every Kafka cluster the stream
 * lives on. A stream may span multiple topics on multiple clusters.
 */
@PublicEvolving
public class KafkaStream implements Serializable {
  private final String streamId;
  // Keyed by Kafka cluster identifier (a string).
  // ClusterMetadata contains topics/Kafka properties like bootstrap server
  private final Map<String, ClusterMetadata> clusterToClusterMetadata;

  /**
   * @param streamId identifier of the stream
   * @param clusterToClusterMetadata metadata per Kafka cluster identifier
   */
  public KafkaStream(
      String streamId, Map<String, ClusterMetadata> clusterToClusterMetadata) {
    this.streamId = streamId;
    this.clusterToClusterMetadata = clusterToClusterMetadata;
  }
...
} |
KafkaMetadataService
...
Code Block |
---|
language | java |
---|
title | KafkaMetadataService |
---|
|
/** Service that resolves Kafka streams to their current metadata and reports cluster liveness. */
@PublicEvolving
public interface KafkaMetadataService extends AutoCloseable, Serializable {
  /**
   * Get current metadata for all streams.
   *
   * @return set of all streams
   */
  Set<KafkaStream> getAllStreams();

  /**
   * Get current metadata for queried streams.
   *
   * @param streamIds stream full names
   * @return map of stream name to metadata
   */
  Map<String, KafkaStream> describeStreams(Collection<String> streamIds);

  /**
   * Check if the cluster is active.
   *
   * @param kafkaClusterIdentifier Kafka cluster identifier
   * @return boolean whether the cluster is active
   */
  boolean isClusterActive(String kafkaClusterIdentifier);
} |
...
Code Block |
---|
language | java |
---|
title | MetadataUpdateEvent |
---|
|
/** Source event that carries the current cluster metadata, keyed by Kafka cluster identifier. */
@Internal
public class MetadataUpdateEvent implements SourceEvent {
  private final Map<String, ClusterMetadata> currentMetadata;
...
} |
MultiClusterKafkaSourceEnumerator
...
Code Block |
---|
language | java |
---|
title | MultiClusterKafkaSourceEnumerator |
---|
|
/**
 * Split enumerator for the multi-cluster Kafka source. Its per-cluster state is held in maps
 * keyed by Kafka cluster identifier (a string).
 */
@PublicEvolving
public class MultiClusterKafkaSourceEnumerator
    implements SplitEnumerator<MultiClusterKafkaSourceSplit, MultiClusterKafkaSourceEnumState> {
  // One Kafka split enumerator per cluster identifier.
  private final Map<String, SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState>>
      clusterEnumeratorMap;
  // One enumerator context proxy per cluster identifier.
  private final Map<String, StoppableKafkaEnumContextProxy> clusterEnumContextMap;
  private final KafkaStreamSubscriber kafkaStreamSubscriber;
  private final KafkaMetadataService kafkaMetadataService;
  // Not final: cluster identifier -> topic names; presumably replaced on metadata changes.
  private Map<String, Set<String>> activeClusterTopicsMap;

  /**
   * Body intentionally left empty in this sketch.
   *
   * @param kafkaClusterId Kafka cluster identifier
   * @param enumeratorState topic partitions forming the enumerator state
   */
  private void restartEnumerators(String kafkaClusterId, Set<TopicPartition> enumeratorState) {}
...
} |
...
Code Block |
---|
language | java |
---|
title | StoppableKafkaEnumContextProxy |
---|
|
/**
 * Enumerator context handed to a Kafka split enumerator. It holds a Kafka cluster identifier
 * and the enumerator context of the multi-cluster source.
 */
@Internal
public class StoppableKafkaEnumContextProxy
    implements SplitEnumeratorContext<KafkaPartitionSplit>, AutoCloseable {
  private final String kafkaClusterIdentifier;
  private final KafkaMetadataService kafkaMetadataService;
  private final SplitEnumeratorContext<MultiClusterKafkaSourceSplit> enumContext;
  private final ScheduledExecutorService subEnumeratorWorker;

  /** Wrap splits with cluster metadata. */
  public void assignSplits(SplitsAssignment<KafkaPartitionSplit> newSplitAssignments) {}
...
} |
...
Code Block |
---|
language | java |
---|
title | MultiClusterKafkaSourceReader |
---|
|
/**
 * Source reader for the multi-cluster Kafka source. It keeps Kafka source readers in a map
 * keyed by Kafka cluster identifier (a string).
 *
 * @param <T> type of the records emitted by the reader
 */
@PublicEvolving
public class MultiClusterKafkaSourceReader<T>
    implements SourceReader<T, MultiClusterKafkaSourceSplit> {
  @VisibleForTesting
  final NavigableMap<String, KafkaSourceReader<T>> clusterReaderMap;

  /**
   * Body intentionally left empty in this sketch.
   *
   * @param kafkaClusterId Kafka cluster identifier
   * @param readerState Kafka partition splits forming the reader state
   */
  private void restartReader(
      String kafkaClusterId, List<KafkaPartitionSplit> readerState) {}
...
} |
...
Code Block |
---|
language | java |
---|
title | MultiClusterKafkaSourceSplit |
---|
|
/** Source split that pairs a Kafka partition split with a Kafka cluster identifier. */
@PublicEvolving
public class MultiClusterKafkaSourceSplit implements SourceSplit {
  private final String kafkaClusterId;
  private final KafkaPartitionSplit kafkaPartitionSplit;
...
} |
...