Status
Current state: Under Discussion
Discussion thread:
JIRA:
Released:
Motivation
KIP-392 (fetching from the closest replica) has been available in Kafka since version 2.4. It lets brokers and consumers communicate which rack (or AWS Availability Zone) they are located in, so that a consumer can read from a replica in its own rack. Reading from a local broker can reduce cross-rack bandwidth costs and improve latency for your consumers.
Flink Kafka consumers currently cannot easily use rack awareness when a job is deployed across multiple racks or availability zones, because the job author has no control over which rack a consumer's Task Manager will be placed in.
This improvement proposes that the Kafka source accept a callback that runs when the consumer is being configured on the Task Manager. If the callback returns a value, that value is set as the consumer's rack ID at runtime.
Public Interfaces
KafkaSourceBuilder will get a new method setRackId() that takes a callback as an argument. The value returned by the callback will be used as the client.rack property of the Kafka client on each Task Manager. Note that this setting only changes behavior if the Kafka brokers the client connects to are configured to select replicas with the RackAwareReplicaSelector (replica.selector.class) and have their own rack IDs (broker.rack) set to values that match the client rack IDs.
https://kafka.apache.org/documentation/#consumerconfigs_client.rack
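As a rough illustration of what such a callback could look like, the sketch below defines a serializable supplier that resolves the rack ID on the machine where it runs. The RackIdSupplier interface, the rackFromEnv helper, and the TM_RACK_ID environment variable are all hypothetical names chosen for this example; the proposal does not fix the callback's type or how the rack is discovered.

```java
import java.io.Serializable;
import java.util.Map;
import java.util.function.Supplier;

public class RackIdCallbackSketch {

    /**
     * Hypothetical callback type. It is Serializable on the assumption that
     * the callback is created on the client and shipped to the Task Managers.
     */
    @FunctionalInterface
    public interface RackIdSupplier extends Supplier<String>, Serializable {}

    /**
     * Hypothetical helper: looks up the rack ID in an environment-style map.
     * Returns null when the key is missing or blank, meaning "no rack ID".
     */
    public static String rackFromEnv(Map<String, String> env, String key) {
        String value = env.get(key);
        if (value == null || value.trim().isEmpty()) {
            return null;
        }
        return value.trim();
    }

    public static void main(String[] args) {
        // TM_RACK_ID is an assumed variable name; each Task Manager would
        // need it set to the rack or availability zone it runs in.
        RackIdSupplier supplier = () -> rackFromEnv(System.getenv(), "TM_RACK_ID");

        // The lambda body is evaluated only when get() is called, so the
        // lookup happens on whichever machine eventually invokes it.
        System.out.println("client.rack would be: " + supplier.get());
    }
}
```

Under this sketch, a job would pass the supplier to the proposed KafkaSourceBuilder.setRackId() method, and the lookup would happen separately on each Task Manager rather than once when the job is built.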
Proposed Changes
The callback for determining rack ID at run time will need to be passed from the KafkaSourceBuilder down to the KafkaPartitionSplitReader. When the KafkaPartitionSplitReader is instantiated, the callback will be run and the result added to the config properties passed to the KafkaConsumer, similar to how Client ID is handled now.
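The step described above can be sketched with plain Java types. This is a minimal illustration, not the actual KafkaPartitionSplitReader code: the withRackId method and the class name are hypothetical, and the sketch assumes that a null or empty result from the callback means client.rack should be left unset.

```java
import java.util.Properties;
import java.util.function.Supplier;

public class RackIdPropertiesSketch {

    // Same string as Kafka's ConsumerConfig.CLIENT_RACK_CONFIG; spelled out
    // here so the sketch has no Kafka dependency.
    static final String CLIENT_RACK = "client.rack";

    /**
     * Hypothetical helper: returns a copy of the consumer properties with
     * client.rack set from the callback. The input properties are not modified.
     */
    public static Properties withRackId(Properties base, Supplier<String> rackIdSupplier) {
        Properties props = new Properties();
        props.putAll(base); // copies explicit entries only, not Properties defaults

        if (rackIdSupplier != null) {
            // Runs on the Task Manager at reader construction time. An exception
            // thrown here propagates to the caller; how the reader should react
            // to it is not specified by this sketch.
            String rackId = rackIdSupplier.get();
            if (rackId != null && !rackId.isEmpty()) {
                props.setProperty(CLIENT_RACK, rackId);
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("bootstrap.servers", "broker:9092"); // placeholder address

        Properties resolved = withRackId(base, () -> "us-east-1a");
        System.out.println(resolved.getProperty(CLIENT_RACK));
    }
}
```

Copying the properties instead of mutating them keeps the builder-level configuration identical across Task Managers, with only the per-reader copy carrying the rack-specific value.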
Compatibility, Deprecation, and Migration Plan
There shouldn't be any impact on existing users. This is new, non-required functionality.
Test Plan
Testing will include verifying that the consumer properties contain the expected value for client.rack, and that an exception thrown by the callback is handled correctly.
Rejected Alternatives
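This can't be provided through KafkaSourceBuilder.setProperties(), because those properties are fixed when the source is built and are the same for every Task Manager. Task Managers may sit in different racks or data centers, so the rack ID must be determined on each Task Manager at runtime.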