Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

StoppableKafkaEnumContextProxy

This enumerator context proxy facilitates the ability to close executors used by scheduled callables in the underlying KafkaSourceEnumerators and wraps the KafkaPartitionSplits with cluster information.

KafkaSourceEnumerators need to properly cleanup the topic partition discovery scheduled callable in restart. This can also handles safely handle errors with the scheduled callables when metadata is not sync with source state.

Code Block
languagejava
titleStoppableKafkaEnumContextProxy
@Internal
public class StoppableKafkaEnumContextProxy
    implements SplitEnumeratorContext<KafkaPartitionSplit>, AutoCloseable {

  private final KafkaClusterIdentifier kafkaClusterIdentifier;
  private final KafkaMetadataService kafkaMetadataService;
  private final SplitEnumeratorContext<MultiClusterKafkaSourceSplit> enumContext;
  private final ScheduledExecutorService subEnumeratorWorker;

  /** Wrap splits with cluster metadata. */
  public void assignSplits(SplitsAssignment<KafkaPartitionSplit> newSplitAssignments) {}

...
}

...

MultiClusterKafkaSourceSplit

This extends KafkaSource's KafkaPartitionSplit to include cluster information.

Code Block
languagejava
titleMultiClusterKafkaSourceSplit
@PublicEvolving 
public class MultiClusterKafkaSourceSplit implements SourceSplit {

  private final KafkaClusterIdentifier kafkaClusterId;
  private final KafkaPartitionSplit kafkaPartitionSplit;

...
}

MultiClusterKafkaSource

Connecting it all together...

Code Block
languagejava
titleMultiClusterKafkaSource
@PublicEvolving 
public class MultiClusterKafkaSource<T>
    implements Source<T, MultiClusterKafkaSourceSplit, MultiClusterKafkaSourceEnumState>,
        ResultTypeQueryable<T> {

  private final KafkaStreamSubscriber kafkaStreamSubscriber;
  private final KafkaMetadataService kafkaMetadataService;
  private final KafkaRecordDeserializationSchema<T> deserializationSchema;
  private final OffsetsInitializer startingOffsetsInitializer;
  private final OffsetsInitializer stoppingOffsetsInitializer;
  private final Properties properties;
  private final Boundedness boundedness;

...
}

...