Status

Discussion thread https://lists.apache.org/thread/vz7nw5qzvmxwnpktnofc9p13s1dzqm6z
Vote thread https://lists.apache.org/thread/nx00y04t9bslp4mq20x1x8h268gr44o3
JIRA FLINK-32197
Release kafka-3.1.0



Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Users in large infrastructure setups often need to process and/or join data that lives in one or more Kafka clusters in Flink. In addition, multiple Kafka clusters may be relevant when Kafka consumers need to migrate between Kafka clusters for failover or upgrades.

Some of the challenging use cases that this feature solves are:

...

This source will extend the KafkaSource to be able to read from a dynamic number of Kafka clusters within a single source, and it introduces an interface to enable more sophisticated automation/coordination between Kafka and Flink infrastructure.

Basic Idea

DynamicKafkaSource relies on metadata to determine which clusters and topics to subscribe to. The metadata can change over time, so the source will poll for new metadata on an interval and dynamically reconcile changes.

The DynamicKafkaSource must be able to:

  1. Read from multiple clusters, topics, and partitions.
  2. Assign splits from multiple clusters.
  3. Checkpoint and commit split progress from multiple clusters.
  4. Report Kafka source metrics for multiple clusters.
  5. Communicate and reconcile metadata changes via source events from enumerator to readers.
  6. Clean up resources (e.g. clients, thread pools, metrics) related to old clusters.

The KafkaSource solves requirements 1-4 for a single Kafka cluster, and its low-level components can be composed in order to provide the functionality for multiple Kafka clusters. For example, an underlying KafkaSourceEnumerator will be used to discover splits, checkpoint assigned splits, and do periodic partition discovery. Likewise, an underlying KafkaSourceReader will be used to poll and deserialize records, checkpoint split state, and commit offsets back to Kafka.


Reconciliation is designed as KafkaSourceEnumerator and KafkaSourceReader restarts, which enables us to "remove" splits. Resource cleanup and error handling are carefully considered, for example for thread pools, metrics, and KafkaConsumer errors.

Other required functionality leverages and composes the existing KafkaSource implementation for discovering Kafka topic partition offsets and round robin split assignment per Kafka cluster. The diagrams below depict how the source will reuse the code of the KafkaSource in order to achieve the requirements.

The Kafka Metadata Service

To provide the ability to dynamically change the underlying source components without a job restart, there needs to be a coordination mechanism to manage how the underlying KafkaSourceEnumerators and KafkaSources interact with multiple clusters and multiple topics. A KafkaMetadataService is the discovery mechanism by which the source components will reconcile metadata changes, and only the DynamicKafkaSourceEnumerator interacts with the KafkaMetadataService. Periodic metadata discovery will be supported via source configuration, just as topic partition discovery is supported via an interval in KafkaSource. The service can be interpreted as a multi-cluster extension of the AdminClient for KafkaSource, serving only cluster and topic metadata.

A default implementation will be provided so that a native Kubernetes ConfigMap can easily control the metadata (YAML/JSON file). This implementation targets the basic use cases where external monitoring informs how users change the metadata.

KafkaStream and KafkaClusterId

In addition, KafkaStream is part of the metadata returned by KafkaMetadataService and contains a logical mapping to physical Kafka clusters and Kafka topics, which can be transient and change dynamically. Changes in metadata are detected by the enumerator and propagated to readers via source events to reconcile the changes. Kafka clusters are uniquely identified by a string id, since multiple bootstrap server lists can reach the same Kafka cluster (e.g. "kafka-server1:9092,kafka-server2:9092" and "kafka-server1:9092").

Exactly Once Semantics and Consistency Guarantees

KafkaSource guarantees exactly-once reading since offsets move forward only when a checkpoint succeeds, and DynamicKafkaSource inherits these properties since the source delegates this functionality to the KafkaSource components. Metadata is checkpointed and can be rebuilt from the reader split state.

Exactly-once guarantees can be maintained under the assumption that the KafkaMetadataService does not expire a cluster in which data still needs to be read. This can be achieved by not destroying the old Kafka cluster until consumers have drained it (no more producer traffic and lag is 0). In practice, a good strategy is to let data expire naturally via Kafka cluster retention. During a Kafka migration switchover, the consumer would consume from both the old and new clusters.

With the regular KafkaSource, if Kafka deletes a topic or a cluster is destroyed, the exactly-once semantics are not preserved, and the semantics are tightly coupled with storage. The design composes and delegates the responsibilities to KafkaSource components, so it is limited to whatever KafkaSource can do for exactly-once semantics. The KafkaMetadataService and the source metadata reconciliation mechanism make it possible to automate migration and prevent data loss.

Metadata is first discovered on job start, and for streaming jobs new metadata is discovered via a polling mechanism (see the source configuration in the next section). The metadata update is designed with eventual consistency: repeated metadata polling will eventually get the correct metadata and reconcile the job accordingly. For batch jobs, this polling mechanism should be disabled.
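
The polling and reconciliation step described above could be sketched as a diff between the last known cluster-to-topics map and the freshly polled one. This is only an illustration under simplifying assumptions: `MetadataDiffSketch`, its `diff` method, and its fields are hypothetical names that are not part of this proposal, and plain strings stand in for cluster ids and topic names.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper (not part of the proposal): diffs two metadata snapshots. */
final class MetadataDiffSketch {
  /** Clusters that appear only in the new snapshot. */
  final Set<String> added = new TreeSet<>();
  /** Clusters that disappeared; their clients, threads and metrics should be cleaned up. */
  final Set<String> removed = new TreeSet<>();
  /** Clusters present in both snapshots whose topic set changed. */
  final Set<String> changed = new TreeSet<>();

  static MetadataDiffSketch diff(
      Map<String, Set<String>> previous, Map<String, Set<String>> current) {
    MetadataDiffSketch result = new MetadataDiffSketch();
    for (Map.Entry<String, Set<String>> entry : current.entrySet()) {
      Set<String> oldTopics = previous.get(entry.getKey());
      if (oldTopics == null) {
        result.added.add(entry.getKey());
      } else if (!oldTopics.equals(entry.getValue())) {
        result.changed.add(entry.getKey());
      }
    }
    for (String clusterId : previous.keySet()) {
      if (!current.containsKey(clusterId)) {
        result.removed.add(clusterId);
      }
    }
    return result;
  }

  /** True when polling returned the same metadata, so nothing needs to restart. */
  boolean isEmpty() {
    return added.isEmpty() && removed.isEmpty() && changed.isEmpty();
  }

  public static void main(String[] args) {
    Map<String, Set<String>> previous =
        Map.of("cluster-a", Set.of("t1"), "cluster-b", Set.of("t2"));
    Map<String, Set<String>> current =
        Map.of("cluster-b", Set.of("t2", "t3"), "cluster-c", Set.of("t4"));
    MetadataDiffSketch d = diff(previous, current);
    // prints: added=[cluster-c] removed=[cluster-a] changed=[cluster-b]
    System.out.println("added=" + d.added + " removed=" + d.removed + " changed=" + d.changed);
  }
}
```

Because each poll is compared against the last applied snapshot, a missed or late update is simply picked up by a later poll, which is one way to read the eventual consistency described above.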

[Images: diagrams depicting how the source reuses the KafkaSource components]

NOTE: the readers need to send the GetMetadataUpdateEvent at startup because the reader state may reflect outdated metadata. Thus, the reader should not start without fresh metadata. With fresh metadata, the reader can filter splits from state. This filtering capability is ultimately how we solve the common issue of "I re-configured my Kafka source and removed some topic, but it refers to the old topic due to state".

To make the source more user friendly, a DynamicKafkaSourceBuilder will be provided (e.g. batch mode should not turn on periodic KafkaMetadataService discovery; discovery should only be done at startup).

Public Interfaces

The source will use the FLIP-27: Refactor Source Interface to integrate it with Flink and support both bounded and unbounded jobs.

This proposal does not include any changes to existing public interfaces of the KafkaSource. A new DynamicKafkaSource builder will serve as the public API and all other APIs will be marked as Internal in this proposal. 

The new source will go into the Kafka connector module and follow any connector repository changes of Kafka Source.

An example of building the new Source in unbounded mode

Code Block
languagejava
titleBuilder Example
DynamicKafkaSource.<String>builder()
  // some default implementations will be provided (file based, statically defined streams)
  .setKafkaMetadataService(new KafkaMetadataServiceImpl())
  .setStreamIds(List.of("my-stream-1", "my-stream-2"))
  .setGroupId("myConsumerGroup")
  .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
  .setStartingOffsets(OffsetsInitializer.earliest())
  .setProperties(properties)
  .build();

// The metadata discovery interval can be configured via properties
ConfigOptions.key("multi.cluster.kafka.source.kafka-metadata-service-discovery-internal-ms")
    .longType()
    .noDefaultValue()
    .withDescription(
        "The interval in milliseconds at which the Kafka metadata service will be polled.");

Proposed Changes

KafkaClusterIdentifier

This logical abstraction is introduced since bootstrap servers may change although the "cluster" is still the same. Thus, the name is used as a unique identifier, which has the added benefit of providing a short name for connector-related metrics. The bootstrap server can be used as the name in simple use cases. The identifier is a string.

KafkaStream

It is possible that a Kafka stream is composed of multiple topics on multiple Kafka clusters. In addition, this flexible and general abstraction does not require any conventions on the topic naming, but implementations can make such assumptions if desired. In the simplest case, a Kafka stream is a single topic on a single Kafka cluster.

Code Block
languagejava
titleKafkaStream
@PublicEvolving
public class KafkaStream implements Serializable {
  private final String streamId;
  // ClusterMetadata contains topics/Kafka properties like bootstrap server
  private final Map<String, ClusterMetadata> clusterToClusterMetadata;

  public KafkaStream(
      String streamId, Map<String, ClusterMetadata> clusterToClusterMetadata) {
      ...
  }

...
}

KafkaMetadataService

This service is responsible for resolving Kafka metadata from streams. It may be backed by an external service or simply by something logical that is contained in memory. A ConfigMap file based implementation will be provided as well for convenience. Similarly to the KafkaSource subscriber integration, the #getAllStreams() API is supported here to be able to filter streams, for example, by a regex.

This interface represents the source of truth for the current metadata, and metadata that is removed is considered non-active (e.g. removing a cluster from the return value means that the cluster is non-active and should not be read from).

Code Block
languagejava
titleKafkaMetadataService
@PublicEvolving
public interface KafkaMetadataService extends AutoCloseable, Serializable {
  /**
   * Get current metadata for all streams.
   *
   * @return set of all streams
   */
  Set<KafkaStream> getAllStreams();

  /**
   * Get current metadata for queried streams.
   *
   * @param streamIds stream full names
   * @return map of stream name to metadata
   */
  Map<String, KafkaStream> describeStreams(Collection<String> streamIds);

  /**
   * Check if the cluster is active.
   *
   * @param kafkaClusterIdentifier Kafka cluster identifier
   * @return boolean whether the cluster is active
   */
  boolean isClusterActive(String kafkaClusterIdentifier);
}
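
To make the "source of truth" contract more concrete, the following is a minimal in-memory sketch that mirrors the three methods of the interface. It is an assumption-laden illustration, not the proposed implementation: `StreamSketch` and `InMemoryMetadataServiceSketch` are hypothetical stand-ins, a plain map of cluster id to topic names replaces ClusterMetadata, and a cluster is treated as active while at least one current stream still references it.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Simplified, hypothetical stand-in for KafkaStream: cluster id -> topic names. */
final class StreamSketch {
  final String streamId;
  final Map<String, Set<String>> clusterTopics;

  StreamSketch(String streamId, Map<String, Set<String>> clusterTopics) {
    this.streamId = streamId;
    this.clusterTopics = clusterTopics;
  }
}

/** Hypothetical in-memory service mirroring the three interface methods. */
final class InMemoryMetadataServiceSketch implements AutoCloseable {
  private final Map<String, StreamSketch> streams = new HashMap<>();

  void putStream(StreamSketch stream) {
    streams.put(stream.streamId, stream);
  }

  void removeStream(String streamId) {
    streams.remove(streamId);
  }

  Set<StreamSketch> getAllStreams() {
    return new HashSet<>(streams.values());
  }

  /** Unknown stream ids are simply absent from the result. */
  Map<String, StreamSketch> describeStreams(Collection<String> streamIds) {
    Map<String, StreamSketch> result = new HashMap<>();
    for (String id : streamIds) {
      StreamSketch stream = streams.get(id);
      if (stream != null) {
        result.put(id, stream);
      }
    }
    return result;
  }

  /** Assumption: a cluster is active while some current stream still maps to it. */
  boolean isClusterActive(String clusterId) {
    for (StreamSketch stream : streams.values()) {
      if (stream.clusterTopics.containsKey(clusterId)) {
        return true;
      }
    }
    return false;
  }

  @Override
  public void close() {
    // Nothing to release for an in-memory map.
  }

  public static void main(String[] args) {
    InMemoryMetadataServiceSketch service = new InMemoryMetadataServiceSketch();
    service.putStream(new StreamSketch("my-stream-1", Map.of("cluster-a", Set.of("topic-1"))));
    System.out.println(service.isClusterActive("cluster-a")); // true
    service.removeStream("my-stream-1");
    System.out.println(service.isClusterActive("cluster-a")); // false
    service.close();
  }
}
```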

KafkaStreamSubscriber

This is similar to KafkaSource's KafkaSubscriber. A regex subscriber will be provided to match streams by a regex pattern.

Code Block
languagejava
titleKafkaStreamSubscriber
@PublicEvolving
public interface KafkaStreamSubscriber extends Serializable {

  /** Get the set of subscribed streams. */
  Set<KafkaStream> getSubscribedStreams(KafkaMetadataService kafkaMetadataService);
}
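
The regex subscriber mentioned above could work roughly as follows. The sketch operates on plain stream ids rather than KafkaStream objects to stay self-contained, and `RegexStreamFilterSketch` and `select` are hypothetical names used only for illustration. Full-match semantics are an assumption here; the proposal does not specify them.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

/** Hypothetical sketch: selects stream ids that fully match a regex. */
final class RegexStreamFilterSketch {
  private final Pattern pattern;

  RegexStreamFilterSketch(String regex) {
    this.pattern = Pattern.compile(regex);
  }

  /** Returns the matching ids, sorted so the result is stable across calls. */
  Set<String> select(Set<String> allStreamIds) {
    Set<String> matched = new TreeSet<>();
    for (String id : allStreamIds) {
      if (pattern.matcher(id).matches()) {
        matched.add(id);
      }
    }
    return matched;
  }

  public static void main(String[] args) {
    RegexStreamFilterSketch filter = new RegexStreamFilterSketch("orders-.*");
    // prints: [orders-eu, orders-us]
    System.out.println(filter.select(Set.of("orders-eu", "orders-us", "payments")));
  }
}
```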

MetadataUpdateEvent

This is a metadata update event containing the current metadata, sent from enumerator to reader. The metadata does not include stream information since it is not required by the reader, which does not directly interact with streams or the KafkaMetadataService.

There is also a GetMetadataUpdateEvent so that readers can request metadata before starting. This is essential to filter expired metadata at startup.

Code Block
languagejava
titleMetadataUpdateEvent
@Internal
public class MetadataUpdateEvent implements SourceEvent {
  private final Map<String, ClusterMetadata> currentMetadata;

...
}

DynamicKafkaSourceEnumerator

This enumerator is responsible for discovering and assigning splits from one or more clusters. At startup, the enumerator will invoke the KafkaStreamSubscriber and reconcile changes from state. Source events will be sent to the source reader to reconcile the metadata. This enumerator has the ability to poll the KafkaMetadataService periodically for stream discovery. In addition, restarting enumerators involves clearing outdated metrics, since clusters may be removed and so should their metrics.

Code Block
languagejava
titleDynamicKafkaSourceEnumerator
@PublicEvolving
public class DynamicKafkaSourceEnumerator
    implements SplitEnumerator<DynamicKafkaSourceSplit, DynamicKafkaSourceEnumState> {

  private final Map<
          String, SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState>>
      clusterEnumeratorMap;
  private final Map<String, StoppableKafkaEnumContextProxy> clusterEnumContextMap;
  private final KafkaStreamSubscriber kafkaStreamSubscriber;
  private final KafkaMetadataService kafkaMetadataService;
  private Map<String, Set<String>> activeClusterTopicsMap;

  private void restartEnumerators(String kafkaClusterId, Set<TopicPartition> enumeratorState) {}

...
}

StoppableKafkaEnumContextProxy

This enumerator context proxy facilitates the ability to close executors used by scheduled callables in the underlying KafkaSourceEnumerators and wraps the KafkaPartitionSplits with cluster information.

KafkaSourceEnumerators need to properly clean up the topic partition discovery scheduled callable on restart. The proxy can also safely handle errors from the scheduled callables when metadata is not in sync with source state.

Code Block
languagejava
titleStoppableKafkaEnumContextProxy
@Internal
public class StoppableKafkaEnumContextProxy
    implements SplitEnumeratorContext<KafkaPartitionSplit>, AutoCloseable {

  private final String kafkaClusterIdentifier;
  private final KafkaMetadataService kafkaMetadataService;
  private final SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext;
  private final ScheduledExecutorService subEnumeratorWorker;

  /** Wrap splits with cluster metadata. */
  public void assignSplits(SplitsAssignment<KafkaPartitionSplit> newSplitAssignments) {}

...
}
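
The "stoppable" part of the proxy can be illustrated with a small wrapper around a scheduled executor. This is a sketch under stated assumptions, not the proposed class: `StoppableSchedulerSketch` and `schedulePeriodic` are hypothetical names, and the sketch ignores split wrapping and Flink's SplitEnumeratorContext entirely. It only shows how periodic discovery work can be stopped on restart without surfacing errors from late or rejected tasks.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of an executor wrapper that can be stopped on restart. */
final class StoppableSchedulerSketch implements AutoCloseable {
  private final ScheduledExecutorService worker =
      Executors.newSingleThreadScheduledExecutor();
  private volatile boolean closed;

  /** Schedules a periodic task; returns false instead of throwing once closed. */
  boolean schedulePeriodic(Runnable task, long periodMs) {
    if (closed) {
      return false;
    }
    try {
      worker.scheduleAtFixedRate(
          () -> {
            // Skip work that fires after close() has been requested.
            if (!closed) {
              task.run();
            }
          },
          0,
          periodMs,
          TimeUnit.MILLISECONDS);
      return true;
    } catch (RejectedExecutionException e) {
      // The executor was shut down concurrently; treat it as "not scheduled".
      return false;
    }
  }

  boolean isClosed() {
    return closed;
  }

  @Override
  public void close() {
    closed = true;
    worker.shutdownNow();
    try {
      worker.awaitTermination(1, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    StoppableSchedulerSketch scheduler = new StoppableSchedulerSketch();
    scheduler.schedulePeriodic(() -> System.out.println("discovering partitions"), 50);
    Thread.sleep(120);
    scheduler.close();
    System.out.println("scheduled after close: " + scheduler.schedulePeriodic(() -> {}, 50));
  }
}
```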

GetMetadataUpdateEvent

This is a metadata update event requesting the current metadata, sent from reader to enumerator.

At startup, the reader will first send a source event to get the latest metadata from the enumerator before working on the splits (from state, if existing). This is also done because it is hard to reason about reader failure during split assignment; the most reliable protocol is for the readers to request metadata at startup.

This enables us to filter splits and "remove" invalid splits (e.g. remove a topic partition from consumption). For example, at startup, checkpointed splits will be stored in an internal data structure but not assigned, and only the splits that are valid according to the metadata will be assigned.

Code Block
languagejava
titleGetMetadataUpdateEvent
@Internal
public class GetMetadataUpdateEvent implements SourceEvent {}
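
The split filtering at reader startup could look roughly like the sketch below. It assumes the current metadata is available as a map from cluster id to topic names, and it uses hypothetical stand-in types (`SplitSketch`, `SplitFilterSketch`) instead of the real split classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a split: cluster id plus topic partition. */
final class SplitSketch {
  final String clusterId;
  final String topic;
  final int partition;

  SplitSketch(String clusterId, String topic, int partition) {
    this.clusterId = clusterId;
    this.topic = topic;
    this.partition = partition;
  }

  @Override
  public String toString() {
    return clusterId + "/" + topic + "-" + partition;
  }
}

final class SplitFilterSketch {
  /** Keeps only restored splits whose cluster and topic are still in the current metadata. */
  static List<SplitSketch> filterValid(
      List<SplitSketch> restoredSplits, Map<String, Set<String>> currentClusterTopics) {
    List<SplitSketch> valid = new ArrayList<>();
    for (SplitSketch split : restoredSplits) {
      Set<String> topics = currentClusterTopics.get(split.clusterId);
      if (topics != null && topics.contains(split.topic)) {
        valid.add(split);
      }
    }
    return valid;
  }

  public static void main(String[] args) {
    List<SplitSketch> restored =
        List.of(
            new SplitSketch("cluster-a", "orders", 0),
            new SplitSketch("cluster-a", "removed-topic", 0),
            new SplitSketch("cluster-old", "orders", 1));
    Map<String, Set<String>> current = Map.of("cluster-a", Set.of("orders"));
    // prints: [cluster-a/orders-0]
    System.out.println(filterValid(restored, current));
  }
}
```

Splits for the removed topic and the removed cluster are dropped before any assignment happens, which is the behavior the startup metadata request is meant to enable.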

DynamicKafkaSourceReader

This reader is responsible for reading from 1+ clusters.

There will be error handling related to reconciliation exceptions (e.g. KafkaConsumer WakeupException if KafkaSourceReader restarts in the middle of a poll). In addition, restarting readers involves releasing resources from underlying thread pools. Furthermore, this enables us to remove topics from KafkaSourceReader processing, since the metadata reconciliation will induce a KafkaSourceReader restart in which splits can be filtered according to the current metadata.

Code Block
languagejava
titleDynamicKafkaSourceReader
@PublicEvolving
public class DynamicKafkaSourceReader<T>
    implements SourceReader<T, DynamicKafkaSourceSplit> {

  @VisibleForTesting
  final NavigableMap<String, KafkaSourceReader<T>> clusterReaderMap;

  private void restartReader(
      String kafkaClusterId, List<KafkaPartitionSplit> readerState) {}
...
}

DynamicKafkaSourceSplit

This wraps KafkaSource's KafkaPartitionSplit to include cluster information.

Code Block
languagejava
titleDynamicKafkaSourceSplit
@PublicEvolving
public class DynamicKafkaSourceSplit implements SourceSplit {

  private final String kafkaClusterId;
  private final KafkaPartitionSplit kafkaPartitionSplit;

...
}

DynamicKafkaSource

Connecting it all together...

Code Block
languagejava
titleDynamicKafkaSource
@PublicEvolving 
public class DynamicKafkaSource<T>
    implements Source<T, DynamicKafkaSourceSplit, DynamicKafkaSourceEnumState>,
        ResultTypeQueryable<T> {

  private final KafkaStreamSubscriber kafkaStreamSubscriber;
  private final KafkaMetadataService kafkaMetadataService;
  private final KafkaRecordDeserializationSchema<T> deserializationSchema;
  private final OffsetsInitializer startingOffsetsInitializer;
  private final OffsetsInitializer stoppingOffsetsInitializer;
  private final Properties properties;
  private final Boundedness boundedness;

...
}

Compatibility, Deprecation, and Migration Plan

The source is opt-in and would require users to implement code changes.

In the same vein as the migration from FlinkKafkaConsumer to KafkaSource, the source state is incompatible between KafkaSource and DynamicKafkaSource, so it is recommended to reset all state, or to reset partial state by setting a different uid and starting the application from a non-restored state.

Test Plan

This will be tested by unit and integration tests. The work will extend existing KafkaSource test utilities in Flink to exercise multiple clusters.

The testcontainers utilities can be used to create multiple Kafka clusters and the file based implementation of Kafka metadata service would be essential in testing metadata changes.

Future Improvements

KafkaMetadataService can also coordinate the configurations of the source, such as Kafka properties and offset initialization strategies. For now, the proposal only includes one common configuration defined by the user. The Kafka properties are essential for cases such as enforcing the max message size configuration from the Kafka server and security configurations.

Rejected Alternatives

None