Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleHybridSourceBuilder Example
// Example: configure a MultiClusterKafkaSource typed to String through its fluent builder.
MultiClusterKafkaSource.<String>builder()
  // some default implementations will be provided (file based, statically defined streams)
  .setKafkaMetadataService(new KafkaMetadataServiceImpl())
  // streams are referenced by stream id
  .setStreamIds(List.of("my-stream-1", "my-stream-2"))
  .setGroupId("myConsumerGroup")
  .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
  .setStartingOffsets(OffsetsInitializer.earliest())
  // `properties` is defined outside this snippet
  .setProperties(properties)
  .build();

...

Code Block
languagejava
titleHybridSourceKafkaClusterIdentifier
/**
 * Identifier of a Kafka cluster, holding a name and a bootstrap servers string.
 *
 * <p>Declared {@link Comparable} and {@link Serializable}; the comparison and equality rules
 * are in the part of the class elided from this excerpt.
 */
@PublicEvolving 
public class KafkaClusterIdentifier implements Comparable<KafkaClusterIdentifier>, Serializable {
  /** Name of the cluster. */
  private final String name;
  /** Bootstrap servers string for the cluster. */
  private final String bootstrapServers;

...
}

...

Code Block
languagejava
titleHybridSourceKafkaMetadataService
/**
 * Service that exposes stream metadata and reports whether a Kafka cluster is active.
 *
 * <p>Implementations must be {@link Serializable} and are {@link AutoCloseable}.
 */
@PublicEvolving 
public interface KafkaMetadataService extends AutoCloseable, Serializable {
  /**
   * Get current metadata for all streams.
   *
   * @return set of all streams
   */
  Set<KafkaStream> getAllStreams();

  /**
   * Get current metadata for queried streams.
   *
   * @param streamIds stream full names
   * @return map of stream name to metadata
   */
  Map<String, KafkaStream> describeStreams(Collection<String> streamIds);

  /**
   * Check if the cluster is active.
   *
   * @param kafkaClusterIdentifier Kafka cluster identifier
   * @return boolean whether the cluster is active
   */
  boolean isClusterActive(KafkaClusterIdentifier kafkaClusterIdentifier);
}

...

Code Block
languagejava
titleHybridSourceKafkaStreamSubscriber
/**
 * Serializable strategy that determines the set of subscribed streams, given a
 * {@link KafkaMetadataService}.
 */
@PublicEvolving
public interface KafkaStreamSubscriber extends Serializable {

  /**
   * Get the set of subscribed streams.
   *
   * @param kafkaMetadataService metadata service made available to the subscriber
   * @return the subscribed streams
   */
  Set<KafkaStream> getSubscribedStreams(KafkaMetadataService kafkaMetadataService);
}

...

Code Block
languagejava
titleHybridSourceMetadataUpdateEvent
/**
 * {@link SourceEvent} carrying a map from Kafka cluster identifier to a set of strings
 * (topic names, judging by the field name).
 *
 * <p>NOTE(review): sender and receiver are not visible in this excerpt; presumably sent from
 * the enumerator to the readers, to be confirmed.
 */
@Internal
public class MetadataUpdateEvent implements SourceEvent {
  /** Per-cluster set of current topics. */
  private final Map<KafkaClusterIdentifier, Set<String>> currentClusterTopics;

...
}

...

Code Block
languagejava
titleHybridSourceMultiClusterKafkaSourceEnumerator
/**
 * {@link SplitEnumerator} producing {@code MultiClusterKafkaSourceSplit}s and checkpointing
 * {@code MultiClusterKafkaSourceEnumState}.
 *
 * <p>Holds one sub-enumerator (over {@code KafkaPartitionSplit}) and one enumerator context
 * proxy per {@link KafkaClusterIdentifier}. The enumerator logic itself is elided from this
 * excerpt.
 */
@PublicEvolving 
public class MultiClusterKafkaSourceEnumerator
    implements SplitEnumerator<MultiClusterKafkaSourceSplit, MultiClusterKafkaSourceEnumState> {
  
  /** Sub-enumerators keyed by cluster identifier. */
  private final Map<
          KafkaClusterIdentifier, SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState>>
      clusterEnumeratorMap;
  /** Enumerator context proxies keyed by cluster identifier. */
  private final Map<KafkaClusterIdentifier, StoppableKafkaEnumContextProxy> clusterEnumContextMap;
  private final KafkaStreamSubscriber kafkaStreamSubscriber;
  private final KafkaMetadataService kafkaMetadataService;
  /**
   * Topics per cluster; the only non-final field shown here.
   * NOTE(review): presumably replaced when metadata changes, to be confirmed.
   */
  private Map<KafkaClusterIdentifier, Set<String>> activeClusterTopicsMap;

  /**
   * Signature only; the body is empty in this excerpt.
   *
   * @param kafkaClusterId cluster the call applies to
   * @param enumeratorState topic partitions passed as enumerator state for that cluster
   */
  private void restartEnumerators(KafkaClusterIdentifier kafkaClusterId, Set<TopicPartition> enumeratorState) {}

...
}

