...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Metadata is first discovered at job start. For streaming jobs, new metadata is then discovered through a polling mechanism (see the source configuration in the next section). Metadata updates are designed for eventual consistency: repeated polling will eventually return the correct metadata, and the job is reconciled accordingly. For batch jobs, this polling mechanism should be disabled.
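The polling contract above can be sketched in plain Java. This is a minimal sketch, not the connector's code. It assumes metadata can be modeled as a map from cluster id to topic names, and `MetadataPoller`, `pollOnce`, and `start` are hypothetical names. A failed or unchanged poll is simply skipped, relying on a later poll to converge, and a non-positive interval disables periodic polling, as for batch jobs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Hypothetical sketch of eventually consistent metadata polling.
 * Assumption: metadata is modeled as cluster id -> topic names.
 */
final class MetadataPoller {
    private final Supplier<Map<String, Set<String>>> metadataService; // stands in for KafkaMetadataService
    private final Consumer<Map<String, Set<String>>> reconciler;      // invoked only when metadata changes
    private Map<String, Set<String>> current = Map.of();              // last reconciled snapshot
    private ScheduledExecutorService scheduler;

    MetadataPoller(Supplier<Map<String, Set<String>>> metadataService,
                   Consumer<Map<String, Set<String>>> reconciler) {
        this.metadataService = metadataService;
        this.reconciler = reconciler;
    }

    /** Polls once; returns true if the metadata changed and the reconciler was called. */
    synchronized boolean pollOnce() {
        Map<String, Set<String>> latest;
        try {
            latest = metadataService.get();
        } catch (RuntimeException e) {
            return false; // transient failure: a later poll retries
        }
        if (latest == null || latest.equals(current)) {
            return false; // nothing new to reconcile
        }
        current = deepCopy(latest);
        reconciler.accept(current);
        return true;
    }

    /** Discovers metadata once, then polls periodically; intervalMs <= 0 disables polling. */
    synchronized void start(long intervalMs) {
        pollOnce();
        if (intervalMs > 0) {
            scheduler = Executors.newSingleThreadScheduledExecutor();
            scheduler.scheduleWithFixedDelay(this::pollOnce, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
        }
    }

    synchronized void stop() {
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
    }

    // Immutable copy so that later mutation by the service cannot hide a change.
    private static Map<String, Set<String>> deepCopy(Map<String, Set<String>> m) {
        Map<String, Set<String>> copy = new HashMap<>();
        m.forEach((cluster, topics) -> copy.put(cluster, Set.copyOf(topics)));
        return Map.copyOf(copy);
    }
}
```

Comparing against the last reconciled snapshot keeps reconciliation idempotent: polling the same metadata twice triggers no extra work.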
NOTE: Readers need to send a GetMetadataUpdateEvent at startup because the reader state may reflect outdated metadata, so a reader should not start without fresh metadata. With fresh metadata, the reader can filter the splits restored from state. This filtering is ultimately how we solve the common issue of "I reconfigured my Kafka source and removed a topic, but the job still refers to the old topic because of state".
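A minimal sketch of that filtering step, assuming a restored split can be reduced to a cluster id, topic, and partition, and that fresh metadata maps cluster ids to topic names. `RestoredSplit` and `StateFilter` are hypothetical types, not the FLIP's `DynamicKafkaSourceSplit`.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical, simplified view of a restored split (not the FLIP's DynamicKafkaSourceSplit). */
final class RestoredSplit {
    private final String clusterId;
    private final String topic;
    private final int partition;

    RestoredSplit(String clusterId, String topic, int partition) {
        this.clusterId = clusterId;
        this.topic = topic;
        this.partition = partition;
    }

    String clusterId() { return clusterId; }
    String topic() { return topic; }
    int partition() { return partition; }
}

final class StateFilter {
    private StateFilter() {}

    /** Keeps restored splits whose cluster still exists and still contains the split's topic. */
    static List<RestoredSplit> filter(List<RestoredSplit> restored,
                                      Map<String, Set<String>> freshMetadata) {
        return restored.stream()
                .filter(s -> freshMetadata.getOrDefault(s.clusterId(), Set.of()).contains(s.topic()))
                .collect(Collectors.toList());
    }
}
```

With fresh metadata `{c1=[orders]}`, a restored split for `old-topic`, or for a cluster that has since been removed, is dropped instead of being read.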
...
DynamicKafkaSourceEnumerator
This enumerator is responsible for discovering and assigning splits from one or more clusters. At startup, the enumerator invokes the KafkaStreamSubscriber and reconciles the result against the restored state. Source events are sent to the source readers so that they can reconcile their metadata. The enumerator can also poll the KafkaMetadataService periodically for stream discovery. In addition, restarting the enumerators involves clearing outdated metrics: when a cluster is removed, its metrics should be removed too.
```java
@PublicEvolving
public class DynamicKafkaSourceEnumerator
        implements SplitEnumerator<DynamicKafkaSourceSplit, DynamicKafkaSourceEnumState> {

    // Per-cluster sub-enumerators and their contexts, keyed by Kafka cluster id.
    private final Map<String, SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState>>
            clusterEnumeratorMap;
    private final Map<String, StoppableKafkaEnumContextProxy> clusterEnumContextMap;

    private final KafkaStreamSubscriber kafkaStreamSubscriber;
    private final KafkaMetadataService kafkaMetadataService;

    // Topics currently being read, keyed by Kafka cluster id.
    private Map<String, Set<String>> activeClusterTopicsMap;

    // Restarts the sub-enumerator for one cluster from the given enumerator state.
    private void restartEnumerators(String kafkaClusterId, Set<TopicPartition> enumeratorState) {}

    ...
}
```
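Reconciling newly polled metadata against `activeClusterTopicsMap` amounts to a per-cluster diff. The sketch below is hypothetical (`ClusterDiff` is not part of the FLIP) and assumes metadata is a map from cluster id to topic set. The comments indicate how an enumerator might act on each bucket; for example, removed clusters are where outdated metrics would be cleared.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical per-cluster diff between the active topics and freshly polled metadata. */
final class ClusterDiff {
    final Set<String> added = new HashSet<>();   // new cluster: e.g. start a sub-enumerator
    final Set<String> removed = new HashSet<>(); // cluster gone: e.g. close its enumerator, clear its metrics
    final Set<String> changed = new HashSet<>(); // topic set changed: e.g. restart that cluster's enumerator

    static ClusterDiff compute(Map<String, Set<String>> active, Map<String, Set<String>> latest) {
        ClusterDiff diff = new ClusterDiff();
        for (Map.Entry<String, Set<String>> entry : latest.entrySet()) {
            Set<String> before = active.get(entry.getKey());
            if (before == null) {
                diff.added.add(entry.getKey());
            } else if (!before.equals(entry.getValue())) {
                diff.changed.add(entry.getKey());
            }
        }
        for (String clusterId : active.keySet()) {
            if (!latest.containsKey(clusterId)) {
                diff.removed.add(clusterId);
            }
        }
        return diff;
    }
}
```

Clusters whose topic sets are unchanged fall into no bucket, so their sub-enumerators can keep running untouched.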
...
```java
@PublicEvolving
public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafkaSourceSplit> {

    // Per-cluster sub-readers, keyed by Kafka cluster id.
    @VisibleForTesting
    final NavigableMap<String, KafkaSourceReader<T>> clusterReaderMap;

    // Restarts the sub-reader for one cluster with the given split state.
    private void restartReader(String kafkaClusterId, List<KafkaPartitionSplit> readerState) {}

    ...
}
```
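The reader's `clusterReaderMap` is keyed by a string, presumably the Kafka cluster id, and `restartReader` takes the splits of a single cluster. A hypothetical sketch of the grouping this implies, using a `TreeMap` (a `NavigableMap`) so clusters iterate in sorted order. `SplitGrouping` and `ClusterSplit` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

final class SplitGrouping {
    private SplitGrouping() {}

    /** Hypothetical simplified split: a cluster id plus the underlying partition split id. */
    static final class ClusterSplit {
        final String clusterId;
        final String partitionSplitId;

        ClusterSplit(String clusterId, String partitionSplitId) {
            this.clusterId = clusterId;
            this.partitionSplitId = partitionSplitId;
        }
    }

    /** Groups split ids per cluster, sorted by cluster id, so each sub-reader gets only its own state. */
    static NavigableMap<String, List<String>> groupByCluster(List<ClusterSplit> splits) {
        NavigableMap<String, List<String>> byCluster = new TreeMap<>();
        for (ClusterSplit split : splits) {
            byCluster.computeIfAbsent(split.clusterId, id -> new ArrayList<>()).add(split.partitionSplitId);
        }
        return byCluster;
    }
}
```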
...
```java
@PublicEvolving
public class DynamicKafkaSourceSplit implements SourceSplit {
    private final String kafkaClusterId;
    private final KafkaPartitionSplit kafkaPartitionSplit;

    ...
}
```
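Because the same topic-partition can exist in several clusters, a split id needs the cluster id to stay unique. The sketch below assumes a `topic-partition` format for the per-partition id and a dash separator for the cluster prefix. `ClusterTaggedSplit` is a hypothetical stand-in, not the actual class.

```java
import java.util.Objects;

/** Hypothetical stand-in for DynamicKafkaSourceSplit: a topic-partition tagged with its cluster. */
final class ClusterTaggedSplit {
    private final String kafkaClusterId;
    private final String topic;
    private final int partition;

    ClusterTaggedSplit(String kafkaClusterId, String topic, int partition) {
        this.kafkaClusterId = Objects.requireNonNull(kafkaClusterId);
        this.topic = Objects.requireNonNull(topic);
        this.partition = partition;
    }

    /** Per-partition id; the "topic-partition" format is an assumption. */
    String partitionSplitId() {
        return topic + "-" + partition;
    }

    /** Cluster-qualified id: distinguishes the same topic-partition on different clusters. */
    String splitId() {
        return kafkaClusterId + "-" + partitionSplitId();
    }
}
```

A plain dash is ambiguous if cluster ids or topic names can themselves contain dashes (`a-b` + `c` versus `a` + `b-c` both yield `a-b-c-0`). An implementation could avoid this with a separator that cannot appear in cluster ids.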
...
```java
@PublicEvolving
public class DynamicKafkaSource<T>
        implements Source<T, DynamicKafkaSourceSplit, DynamicKafkaSourceEnumState>,
                ResultTypeQueryable<T> {
    private final KafkaStreamSubscriber kafkaStreamSubscriber;
    private final KafkaMetadataService kafkaMetadataService;
    private final KafkaRecordDeserializationSchema<T> deserializationSchema;
    private final OffsetsInitializer startingOffsetsInitializer;
    private final OffsetsInitializer stoppingOffsetsInitializer;
    private final Properties properties;
    private final Boundedness boundedness;

    ...
}
```
...