...

Page properties

Discussion thread: https://lists.apache.org/thread/vz7nw5qzvmxwnpktnofc9p13s1dzqm6z
Vote thread: https://lists.apache.org/thread/nx00y04t9bslp4mq20x1x8h268gr44o3
JIRA: FLINK-32197
Release: kafka-3.1.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Metadata is first discovered on job start, and for streaming jobs new metadata is discovered via a polling mechanism (see the source configuration in the next section). The metadata update is designed for eventual consistency: repeated metadata polling will eventually observe the correct metadata and reconcile the job accordingly. For batch jobs, this polling mechanism should be disabled.
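
For illustration, a minimal sketch of configuring the polling interval, assuming a property named stream-metadata-discovery-interval-ms (the exact option name is defined by the source configuration in the next section, so treat it as an assumption here); a non-positive value would disable polling for batch jobs.

Code Block
languagejava
titleMetadataPollingConfigurationSketch
import java.util.Properties;

Properties properties = new Properties();
// Streaming jobs: poll the KafkaMetadataService for new metadata every 5 minutes.
// The property name is an assumption taken from the source configuration section.
properties.setProperty("stream-metadata-discovery-interval-ms", "300000");
// Batch jobs: disable periodic discovery so metadata is read only once at startup.
// properties.setProperty("stream-metadata-discovery-interval-ms", "-1");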

NOTE: the readers need to send the GetMetadataUpdateEvent at startup because the reader state may reflect outdated metadata, so a reader should not start without fresh metadata. With fresh metadata, the reader can filter splits from state; this filtering capability is ultimately how we solve the common issue of "I re-configured my Kafka source and removed a topic, but the job still refers to the old topic because it is stored in state".
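
For illustration, a hedged sketch of that filtering step, assuming getters (getKafkaClusterId, getKafkaPartitionSplit) that mirror the DynamicKafkaSourceSplit fields shown later in this document; the real reader would run this after receiving the metadata update response.

Code Block
languagejava
titleSplitFilteringSketch
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class SplitFilteringSketch {

  // Keep only restored splits whose cluster and topic still exist in the
  // freshly discovered metadata (cluster id -> active topics). Getter names
  // are assumptions mirroring the split fields in this document.
  static List<DynamicKafkaSourceSplit> filterOutdatedSplits(
      List<DynamicKafkaSourceSplit> restoredSplits,
      Map<String, Set<String>> activeClusterTopicsMap) {
    return restoredSplits.stream()
        .filter(split -> activeClusterTopicsMap.containsKey(split.getKafkaClusterId()))
        .filter(split ->
            activeClusterTopicsMap
                .get(split.getKafkaClusterId())
                .contains(split.getKafkaPartitionSplit().getTopicPartition().topic()))
        .collect(Collectors.toList());
  }
}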

...

DynamicKafkaSourceEnumerator

This enumerator is responsible for discovering and assigning splits from one or more clusters. At startup, the enumerator invokes the KafkaStreamSubscriber and reconciles changes from state. Source events are sent to the source readers to reconcile the metadata. This enumerator can also poll the KafkaMetadataService periodically for stream discovery; a sketch of this follows the code block below. In addition, restarting enumerators involves clearing outdated metrics, since clusters may be removed and their metrics should be removed with them.

Code Block
languagejava
titleDynamicKafkaSourceEnumerator
@PublicEvolving 
public class DynamicKafkaSourceEnumerator
    implements SplitEnumerator<DynamicKafkaSourceSplit, DynamicKafkaSourceEnumState> {
  
  // One delegate KafkaSourceEnumerator per Kafka cluster, keyed by cluster id.
  private final Map<
          String, SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState>>
      clusterEnumeratorMap;
  // Per-cluster enumerator contexts that can be stopped when a cluster is removed.
  private final Map<String, StoppableKafkaEnumContextProxy> clusterEnumContextMap;
  private final KafkaStreamSubscriber kafkaStreamSubscriber;
  private final KafkaMetadataService kafkaMetadataService;
  // Current metadata view: cluster id -> active topics.
  private Map<String, Set<String>> activeClusterTopicsMap;

  // Tears down and recreates the enumerator for a cluster with the given state
  // when its metadata changes.
  private void restartEnumerators(String kafkaClusterId, Set<TopicPartition> enumeratorState) {}

...
}
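
For illustration, a hedged sketch of the periodic stream discovery described above, assuming a hypothetical onMetadataUpdate handler and a streamMetadataDiscoveryIntervalMs setting; SplitEnumeratorContext#callAsync is the standard Flink mechanism for this kind of periodic work.

Code Block
languagejava
titlePeriodicStreamDiscoverySketch
// Hypothetical sketch of wiring periodic stream discovery in the enumerator.
// The handler name and interval variable are assumptions; callAsync is the
// standard SplitEnumeratorContext scheduling primitive.
context.callAsync(
    () -> kafkaStreamSubscriber.getSubscribedStreams(kafkaMetadataService),
    this::onMetadataUpdate, // reconciles per-cluster enumerators against fresh metadata
    0,
    streamMetadataDiscoveryIntervalMs);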

...

Code Block
languagejava
titleDynamicKafkaSourceReader
@PublicEvolving 
public class DynamicKafkaSourceReader<T>
    implements SourceReader<T, DynamicKafkaSourceSplit> {

  // One delegate KafkaSourceReader per Kafka cluster, keyed by cluster id.
  @VisibleForTesting
  final NavigableMap<String, KafkaSourceReader<T>> clusterReaderMap;

  // Tears down and recreates the reader for a cluster with the given splits
  // when its metadata changes.
  private void restartReader(
      String kafkaClusterId, List<KafkaPartitionSplit> readerState) {}
...
}

...

Code Block
languagejava
titleDynamicKafkaSourceSplit
@PublicEvolving 
public class DynamicKafkaSourceSplit implements SourceSplit {

  // Identifies the Kafka cluster that this split belongs to.
  private final String kafkaClusterId;
  // The wrapped single-cluster Kafka partition split.
  private final KafkaPartitionSplit kafkaPartitionSplit;

...
}

...

Code Block
languagejava
titleDynamicKafkaSource
@PublicEvolving 
public class DynamicKafkaSource<T>
    implements Source<T, DynamicKafkaSourceSplit, DynamicKafkaSourceEnumState>,
        ResultTypeQueryable<T> {

  // Resolves which Kafka streams (clusters and topics) the source subscribes to.
  private final KafkaStreamSubscriber kafkaStreamSubscriber;
  // Serves the metadata that maps streams to physical clusters and topics.
  private final KafkaMetadataService kafkaMetadataService;
  private final KafkaRecordDeserializationSchema<T> deserializationSchema;
  private final OffsetsInitializer startingOffsetsInitializer;
  private final OffsetsInitializer stoppingOffsetsInitializer;
  private final Properties properties;
  private final Boundedness boundedness;

...
}
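
For illustration, a hedged sketch of how a user might construct and run the source, assuming builder setters that mirror the fields above; the builder method names and the metadata service instance are assumptions, not a confirmed API.

Code Block
languagejava
titleDynamicKafkaSourceUsageSketch
// Assumed builder API: setter names mirror the DynamicKafkaSource fields above.
DynamicKafkaSource<String> source =
    DynamicKafkaSource.<String>builder()
        .setStreamIds(Collections.singleton("my-stream")) // resolved by the KafkaStreamSubscriber
        .setKafkaMetadataService(myKafkaMetadataService)  // hypothetical KafkaMetadataService instance
        .setDeserializer(
            KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setProperties(properties)
        .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "dynamic-kafka-source");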

...