...

This source will extend the KafkaSource to read from multiple Kafka clusters within a single source. In addition, the source can dynamically adjust which clusters and topics it consumes from, without requiring a Flink job restart.

Public Interfaces

The source will use the FLIP-27: Refactor Source Interface to integrate it with Flink and support both bounded and unbounded jobs.

This proposal does not include any changes to existing public interfaces of the KafkaSource. A new MultiClusterKafkaSource builder will serve as the public API and all other APIs will be marked as Internal in this proposal. 

The new source will go into the Kafka connector module and follow any connector repository changes of the KafkaSource.

An example of building the new source in unbounded mode:

Builder Example (Java):
MultiClusterKafkaSource.<String>builder()
  // some default implementations will be provided (file based, statically defined streams)
  .setKafkaMetadataService(new KafkaMetadataServiceImpl())
  .setStreamIds(List.of("my-stream-1", "my-stream-2"))
  .setGroupId("myConsumerGroup")
  .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
  .setStartingOffsets(OffsetsInitializer.earliest())
  .setProperties(properties)
  .build();

Basic Idea

MultiClusterKafkaSource relies on metadata to determine what clusters and topics to subscribe to. The metadata can change over time so the source will poll for new metadata and reconcile changes on an interval.
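The poll-and-reconcile step above can be sketched as follows. This is a minimal illustration of computing the metadata delta between two polls; the class and method names, and the cluster-to-topics map shape, are assumptions for illustration, not part of the proposed API.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of one reconciliation step: compare the previously
// observed cluster -> topics metadata with a freshly polled snapshot and
// compute what was added. Removals can be computed by swapping arguments.
final class MetadataReconciler {
    static Map<String, Set<String>> addedSince(
            Map<String, Set<String>> previous,
            Map<String, Set<String>> current) {
        Map<String, Set<String>> added = new HashMap<>();
        for (Map.Entry<String, Set<String>> entry : current.entrySet()) {
            // Topics present in the new snapshot but not in the old one.
            Set<String> newTopics = new HashSet<>(entry.getValue());
            newTopics.removeAll(previous.getOrDefault(entry.getKey(), Set.of()));
            if (!newTopics.isEmpty()) {
                added.put(entry.getKey(), newTopics);
            }
        }
        return added;
    }
}
```

Running this on an interval and applying the resulting delta (assigning new splits, removing obsolete ones) is what lets the source follow metadata changes without a job restart.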

...

To make the source more user friendly, a MultiClusterKafkaSourceBuilder will be provided (e.g. batch mode should not turn on continuous KafkaMetadataService discovery; metadata should only be resolved at startup).
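As a hedged illustration of that builder guard, the bounded-mode check could look like the following. The validator name and parameters are hypothetical, not part of the proposal.

```java
// Hypothetical sketch: in bounded (batch) mode, continuous metadata
// discovery should be off, so the builder could reject a positive
// discovery interval and fall back to a one-time metadata resolution
// at startup.
final class SourceConfigValidator {
    static void validate(boolean bounded, long discoveryIntervalMs) {
        if (bounded && discoveryIntervalMs > 0) {
            throw new IllegalArgumentException(
                "Metadata discovery interval must not be set in bounded mode; "
                    + "metadata is resolved once at startup.");
        }
    }
}
```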


Proposed Changes

KafkaClusterIdentifier

...