THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
MultiClusterKafkaSource.<String>builder() // some default implementations will be provided (file based, statically defined streams) .setKafkaMetadataService(new KafkaMetadataServiceImpl()) .setStreamIds(List.of("my-stream-1", "my-stream-2")) .setGroupId("myConsumerGroup") .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class)) .setStartingOffsets(OffsetsInitializer.earliest()) .setProperties(properties) .build(); |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** Identifies a Kafka cluster by a name and a bootstrap servers string; instances are comparable to each other and serializable. Remaining members are elided in this sketch. */ @PublicEvolving public class KafkaClusterIdentifier implements Comparable<KafkaClusterIdentifier>, Serializable { /** Name of the cluster. */ private final String name; /** Bootstrap servers of the cluster (exact string format is not shown here). */ private final String bootstrapServers; ... } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** Serializable, closeable service that supplies metadata about streams and reports whether a given Kafka cluster is active. */ @PublicEvolving public interface KafkaMetadataService extends AutoCloseable, Serializable { /** Get current metadata for all streams. @return set of all streams */ Set<KafkaStream> getAllStreams(); /** Get current metadata for queried streams. @param streamIds stream full names @return map of stream name to metadata */ Map<String, KafkaStream> describeStreams(Collection<String> streamIds); /** Check if the cluster is active. @param kafkaClusterIdentifier Kafka cluster identifier @return boolean whether the cluster is active */ boolean isClusterActive(KafkaClusterIdentifier kafkaClusterIdentifier); } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** Serializable strategy that decides which streams are subscribed. */ @PublicEvolving public interface KafkaStreamSubscriber extends Serializable { /** Get the set of subscribed streams. @param kafkaMetadataService metadata service handed to the subscriber for the lookup @return the subscribed streams */ Set<KafkaStream> getSubscribedStreams(KafkaMetadataService kafkaMetadataService); } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** Source event whose payload is a map from Kafka cluster identifier to a set of strings. Remaining members are elided in this sketch. */ @Internal public class MetadataUpdateEvent implements SourceEvent { /** Per-cluster set of strings; going by the field name these are the current topic names of each cluster. */ private final Map<KafkaClusterIdentifier, Set<String>> currentClusterTopics; ... } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** Split enumerator of the multi-cluster source; it keeps one Kafka split enumerator and one enumerator-context proxy per Kafka cluster. Remaining members are elided in this sketch. */ @PublicEvolving public class MultiClusterKafkaSourceEnumerator implements SplitEnumerator<MultiClusterKafkaSourceSplit, MultiClusterKafkaSourceEnumState> { /** Per-cluster Kafka split enumerator. */ private final Map< KafkaClusterIdentifier, SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState>> clusterEnumeratorMap; /** Per-cluster enumerator-context proxy. */ private final Map<KafkaClusterIdentifier, StoppableKafkaEnumContextProxy> clusterEnumContextMap; /** Subscriber that yields the subscribed streams. */ private final KafkaStreamSubscriber kafkaStreamSubscriber; /** Service used for stream and cluster metadata. */ private final KafkaMetadataService kafkaMetadataService; /** Per-cluster set of strings (topic names, per the field name); the only non-final field, so it can be reassigned. NOTE(review): presumably replaced when metadata changes; confirm. */ private Map<KafkaClusterIdentifier, Set<String>> activeClusterTopicsMap; /** Body is empty in this sketch. NOTE(review): by name, restarts the enumerator of the given cluster from the given partitions; confirm against the implementation. @param kafkaClusterId cluster whose enumerator is concerned @param enumeratorState topic partitions passed as state */ private void restartEnumerators(KafkaClusterIdentifier kafkaClusterId, Set<TopicPartition> enumeratorState) {} ... } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** Closeable enumerator context for Kafka partition splits that is tied to one Kafka cluster and holds the enumerator context of the multi-cluster source. Remaining members are elided in this sketch. */ @Internal public class StoppableKafkaEnumContextProxy implements SplitEnumeratorContext<KafkaPartitionSplit>, AutoCloseable { /** Cluster this proxy belongs to. */ private final KafkaClusterIdentifier kafkaClusterIdentifier; /** Service used for stream and cluster metadata. */ private final KafkaMetadataService kafkaMetadataService; /** Enumerator context of the enclosing multi-cluster source. */ private final SplitEnumeratorContext<MultiClusterKafkaSourceSplit> enumContext; /** Scheduled executor; NOTE(review): by name it runs work for the wrapped sub-enumerator; confirm. */ private final ScheduledExecutorService subEnumeratorWorker; /** Wrap splits with cluster metadata. Body is empty in this sketch. @param newSplitAssignments assignments of Kafka partition splits to wrap */ public void assignSplits(SplitsAssignment<KafkaPartitionSplit> newSplitAssignments) {} ... } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** Source event with no fields or methods. NOTE(review): by name it is a request for a metadata update; sender and receiver are not shown here, confirm. */ @Internal public class GetMetadataUpdateEvent implements SourceEvent {} |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** Source reader of the multi-cluster source; it keeps one Kafka source reader per Kafka cluster. Remaining members are elided in this sketch. @param <T> type of the records produced */ @PublicEvolving public class MultiClusterKafkaSourceReader<T> implements SourceReader<T, MultiClusterKafkaSourceSplit> { /** Per-cluster Kafka source reader, kept in a navigable (sorted) map keyed by cluster identifier; package-private so tests can inspect it. */ @VisibleForTesting final NavigableMap<KafkaClusterIdentifier, KafkaSourceReader<T>> clusterReaderMap; /** Body is empty in this sketch. NOTE(review): by name, restarts the reader of the given cluster from the given splits; confirm against the implementation. @param kafkaClusterId cluster whose reader is concerned @param readerState Kafka partition splits passed as state */ private void restartReader( KafkaClusterIdentifier kafkaClusterId, List<KafkaPartitionSplit> readerState) {} ... } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** Source split that pairs a Kafka partition split with the identifier of a Kafka cluster. Remaining members are elided in this sketch. */ @PublicEvolving public class MultiClusterKafkaSourceSplit implements SourceSplit { /** Identifier of the Kafka cluster paired with the split. */ private final KafkaClusterIdentifier kafkaClusterId; /** The wrapped Kafka partition split. */ private final KafkaPartitionSplit kafkaPartitionSplit; ... } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** Source over MultiClusterKafkaSourceSplit splits with MultiClusterKafkaSourceEnumState enumerator state; it also exposes its produced type via ResultTypeQueryable. Remaining members are elided in this sketch. @param <T> type of the records produced */ @PublicEvolving public class MultiClusterKafkaSource<T> implements Source<T, MultiClusterKafkaSourceSplit, MultiClusterKafkaSourceEnumState>, ResultTypeQueryable<T> { /** Subscriber that yields the subscribed streams. */ private final KafkaStreamSubscriber kafkaStreamSubscriber; /** Service used for stream and cluster metadata. */ private final KafkaMetadataService kafkaMetadataService; /** Schema used to deserialize Kafka records into T. */ private final KafkaRecordDeserializationSchema<T> deserializationSchema; /** Initializer for the starting offsets. */ private final OffsetsInitializer startingOffsetsInitializer; /** Initializer for the stopping offsets. */ private final OffsetsInitializer stoppingOffsetsInitializer; /** Additional properties; NOTE(review): presumably Kafka consumer configuration; confirm. */ private final Properties properties; /** Boundedness of this source. */ private final Boundedness boundedness; ... } |
...