Versions Compared


  • This line was added.
  • This line was removed.
  • Formatting was changed.


Current stateUnder Discussion

Discussion thread:

JIRA: here (<- link to

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


This source will extend the KafkaSource to be able to read from multiple Kafka clusters within a single source. In addition, the source can adjust the clusters and topics the source consumes from dynamically, without Flink job restart.

Some of the challenging use cases that these features solve are:

  1. Transparent Kafka cluster addition/removal without Flink job restart.
  2. Transparent Kafka topic addition/removal without Flink job restart.
  3. Direct integration with Hybrid Source.

Public Interfaces

The source will use the FLIP-27: Refactor Source Interface to integrate it with Flink and support both bounded and unbounded jobs.

This proposal does not include any changes to existing public interfaces. A new MultiClusterKafkaSource builder will serve as the public API and all other APIs will be marked as Internal in this proposal. 

The new source will go into the Kafka connector module and follow any connector repository changes of Kafka Source.

An example of building the new Source in unbounded mode

Code Block
titleBuilder Example
  // some default implementations will be provided (file based, statically defined streams)
  .setKafkaMetadataService(new KafkaMetadataServiceImpl())
  .setStreamIds(List.of("my-stream-1", "my-stream-2"))

Basic Idea

There needs to exist a coordination mechanism to manage how the underlying KafkaSourceEnumerators and KafkaSources interact with multiple clusters and multiple topics. This design leverages the source event protocol to sync metadata between source components.

In addition, KafkaStream is introduced as abstraction that contains a logical mapping to physical Kafka clusters and Kafka topics, which can be used to derive metadata for source state. Changes in metadata are detected by the source enumerator and propagated to source reader to allow the source components to reconcile the changes.

Reconciliation is designed as KafkaSourceEnumerator and KafkaSourceReader restarts. There is careful consideration for resource cleanup and error handling--for example thread pools, metrics, and KafkaConsumer errors.

Other required functionality leverages and composes the existing KafkaSource implementation for discovering Kafka topic partition offsets, committing offsets, doing the actual Kafka Consumer polling, snapshotting state, and split coordination per Kafka cluster.

To the source more user friendly, a MultiClusterKafkaSourceBuilder will be provided (e.g. batch mode should not turn on KafkaMetadataService discovery, should only be done at startup).

Proposed Changes


This is logical abstraction is introduced since bootstrap servers may change although the "cluster" is still the same. Thus, name is used as a unique identifier, which also has the added benefit to use a short name for connector related metrics. Bootstrap server can be used as the name in simple usecases.

Code Block
public class KafkaClusterIdentifier implements Comparable<KafkaClusterIdentifier>, Serializable {
  private final String name;
  private final String bootstrapServers;



This is responsible to resolve Kafka metadata from streams. This may be backed by an external service or simply something logical that is contained in memory.  A config map file based implementation will be provided as well for convenience.

Code Block
public interface KafkaMetadataService extends AutoCloseable, Serializable {
   * Get current metadata for all streams.
   * @return set of all streams
  Set<KafkaStream> getAllStreams();

   * Get current metadata for queried streams.
   * @param streamIds stream full names
   * @return map of stream name to metadata
  Map<String, KafkaStream> describeStreams(Collection<String> streamIds);

   * Check if the cluster is active.
   * @param kafkaClusterIdentifier Kafka cluster identifier
   * @return boolean whether the cluster is active
  boolean isClusterActive(KafkaClusterIdentifier kafkaClusterIdentifier);


This is similar to KafkaSource's KafkaSubscriber. A regex subscriber will be provided to match streams by a regex pattern.

Code Block
public interface KafkaStreamSubscriber extends Serializable {

  /** Get the set of subscribed streams. */
  Set<KafkaStream> getSubscribedStreams(KafkaMetadataService kafkaMetadataService);


This is a metadata update event containing the current metadata, sent from enumerator to reader.

Code Block
public class MetadataUpdateEvent implements SourceEvent {
  private final Map<KafkaClusterIdentifier, Set<String>> currentClusterTopics;



This reader is responsible for discovering and assigning splits from 1+ clusters. At startup, the enumerator will invoke the KafkaStreamSubscriber and reconcile changes from state. Source events will be sent to the source reader to reconcile the metadata. This enumerator has the ability to poll the KafkaMetadataService, periodically for stream discovery. In addition, restarting enumerators involve clearing outdated metrics.

Code Block
public class MultiClusterKafkaSourceEnumerator
    implements SplitEnumerator<MultiClusterKafkaSourceSplit, MultiClusterKafkaSourceEnumState> {
  private final Map<
          KafkaClusterIdentifier, SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState>>
  private final Map<KafkaClusterIdentifier, StoppableKafkaEnumContextProxy> clusterEnumContextMap;
  private final KafkaStreamSubscriber kafkaStreamSubscriber;
  private final KafkaMetadataService kafkaMetadataService;
  private Map<KafkaClusterIdentifier, Set<String>> activeClusterTopicsMap;

  private void restartEnumerators(KafkaClusterIdentifier kafkaClusterId, Set<TopicPartition> enumeratorState) {}



This enumerator context proxy facilitates the ability to close executors used by scheduled callables in the underlying KafkaSourceEnumerators. This also handles errors with the scheduled callables when metadata is not sync with source state.

Code Block
public class StoppableKafkaEnumContextProxy
    implements SplitEnumeratorContext<KafkaPartitionSplit>, AutoCloseable {

  private final KafkaClusterIdentifier kafkaClusterIdentifier;
  private final KafkaMetadataService kafkaMetadataService;
  private final SplitEnumeratorContext<MultiClusterKafkaSourceSplit> enumContext;
  private final ScheduledExecutorService subEnumeratorWorker;

  /** Wrap splits with cluster metadata. */
  public void assignSplits(SplitsAssignment<KafkaPartitionSplit> newSplitAssignments) {}



This is a metadata update event requesting the current metadata, sent from reader to enumerator.

Code Block
public class GetMetadataUpdateEvent implements SourceEvent {}


This reader is responsible for reading from 1+ clusters. At startup, the reader will first send a source event to grab the latest metadata from the enumerator before working on the splits (from state if existing). There will be error handling related to reconciliation exceptions (e.g. KafkaConsumer WakeupException if KafkaSourceReader restarts in the middle of a poll). In addition, restarting enumerators involve releasing resources from underlying thread pools. Furthermore, this enables us to remove topics from KafkaSourceReader processing, since the metadata reconciliation will induce KafkaSourceReader restart in which splits can be filtered according to the current metadata.

Code Block
public class MultiClusterKafkaSourceReader<T>
    implements SourceReader<T, MultiClusterKafkaSourceSplit> {

  final NavigableMap<KafkaClusterIdentifier, KafkaSourceReader<T>> clusterReaderMap;
  private void restartReader(
      KafkaClusterIdentifier kafkaClusterId, List<KafkaPartitionSplit> readerState) {}


This extends KafkaSource's KafkaPartitionSplit to include cluster information.

Code Block
public class MultiClusterKafkaSourceSplit implements SourceSplit {

  private final KafkaClusterIdentifier kafkaClusterId;
  private final KafkaPartitionSplit kafkaPartitionSplit;



Connecting it all together...

Code Block
public class MultiClusterKafkaSource<T>
    implements Source<T, MultiClusterKafkaSourceSplit, MultiClusterKafkaSourceEnumState>,
        ResultTypeQueryable<T> {

  private final KafkaStreamSubscriber kafkaStreamSubscriber;
  private final KafkaMetadataService kafkaMetadataService;
  private final KafkaRecordDeserializationSchema<T> deserializationSchema;
  private final OffsetsInitializer startingOffsetsInitializer;
  private final OffsetsInitializer stoppingOffsetsInitializer;
  private final Properties properties;
  private final Boundedness boundedness;


Compatibility, Deprecation, and Migration Plan

The source is opt in and would require users to implement code changes.

In the same vein as the migration from FlinkKafkaConsumer and KafkaSource, the source state is incompatible between KafkaSource and MultiClusterKafkaSource so it is recommended to reset all state or reset partial state by setting a different uid and starting the application from nonrestore state.

Test Plan

This will be tested by unit and integration tests. The work will extend existing KafkaSource test utilities in Flink to exercise multiple clusters.

Rejected Alternatives