...

Code Block
languagejava
titleHybridSourceStoppableKafkaEnumContextProxy
/**
 * {@link SplitEnumeratorContext} for {@code KafkaPartitionSplit} that holds a
 * {@code SplitEnumeratorContext<MultiClusterKafkaSourceSplit>}, a cluster identifier, a
 * metadata service and a {@link ScheduledExecutorService}.
 *
 * <p>Declared {@link AutoCloseable}; the close logic is elided from this excerpt.
 */
@Internal
public class StoppableKafkaEnumContextProxy
    implements SplitEnumeratorContext<KafkaPartitionSplit>, AutoCloseable {

  /** Cluster this proxy is associated with. */
  private final KafkaClusterIdentifier kafkaClusterIdentifier;
  private final KafkaMetadataService kafkaMetadataService;
  /** Enumerator context typed to the multi-cluster split. */
  private final SplitEnumeratorContext<MultiClusterKafkaSourceSplit> enumContext;
  /** NOTE(review): presumably runs the sub-enumerator's scheduled work; confirm. */
  private final ScheduledExecutorService subEnumeratorWorker;

  /**
   * Wrap splits with cluster metadata.
   *
   * <p>The body is empty in this excerpt.
   *
   * @param newSplitAssignments assignments of {@code KafkaPartitionSplit}s
   */
  public void assignSplits(SplitsAssignment<KafkaPartitionSplit> newSplitAssignments) {}

...
}

...

Code Block
languagejava
titleHybridSourceGetMetadataUpdateEvent
/**
 * {@link SourceEvent} with no fields or methods.
 *
 * <p>NOTE(review): the name suggests it requests a {@code MetadataUpdateEvent}; sender and
 * receiver are not shown here, to be confirmed.
 */
@Internal
public class GetMetadataUpdateEvent implements SourceEvent {}

...

Code Block
languagejava
titleHybridSourceMultiClusterKafkaSourceReader
/**
 * {@link SourceReader} for {@code MultiClusterKafkaSourceSplit}s that keeps one
 * {@code KafkaSourceReader} per {@link KafkaClusterIdentifier}.
 *
 * <p>The reader logic itself is elided from this excerpt.
 *
 * @param <T> type of the records emitted by the reader
 */
@PublicEvolving 
public class MultiClusterKafkaSourceReader<T>
    implements SourceReader<T, MultiClusterKafkaSourceSplit> {

  /** Per-cluster readers in a sorted map; package-private so tests can reach it. */
  @VisibleForTesting
  final NavigableMap<KafkaClusterIdentifier, KafkaSourceReader<T>> clusterReaderMap;
  
  /**
   * Signature only; the body is empty in this excerpt.
   *
   * @param kafkaClusterId cluster the call applies to
   * @param readerState Kafka partition splits passed as reader state for that cluster
   */
  private void restartReader(
      KafkaClusterIdentifier kafkaClusterId, List<KafkaPartitionSplit> readerState) {}
...
}

...

Code Block
languagejava
titleHybridSourceMultiClusterKafkaSourceSplit
/**
 * {@link SourceSplit} that pairs a {@code KafkaPartitionSplit} with the
 * {@link KafkaClusterIdentifier} it is associated with.
 */
@PublicEvolving 
public class MultiClusterKafkaSourceSplit implements SourceSplit {

  /** Cluster identifier for this split. */
  private final KafkaClusterIdentifier kafkaClusterId;
  /** The underlying Kafka partition split. */
  private final KafkaPartitionSplit kafkaPartitionSplit;

...
}

...

Code Block
languagejava
titleHybridSourceMultiClusterKafkaSource
/**
 * {@link Source} using {@code MultiClusterKafkaSourceSplit} as its split type and
 * {@code MultiClusterKafkaSourceEnumState} as its enumerator checkpoint type.
 *
 * <p>Holds the stream subscriber, metadata service, deserialization schema, starting and
 * stopping offsets initializers, properties and boundedness. Construction and the
 * {@code Source} methods are elided from this excerpt.
 *
 * @param <T> type of the records produced by the source
 */
@PublicEvolving 
public class MultiClusterKafkaSource<T>
    implements Source<T, MultiClusterKafkaSourceSplit, MultiClusterKafkaSourceEnumState>,
        ResultTypeQueryable<T> {

  private final KafkaStreamSubscriber kafkaStreamSubscriber;
  private final KafkaMetadataService kafkaMetadataService;
  /** Schema used to deserialize Kafka records into {@code T}. */
  private final KafkaRecordDeserializationSchema<T> deserializationSchema;
  private final OffsetsInitializer startingOffsetsInitializer;
  private final OffsetsInitializer stoppingOffsetsInitializer;
  private final Properties properties;
  private final Boundedness boundedness;

...
}

...