...

Page properties

Discussion thread: https://lists.apache.org/thread/vz7nw5qzvmxwnpktnofc9p13s1dzqm6z
Vote thread: https://lists.apache.org/thread/nx00y04t9bslp4mq20x1x8h268gr44o3
JIRA: FLINK-32197
Release: kafka-3.1.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Users in large infrastructure setups often need to process and/or join data that lives in one or more Kafka clusters in Flink. In addition, multiple Kafka clusters may be relevant when Kafka consumers need to migrate between Kafka clusters for failover or upgrades.

Some of the challenging use cases that this feature solves are:

...

This source extends the KafkaSource to read from a dynamic number of Kafka clusters within a single source and introduces an interface to enable more sophisticated automation/coordination between Kafka and Flink infrastructure.

Basic Idea

DynamicKafkaSource relies on metadata to determine what clusters and topics to subscribe to, and the metadata can change over time, so the source will poll for new metadata and dynamically reconcile changes on an interval.

The DynamicKafkaSource must be able to:

  1. Read from multiple clusters, topics, and partitions.
  2. Assign splits from multiple clusters.
  3. Checkpoint and commit split progress from multiple clusters.
  4. Report Kafka source metrics for multiple clusters.
  5. Communicate and reconcile metadata changes via source events from enumerator to readers.
  6. Cleanup resources (e.g. clients, thread pools, metrics) related to old clusters.

...

To provide the ability to dynamically change the underlying source components without a job restart, there needs to exist a coordination mechanism to manage how the underlying KafkaSourceEnumerators and KafkaSources interact with multiple clusters and multiple topics. A KafkaMetadataService is the discovery mechanism by which the source components reconcile metadata changes, and only the DynamicKafkaSourceEnumerator interacts with the KafkaMetadataService. Periodic metadata discovery will be supported via source configuration, just like topic partition discovery is supported via an interval in KafkaSource. It is possible to interpret this as a multi-cluster extension of the AdminClient for KafkaSource, serving only cluster and topic metadata.

...

KafkaSource guarantees exactly once reading since offsets move forward only when a checkpoint succeeds, and DynamicKafkaSource inherits these properties since it delegates that functionality to the KafkaSource components. Metadata is checkpointed and can be rebuilt from the reader split state. Exactly once guarantees can be maintained under the assumption that the KafkaMetadataService does not expire a cluster from which data still needs to be read. This can be achieved by not destroying the old Kafka cluster until consumers are drained (no more producer traffic and lag is 0); in practice, a good strategy is to let data expire naturally via Kafka cluster retention. During a Kafka migration switchover, the consumer would consume from both the old and new clusters. With the regular KafkaSource, if Kafka deletes a topic or a cluster is destroyed, exactly once semantics are not preserved, and the semantics are tightly coupled with storage. The design composes and delegates responsibilities to KafkaSource components, so it is limited to whatever KafkaSource can do for exactly once semantics; the KafkaMetadataService and source metadata reconciliation mechanism make it possible to automate migration and prevent data loss.

Metadata is first discovered on job start, and new metadata is discovered via a polling mechanism (see the source configuration in the next section) for streaming jobs. The metadata update is designed with eventual consistency: repeated metadata polling will eventually get the correct metadata and reconcile the job accordingly. For batch jobs, this polling mechanism should be disabled.


NOTE: the readers need to send a GetMetadataUpdateEvent at startup because the reader state may reflect outdated metadata, so the reader should not start without fresh metadata. With fresh metadata, the reader can filter splits from state; this filtering capability is ultimately how we solve the common issue of "I re-configured my Kafka source and removed some topic, but it still refers to the old topic due to state".
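
To make the filtering step concrete, below is a minimal sketch of how restored splits could be checked against fresh metadata; the helper name and the ClusterMetadata/split accessors are illustrative assumptions, not the actual implementation.

Code Block
languagejava
titleSplit Filtering Sketch
// Illustrative sketch: drop restored splits whose cluster or topic is no longer in the
// freshly received metadata, so the reader never resumes reading removed topics.
private List<DynamicKafkaSourceSplit> filterSplitsWithMetadata(
    List<DynamicKafkaSourceSplit> restoredSplits,
    Map<String, ClusterMetadata> currentMetadata) {
  List<DynamicKafkaSourceSplit> validSplits = new ArrayList<>();
  for (DynamicKafkaSourceSplit split : restoredSplits) {
    // Assumed accessors: getKafkaClusterId(), getKafkaPartitionSplit(), getTopics()
    ClusterMetadata clusterMetadata = currentMetadata.get(split.getKafkaClusterId());
    if (clusterMetadata != null
        && clusterMetadata.getTopics().contains(split.getKafkaPartitionSplit().getTopic())) {
      validSplits.add(split);
    }
  }
  return validSplits;
}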

...

To make the source more user friendly, a DynamicKafkaSourceBuilder will be provided (e.g. batch mode should not turn on periodic KafkaMetadataService discovery; discovery should only be done at startup).

...

This proposal does not include any changes to existing public interfaces of the KafkaSource. A new DynamicKafkaSource builder will serve as the public API and all other APIs will be marked as Internal in this proposal.

...

Code Block
languagejava
titleBuilder Example
DynamicKafkaSource.<String>builder()
  // some default implementations will be provided (file based, statically defined streams)
  .setKafkaMetadataService(new KafkaMetadataServiceImpl())
  .setStreamIds(List.of("my-stream-1", "my-stream-2"))
  .setGroupId("myConsumerGroup")
  .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
  .setStartingOffsets(OffsetsInitializer.earliest())
  .setProperties(properties)
  .build();

// Can configure via properties
ConfigOptions.key("multi.cluster.kafka.source.kafka-metadata-service-discovery-interval-ms")
	.longType()
	.noDefaultValue()
	.withDescription(
		"The rate at which the Kafka metadata service will be polled in milliseconds.");

...

This logical abstraction is introduced since bootstrap servers may change even though the "cluster" is still the same. Thus, a name is used as the unique identifier, which also has the added benefit of providing a short name for connector related metrics. The bootstrap server can be used as the name in simple use cases.

...


...

The Kafka cluster identifier is simply a string.

KafkaStream

A Kafka stream may be composed of multiple topics on multiple Kafka clusters. In addition, this flexible and general abstraction does not require any conventions on topic naming, but implementations can make such assumptions if desired. In the simplest case, a Kafka stream is a single topic on a single Kafka cluster.

Code Block
languagejava
titleKafkaStream
@PublicEvolving
public class KafkaStream implements Serializable {
  private final String streamId;
  // ClusterMetadata contains topics/Kafka properties like bootstrap server
  private final Map<String, ClusterMetadata> clusterToClusterMetadata;

  public KafkaStream(
      String streamId, Map<String, ClusterMetadata> clusterToClusterMetadata) {
    this.streamId = streamId;
    this.clusterToClusterMetadata = clusterToClusterMetadata;
  }

...
}
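
For illustration, a stream spanning two clusters could be constructed roughly as follows; the ClusterMetadata constructor taking a topic set and Kafka properties is an assumption for this sketch.

Code Block
languagejava
titleKafkaStream Construction Sketch
// Hypothetical stream backed by two physical clusters (e.g. during a migration).
Properties clusterAProperties = new Properties();
clusterAProperties.setProperty("bootstrap.servers", "cluster-a-kafka:9092");

Properties clusterBProperties = new Properties();
clusterBProperties.setProperty("bootstrap.servers", "cluster-b-kafka:9092");

KafkaStream myStream =
    new KafkaStream(
        "my-stream-1",
        Map.of(
            "cluster-a", new ClusterMetadata(Set.of("topic-a"), clusterAProperties),
            "cluster-b", new ClusterMetadata(Set.of("topic-a"), clusterBProperties)));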

KafkaMetadataService

...

Code Block
languagejava
titleKafkaMetadataService
@PublicEvolving 
public interface KafkaMetadataService extends AutoCloseable, Serializable {
  /**
   * Get current metadata for all streams.
   *
   * @return set of all streams
   */
  Set<KafkaStream> getAllStreams();

  /**
   * Get current metadata for queried streams.
   *
   * @param streamIds stream full names
   * @return map of stream name to metadata
   */
  Map<String, KafkaStream> describeStreams(Collection<String> streamIds);

  /**
   * Check if the cluster is active.
   *
   * @param kafkaClusterIdentifier Kafka cluster identifier
   * @return boolean whether the cluster is active
   */
  boolean isClusterActive(String kafkaClusterIdentifier);
}
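
A minimal sketch of a statically defined implementation (one of the default implementations mentioned in the builder example) could look like the following; the class name and the KafkaStream getters it uses are assumptions.

Code Block
languagejava
titleStatic KafkaMetadataService Sketch
// Illustrative, statically defined metadata service: the set of streams is fixed at
// construction time and clusters are never retired.
public class StaticKafkaMetadataService implements KafkaMetadataService {
  private final Map<String, KafkaStream> streamsById = new HashMap<>();

  public StaticKafkaMetadataService(Set<KafkaStream> streams) {
    // Assumes a getStreamId() getter on KafkaStream.
    streams.forEach(stream -> streamsById.put(stream.getStreamId(), stream));
  }

  @Override
  public Set<KafkaStream> getAllStreams() {
    return new HashSet<>(streamsById.values());
  }

  @Override
  public Map<String, KafkaStream> describeStreams(Collection<String> streamIds) {
    Map<String, KafkaStream> result = new HashMap<>();
    for (String streamId : streamIds) {
      if (streamsById.containsKey(streamId)) {
        result.put(streamId, streamsById.get(streamId));
      }
    }
    return result;
  }

  @Override
  public boolean isClusterActive(String kafkaClusterIdentifier) {
    // Static setup: a cluster is active as long as any stream still references it
    // (assumes a getter exposing the cluster-to-metadata map).
    return streamsById.values().stream()
        .anyMatch(s -> s.getClusterMetadataMap().containsKey(kafkaClusterIdentifier));
  }

  @Override
  public void close() {}
}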

...

Code Block
languagejava
titleMetadataUpdateEvent
@Internal
public class MetadataUpdateEvent implements SourceEvent {
  private final Map<String, ClusterMetadata> currentMetadata;

...
}

...

DynamicKafkaSourceEnumerator

This enumerator is responsible for discovering and assigning splits from 1+ clusters. At startup, the enumerator will invoke the KafkaStreamSubscriber and reconcile changes from state. Source events will be sent to the source readers to reconcile the metadata. This enumerator has the ability to poll the KafkaMetadataService periodically for stream discovery. In addition, restarting enumerators involves clearing outdated metrics, since clusters may be removed and their metrics should be removed with them.

Code Block
languagejava
titleDynamicKafkaSourceEnumerator
@PublicEvolving
public class DynamicKafkaSourceEnumerator
    implements SplitEnumerator<DynamicKafkaSourceSplit, DynamicKafkaSourceEnumState> {

  private final Map<String, SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState>>
      clusterEnumeratorMap;
  private final Map<String, StoppableKafkaEnumContextProxy> clusterEnumContextMap;
  private final KafkaStreamSubscriber kafkaStreamSubscriber;
  private final KafkaMetadataService kafkaMetadataService;
  private Map<String, Set<String>> activeClusterTopicsMap;

  private void restartEnumerators(String kafkaClusterId, Set<TopicPartition> enumeratorState) {}

...
}

...

Code Block
languagejava
titleStoppableKafkaEnumContextProxy
@Internal
public class StoppableKafkaEnumContextProxy
    implements SplitEnumeratorContext<KafkaPartitionSplit>, AutoCloseable {

  private final String kafkaClusterIdentifier;
  private final KafkaMetadataService kafkaMetadataService;
  private final SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext;
  private final ScheduledExecutorService subEnumeratorWorker;

  /** Wrap splits with cluster metadata. */
  public void assignSplits(SplitsAssignment<KafkaPartitionSplit> newSplitAssignments) {}

...
}
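
The split wrapping performed by this proxy could be sketched roughly as below; delegation and error-handling details are omitted, and only the DynamicKafkaSourceSplit shape from this proposal is assumed.

Code Block
languagejava
titleSplit Wrapping Sketch
// Sketch: wrap each KafkaPartitionSplit assignment from the per-cluster sub-enumerator
// with its owning cluster before forwarding it to the real enumerator context.
@Override
public void assignSplits(SplitsAssignment<KafkaPartitionSplit> newSplitAssignments) {
  Map<Integer, List<DynamicKafkaSourceSplit>> wrappedAssignments = new HashMap<>();
  for (Map.Entry<Integer, List<KafkaPartitionSplit>> entry :
      newSplitAssignments.assignment().entrySet()) {
    List<DynamicKafkaSourceSplit> wrappedSplits = new ArrayList<>();
    for (KafkaPartitionSplit split : entry.getValue()) {
      wrappedSplits.add(new DynamicKafkaSourceSplit(kafkaClusterIdentifier, split));
    }
    wrappedAssignments.put(entry.getKey(), wrappedSplits);
  }
  enumContext.assignSplits(new SplitsAssignment<>(wrappedAssignments));
}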

...

Code Block
languagejava
titleGetMetadataUpdateEvent
@Internal
public class GetMetadataUpdateEvent implements SourceEvent {}

...

DynamicKafkaSourceReader

This reader is responsible for reading from 1+ clusters.

...

Code Block
languagejava
titleDynamicKafkaSourceReader
@PublicEvolving
public class DynamicKafkaSourceReader<T>
    implements SourceReader<T, DynamicKafkaSourceSplit> {

  @VisibleForTesting
  final NavigableMap<String, KafkaSourceReader<T>> clusterReaderMap;

  private void restartReader(
      String kafkaClusterId, List<KafkaPartitionSplit> readerState) {}
...
}

...

DynamicKafkaSourceSplit

This wraps KafkaSource's KafkaPartitionSplit to include cluster information.

Code Block
languagejava
titleDynamicKafkaSourceSplit
@PublicEvolving
public class DynamicKafkaSourceSplit implements SourceSplit {

  private final String kafkaClusterId;
  private final KafkaPartitionSplit kafkaPartitionSplit;

...
}

...

DynamicKafkaSource

Connecting it all together...

Code Block
languagejava
titleDynamicKafkaSource
@PublicEvolving
public class DynamicKafkaSource<T>
    implements Source<T, DynamicKafkaSourceSplit, DynamicKafkaSourceEnumState>,
        ResultTypeQueryable<T> {

  private final KafkaStreamSubscriber kafkaStreamSubscriber;
  private final KafkaMetadataService kafkaMetadataService;
  private final KafkaRecordDeserializationSchema<T> deserializationSchema;
  private final OffsetsInitializer startingOffsetsInitializer;
  private final OffsetsInitializer stoppingOffsetsInitializer;
  private final Properties properties;
  private final Boundedness boundedness;

...
}
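
As a usage example, the source plugs into the DataStream API like any other FLIP-27 source; the metadata service implementation and stream ids below are placeholders.

Code Block
languagejava
titleDynamicKafkaSource Usage Sketch
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DynamicKafkaSource<String> source =
    DynamicKafkaSource.<String>builder()
        .setKafkaMetadataService(new KafkaMetadataServiceImpl()) // placeholder implementation
        .setStreamIds(List.of("my-stream-1", "my-stream-2"))
        .setGroupId("myConsumerGroup")
        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setProperties(properties)
        .build();

DataStream<String> stream =
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "dynamic-kafka-source");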

...

In the same vein as the migration from FlinkKafkaConsumer to KafkaSource, the source state is incompatible between KafkaSource and DynamicKafkaSource, so it is recommended to reset all state, or to reset partial state by setting a different uid and starting the application from non-restored state.
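
As a sketch of the partial state reset, the new source operator can simply be given a fresh uid (names below are placeholders) and the job resumed with allowNonRestoredState so the old KafkaSource operator state is dropped.

Code Block
languagejava
titleState Reset Sketch
// Assigning a new uid prevents the DynamicKafkaSource operator from attempting to restore
// incompatible state written by the previous KafkaSource operator.
env.fromSource(dynamicKafkaSource, WatermarkStrategy.noWatermarks(), "dynamic-kafka-source")
    .uid("dynamic-kafka-source-v2"); // distinct from the old KafkaSource operator uid

// Then resume from the old savepoint with:
//   flink run -s <savepointPath> --allowNonRestoredState ...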

...