Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Dynamic partition discovery is a useful feature of the Kafka source, especially when the consumed Kafka topic scales out or when the source subscribes to multiple Kafka topics with a pattern. With this feature enabled, users don’t have to restart the Flink job to consume messages from new partitions. Currently, dynamic partition discovery is disabled by default, and users have to explicitly specify a discovery interval to turn it on.

In this FLIP we propose to enable dynamic partition discovery by default in the Kafka source, in both the Table/SQL API and the DataStream API.

Public Interfaces

Kafka table source (Table / SQL API)

  • “scan.topic-partition-discovery.interval” will be set to 5 minutes by default, aligned with the default value of "metadata.max.age.ms" in the Kafka consumer.
  • As we need to provide a way for users to disable the feature, “scan.topic-partition-discovery.interval” = “0” will be used to turn off discovery. Before this proposal, “0” meant enabling partition discovery with an interval of 0, which makes little sense in practice. Unfortunately we can't use negative values, as the type of this option is Duration.
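
The proposed meaning of the table option can be sketched in plain Java. This is only an illustration: the class and method names are hypothetical and are not connector code. Only the 5-minute default and the "zero disables discovery" rule come from this proposal; rejecting negative durations is an assumption of the sketch.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical helper, not part of the Flink Kafka connector.
// Models the proposed semantics of "scan.topic-partition-discovery.interval".
final class TableDiscoveryInterval {
    // Default proposed in this FLIP.
    static final Duration DEFAULT_INTERVAL = Duration.ofMinutes(5);

    // Returns the effective discovery interval, or empty if discovery is off.
    // A null argument stands for "option not set by the user".
    static Optional<Duration> effectiveInterval(Duration configured) {
        Duration value = (configured == null) ? DEFAULT_INTERVAL : configured;
        if (value.isNegative()) {
            // Assumption: negative durations are not valid values for this option.
            throw new IllegalArgumentException("negative interval: " + value);
        }
        // Under the proposal, zero turns discovery off.
        return value.isZero() ? Optional.empty() : Optional.of(value);
    }
}
```

With this sketch, leaving the option unset yields an interval of five minutes, and passing Duration.ZERO yields an empty result, meaning discovery is disabled.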

Kafka source (DataStream API)

  • Dynamic partition discovery in the Kafka source will be enabled by default, with the discovery interval set to 5 minutes.
  • To align with the table source, only a positive value of the option “partition.discovery.interval.ms” specifies a discovery interval. Both negative values and zero will be interpreted as disabling the feature.
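
For the DataStream side, the same rule can be sketched against a java.util.Properties object. The helper below is hypothetical and only mirrors the semantics described above (default of 5 minutes, any non-positive value disables discovery); it is not how the connector itself necessarily reads the option.

```java
import java.util.OptionalLong;
import java.util.Properties;

// Hypothetical helper, not part of the Flink Kafka connector.
final class DataStreamDiscoveryInterval {
    static final String KEY = "partition.discovery.interval.ms";
    // Default proposed in this FLIP: 5 minutes, in milliseconds.
    static final long DEFAULT_INTERVAL_MS = 5 * 60 * 1000L;

    // Returns the effective interval in milliseconds, or empty if discovery is off.
    static OptionalLong effectiveIntervalMs(Properties props) {
        String raw = props.getProperty(KEY);
        long value = (raw == null) ? DEFAULT_INTERVAL_MS : Long.parseLong(raw.trim());
        // Zero and negative values both disable discovery under the proposal.
        return value > 0 ? OptionalLong.of(value) : OptionalLong.empty();
    }
}
```

In an actual job the property would typically be passed through the source builder, for example with setProperty("partition.discovery.interval.ms", "10000") on the KafkaSource builder.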

Proposed Changes

Initial offset of newly discovered partitions

Currently, the offset of a newly discovered partition is initialized by the OffsetsInitializer specified by the user, which can lead to data loss. For example, a new partition may be discovered by the Kafka source several minutes after it is created; if the initial offset strategy is "latest", the messages produced into the partition during that gap are dropped.

As a consequence, we also propose to change the behavior to start consuming from the earliest offset for newly discovered partitions. 
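
The difference between the two strategies for a newly discovered partition can be illustrated with a small model. The class, enum, and method names are hypothetical; the sketch only assumes that "earliest" starts at the beginning offset of the partition and "latest" starts at its end offset at discovery time.

```java
// Hypothetical model, not connector code.
final class NewPartitionOffsets {
    enum Strategy { EARLIEST, LATEST }

    // Offset the reader starts from when the partition's log spans
    // [beginningOffset, endOffset) at the moment of discovery.
    static long startingOffset(Strategy strategy, long beginningOffset, long endOffset) {
        return strategy == Strategy.EARLIEST ? beginningOffset : endOffset;
    }

    // Number of records already in the partition that the reader never sees.
    static long skippedRecords(Strategy strategy, long beginningOffset, long endOffset) {
        return startingOffset(strategy, beginningOffset, endOffset) - beginningOffset;
    }
}
```

If 1200 records were produced into a new partition before it was discovered, skippedRecords(Strategy.LATEST, 0, 1200) is 1200, while skippedRecords(Strategy.EARLIEST, 0, 1200) is 0.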

Overhead of Partition Discovery in Kafka Source

Partition discovery is performed by KafkaSourceEnumerator, which asynchronously fetches topic metadata from the Kafka cluster and checks whether there are any new topics or partitions. This shouldn’t introduce performance issues on the Flink side.
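
Conceptually, the check for new partitions is a set difference between the partitions returned by the metadata fetch and those the enumerator already knows. The following is a minimal sketch of that idea, not the enumerator's actual code; the class name is hypothetical and partitions are represented as plain "topic-partition" strings for brevity.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the "what is new?" check, not connector code.
final class PartitionChangeSketch {
    // Returns the partitions present in the fetched metadata but not yet known.
    static Set<String> newPartitions(Set<String> known, Set<String> fetched) {
        Set<String> added = new TreeSet<>(fetched); // copy so the input is not modified
        added.removeAll(known);
        return added;
    }
}
```

The work per discovery round is proportional to the number of partitions, which supports the expectation that the cost on the Flink side is small.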

On the Kafka broker side, partition discovery sends a MetadataRequest to the broker to fetch topic information. Considering that the broker has its own metadata cache and the default request frequency is relatively low (once every 5 minutes), this is not a heavy operation and broker performance should not be noticeably affected. Input from Kafka experts on this point would also be welcome.
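
As a rough back-of-the-envelope estimate, the number of discovery rounds per hour follows directly from the interval. The sketch below assumes one metadata request per discovery round per job, which is a simplification; the class name is hypothetical.

```java
import java.time.Duration;

// Hypothetical estimate, assuming one metadata request per discovery round.
final class MetadataRequestRate {
    static long requestsPerHour(Duration interval) {
        long intervalMs = interval.toMillis();
        // Zero, negative, or sub-millisecond intervals are treated as "discovery off" here.
        if (intervalMs <= 0) {
            return 0;
        }
        return Duration.ofHours(1).toMillis() / intervalMs;
    }
}
```

Under this assumption, a 30-second interval gives 120 requests per hour per job, and a 5-minute interval gives 12.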

Compatibility, Deprecation, and Migration Plan

  • The semantics of “scan.topic-partition-discovery.interval” = 0 will change from enabling dynamic partition discovery with an interval of 0 to disabling the feature. This could potentially affect SQL users. However, since setting the option to 0 has no practical meaning, the impact should be relatively small.

Test Plan

This feature will be covered by unit tests and integration tests in the Kafka connector's source code.

Rejected Alternatives

  • Initially the default discovery interval was set to 30 seconds. This interval is too short compared with the default value of its equivalent option "metadata.max.age.ms" in the Kafka consumer. We adjusted it to 5 minutes to align with the default of "metadata.max.age.ms".

