...

Builder Example

MultiClusterKafkaSource.<String>builder()
  // some default implementations will be provided (file based, statically defined streams)
  .setKafkaMetadataService(new KafkaMetadataServiceImpl())
  .setStreamIds(List.of("my-stream-1", "my-stream-2"))
  .setGroupId("myConsumerGroup")
  .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
  .setStartingOffsets(OffsetsInitializer.earliest())
  .setProperties(properties)
  .build();

Basic Idea

MultiClusterKafkaSource relies on metadata to determine which clusters and topics to subscribe to. Because the metadata can change over time, the source polls for new metadata on an interval and reconciles any changes.
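
As a rough illustration of what reconciling a metadata change could involve, the sketch below diffs two metadata snapshots, each modeled as a map from cluster id to topic names. The class `MetadataDiff` and this snapshot shape are assumptions made for the example; they are not part of the proposed API, and the actual source may track metadata differently.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper (not part of the proposal): diffs two metadata snapshots. */
final class MetadataDiff {
    // Cluster id -> topics present only in the new snapshot.
    final Map<String, Set<String>> added = new HashMap<>();
    // Cluster id -> topics present only in the old snapshot.
    final Map<String, Set<String>> removed = new HashMap<>();

    static MetadataDiff between(Map<String, Set<String>> oldMeta,
                                Map<String, Set<String>> newMeta) {
        MetadataDiff diff = new MetadataDiff();
        collect(newMeta, oldMeta, diff.added);
        collect(oldMeta, newMeta, diff.removed);
        return diff;
    }

    // Copies into 'out' every (cluster, topic) pair found in 'from' but not in 'other'.
    private static void collect(Map<String, Set<String>> from,
                                Map<String, Set<String>> other,
                                Map<String, Set<String>> out) {
        for (Map.Entry<String, Set<String>> entry : from.entrySet()) {
            Set<String> topics = new HashSet<>(entry.getValue());
            topics.removeAll(other.getOrDefault(entry.getKey(), Set.of()));
            if (!topics.isEmpty()) {
                out.put(entry.getKey(), topics);
            }
        }
    }

    // True when the two snapshots describe the same clusters and topics.
    boolean isEmpty() {
        return added.isEmpty() && removed.isEmpty();
    }
}
```

On each polling interval, a source built along these lines would compute the diff against the last snapshot it saw and act only when `isEmpty()` returns false, for example by starting readers for the added topics and stopping those for the removed ones.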

The MultiClusterKafkaSource must be able to:

  1. Assign splits (partitions) from multiple clusters.
  2. Checkpoint splits from multiple clusters.

The components of KafkaSource already solve these requirements for a single Kafka cluster, so its low-level components are composed to provide the functionality to read from multiple Kafka clusters. For example, an underlying KafkaSourceEnumerator will be used to discover splits, checkpoint assigned splits, and do periodic partition discovery. Likewise, an underlying KafkaSourceReader will be used to poll and deserialize records, checkpoint split state, and commit offsets back to Kafka.
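
The composition idea can be sketched with plain Java, independent of Flink. In the example below, `ClusterEnumerator` is a hypothetical stand-in for a single-cluster enumerator (it is not the real KafkaSourceEnumerator interface), and `MultiClusterEnumeratorSketch` delegates to one instance per cluster, tagging splits and checkpoint state with the cluster id. All names are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical stand-in for a single-cluster enumerator. */
interface ClusterEnumerator {
    List<String> discoverSplits();        // split names such as "orders-0"
    Set<String> snapshotAssignedSplits(); // splits assigned so far
}

/** Toy implementation backed by a fixed partition list, for demonstration. */
final class FixedClusterEnumerator implements ClusterEnumerator {
    private final List<String> partitions;
    private final Set<String> assigned = new TreeSet<>();

    FixedClusterEnumerator(List<String> partitions) {
        this.partitions = List.copyOf(partitions);
    }

    @Override
    public List<String> discoverSplits() {
        assigned.addAll(partitions); // treat every discovered split as assigned
        return partitions;
    }

    @Override
    public Set<String> snapshotAssignedSplits() {
        return Set.copyOf(assigned);
    }
}

/** Composes per-cluster enumerators; all state is keyed by cluster id. */
final class MultiClusterEnumeratorSketch {
    // TreeMap keeps clusters in a deterministic (sorted) order.
    private final Map<String, ClusterEnumerator> byCluster = new TreeMap<>();

    void addCluster(String clusterId, ClusterEnumerator enumerator) {
        byCluster.put(clusterId, enumerator);
    }

    // Requirement 1: assign splits from multiple clusters.
    List<String> discoverSplits() {
        List<String> all = new ArrayList<>();
        byCluster.forEach((clusterId, enumerator) ->
                enumerator.discoverSplits().forEach(split -> all.add(clusterId + "/" + split)));
        return all;
    }

    // Requirement 2: checkpoint splits from multiple clusters.
    Map<String, Set<String>> snapshotState() {
        Map<String, Set<String>> state = new TreeMap<>();
        byCluster.forEach((clusterId, enumerator) ->
                state.put(clusterId, enumerator.snapshotAssignedSplits()));
        return state;
    }
}
```

Keying everything by cluster id is the design choice that lets the single-cluster logic stay unchanged: two clusters may both contain a split named `orders-0`, and the wrapper is the only place that needs to tell them apart.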

...