...

Code Block: Builder Example (Java)
MultiClusterKafkaSource.<String>builder()
  // some default implementations will be provided (file based, statically defined streams)
  .setKafkaMetadataService(new KafkaMetadataServiceImpl())
  .setStreamIds(List.of("my-stream-1", "my-stream-2"))
  .setGroupId("myConsumerGroup")
  .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
  .setStartingOffsets(OffsetsInitializer.earliest())
  .setProperties(properties)
  .build();

Basic Idea

The components of the KafkaSource satisfy the requirements for a single Kafka cluster, and these low-level components are composed in order to provide the functionality to read from multiple Kafka clusters. For example, an underlying KafkaSourceEnumerator will be used to discover splits, checkpoint assigned splits, and do periodic partition discovery. Likewise, an underlying KafkaSourceReader will be used to poll and deserialize records, checkpoint split state, and commit offsets back to Kafka.
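
To make the composition concrete, the sketch below shows how a multi-cluster enumerator could own one underlying enumerator per active cluster and fan lifecycle calls out to each. The ClusterEnumerator interface and the class names are hypothetical stand-ins for illustration, not the actual Flink classes.

Code Block: Enumerator Composition Sketch (Java)
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the per-cluster duties described above:
// split discovery, split checkpointing, periodic partition discovery.
interface ClusterEnumerator extends AutoCloseable {
    void start();
}

// Sketch: the multi-cluster enumerator owns one underlying enumerator
// per active Kafka cluster and fans lifecycle calls out to each of them.
class MultiClusterEnumeratorSketch implements AutoCloseable {
    private final Map<String, ClusterEnumerator> byCluster = new HashMap<>();

    void addCluster(String clusterId, ClusterEnumerator enumerator) {
        byCluster.put(clusterId, enumerator);
        enumerator.start();
    }

    void removeCluster(String clusterId) throws Exception {
        ClusterEnumerator removed = byCluster.remove(clusterId);
        if (removed != null) {
            removed.close(); // release per-cluster resources
        }
    }

    @Override
    public void close() throws Exception {
        for (ClusterEnumerator e : byCluster.values()) {
            e.close();
        }
        byCluster.clear();
    }
}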

With the ability to dynamically change the underlying source components without job restart, there needs to be a coordination mechanism to manage how the underlying KafkaSourceEnumerators and KafkaSourceReaders interact with multiple clusters and multiple topics. This design leverages the source event protocol to sync metadata between source components. To discover the required clusters and topics, a Kafka Metadata Service provides the metadata and allows the source components to reconcile metadata changes; only the MultiClusterKafkaSourceEnumerator interacts with the Kafka Metadata Service.
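
Since only one component talks to the Kafka Metadata Service, its contract can stay small. Below is one possible shape for it; the method names and signatures are assumptions for illustration, not the final API.

Code Block: Kafka Metadata Service Sketch (Java)
import java.util.Collection;
import java.util.Map;
import java.util.Set;

// Opaque placeholder here; KafkaStream is sketched in full further below.
final class KafkaStream {}

// Hedged sketch of the metadata service contract; method names and
// signatures are illustrative assumptions, not the final API.
interface KafkaMetadataService extends AutoCloseable {
    // All streams currently known to the metadata service.
    Set<KafkaStream> getAllStreams();

    // Resolve specific stream ids to their current cluster/topic metadata.
    Map<String, KafkaStream> describeStreams(Collection<String> streamIds);

    // Distinguishes metadata removal from transient cluster unavailability.
    boolean isClusterActive(String kafkaClusterId);
}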

A default implementation will be provided so that a native Kubernetes ConfigMap (a yaml/json file) can easily control the metadata.
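
As a rough sketch of that file-based route, assuming the metadata file is mounted from a ConfigMap at a made-up path: Kubernetes refreshes mounted ConfigMap files in place, so a periodic re-read is enough to pick up metadata changes without a job restart.

Code Block: ConfigMap-Backed Metadata File Sketch (Java)
import java.nio.file.Files;
import java.nio.file.Path;

// Hedged sketch: the mount path is an assumption for illustration, and
// parsing of the yaml/json payload is elided.
class ConfigMapMetadataFile {
    private final Path metadataFile = Path.of("/etc/kafka-metadata/streams.yaml");

    String readRawMetadata() throws Exception {
        // Re-read periodically; Kubernetes updates the mounted file in place.
        return Files.readString(metadataFile);
    }
}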

In addition, KafkaStream is introduced as an abstraction that contains a logical mapping to physical Kafka clusters and Kafka topics, which can be used to derive metadata for source state. Changes in metadata are detected by the source enumerator and propagated to the source readers via source events so that both can reconcile the changes.
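
A minimal sketch of what this abstraction could look like, assuming a logical stream id mapped to per-cluster topic sets and connection properties (the field names are illustrative):

Code Block: KafkaStream Sketch (Java)
import java.util.Map;
import java.util.Properties;
import java.util.Set;

// Hedged sketch of the KafkaStream abstraction; field names are
// illustrative assumptions, not the final API.
final class ClusterMetadata {
    final Set<String> topics;    // physical topics on this cluster
    final Properties properties; // e.g. bootstrap.servers for this cluster

    ClusterMetadata(Set<String> topics, Properties properties) {
        this.topics = topics;
        this.properties = properties;
    }
}

final class KafkaStream {
    final String streamId;                              // logical stream name
    final Map<String, ClusterMetadata> clusterMetadata; // keyed by cluster id

    KafkaStream(String streamId, Map<String, ClusterMetadata> clusterMetadata) {
        this.streamId = streamId;
        this.clusterMetadata = clusterMetadata;
    }
}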

Reconciliation is designed as KafkaSourceEnumerator and KafkaSourceReader restarts; this enables us to "remove" splits. There is careful consideration for resource cleanup and error handling, for example thread pools, metrics, and KafkaConsumer errors.
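
The sketch below illustrates the restart idea on the reader side with hypothetical types: closing the old per-cluster reader releases its thread pools, metrics, and KafkaConsumer, and starting a fresh reader against the new metadata implicitly drops splits that no longer exist.

Code Block: Restart-Based Reconciliation Sketch (Java)
// Hypothetical stand-ins for a per-cluster reader and its factory.
interface ClusterReader extends AutoCloseable {
    void start();
}

interface ClusterReaderFactory {
    ClusterReader create(String clusterId);
}

// Sketch: reconciliation restarts the underlying reader so that removed
// splits simply disappear with the old reader instance.
class ReconcilingReaderSketch {
    private final ClusterReaderFactory factory;
    private ClusterReader current;

    ReconcilingReaderSketch(ClusterReaderFactory factory) {
        this.factory = factory;
    }

    // Invoked when a metadata-change source event arrives from the enumerator.
    void onMetadataChange(String clusterId) {
        if (current != null) {
            try {
                current.close(); // releases thread pools, metrics, KafkaConsumer
            } catch (Exception e) {
                // cleanup errors must not abort reconciliation; surface via logs/metrics
            }
        }
        current = factory.create(clusterId); // restart picks up the new metadata
        current.start();
    }
}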

...

It is possible that a Kafka stream is composed of multiple topics on multiple Kafka clusters. In addition, this flexible and general abstraction does not require any convention on topic naming, but implementations can make such assumptions if desired. In the simplest case, a Kafka stream is a single topic on a single Kafka cluster. Both cases are illustrated in the sketch below.
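
For example (illustration only, reusing the KafkaStream and ClusterMetadata sketches above):

Code Block: KafkaStream Examples (Java)
import java.util.Map;
import java.util.Properties;
import java.util.Set;

class KafkaStreamExamples {
    static Properties props(String bootstrapServers) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        return p;
    }

    public static void main(String[] args) {
        // Simplest case: one topic on one cluster.
        KafkaStream simple = new KafkaStream(
                "my-stream-1",
                Map.of("cluster-a", new ClusterMetadata(Set.of("topic-1"), props("a:9092"))));

        // General case: one logical stream spans topics on two clusters, and
        // no topic naming convention is required across them.
        KafkaStream multi = new KafkaStream(
                "my-stream-2",
                Map.of(
                        "cluster-a", new ClusterMetadata(Set.of("topic-1"), props("a:9092")),
                        "cluster-b", new ClusterMetadata(Set.of("topic-1-v2"), props("b:9092"))));

        System.out.println(simple.streamId + ", " + multi.streamId);
    }
}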

...