Status
Current state: Under Discussion
Discussion thread:
JIRA: here
Motivation
Currently a client (e.g. producer, consumer) fetches metadata from the least loaded node. Because the Kafka controller sends UpdateMetadataRequest to brokers concurrently, and brokers may process the UpdateMetadataRequest at different times, a client can fetch metadata that is older than the metadata already in its cache. This can cause OffsetOutOfRangeException in the consumer even if there is no log truncation in the Kafka cluster (see KAFKA-6262 for more detail). For MirrorMaker whose offset reset policy is oldest, it can cause MirrorMaker to rewind and consume from the oldest offset. This increases the latency of transmitting data from the source to the destination cluster and duplicates a large amount of data in the destination cluster.
In this KIP we propose to add version fields to the MetadataResponse and UpdateMetadataRequest so that a client can refresh metadata if the incoming metadata is older than the existing metadata in its cache.
Public Interfaces
Code Block |
---|
UpdateMetadataRequest => controller_id controller_epoch partition_states live_brokers |
controller_id => int32 |
controller_epoch => int32 |
controller_metadata_epoch => int32 <-- New. This is initialized to 0 after a broker becomes controller and increases monotonically over time. |
controller_metadata_version => |
partition_states => [UpdateMetadataRequestPartitionState] |
live_brokers => [UpdateMetadataRequestBroker] |
Code Block |
---|
MetadataResponse => throttle_time_ms brokers cluster_id controller_id topic_metadata |
throttle_time_ms => int32 |
controller_epoch => int32 <-- New. This is the same as the controller_epoch in UpdateMetadataRequest. |
controller_metadata_epoch => int32 <-- New. This is the same as the controller_metadata_epoch in UpdateMetadataRequest. |
brokers => [MetadataBroker] |
cluster_id => nullable_str |
controller_id => int32 |
topic_metadata => TopicMetadata |
Proposed Changes
After a client receives a MetadataResponse from a broker, it compares the controller_epoch and controller_metadata_epoch of the currently cached metadata with those of the incoming MetadataResponse. If the controller_epoch of the incoming MetadataResponse is smaller, or if the controller_epoch is the same but the controller_metadata_epoch of the incoming MetadataResponse is smaller, the client will refresh metadata again with the existing retry procedure.
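The comparison described above could be sketched roughly as follows. This is only an illustration, not the actual client change: the class `MetadataVersion` and the method `isStale` are hypothetical names introduced here, and the sketch assumes that "older" means a smaller (controller_epoch, controller_metadata_epoch) pair compared lexicographically.

```java
// Hypothetical sketch: MetadataVersion and isStale are illustrative names,
// not part of the Kafka client API.
final class MetadataVersion implements Comparable<MetadataVersion> {
    private final int controllerEpoch;
    private final int controllerMetadataEpoch;

    MetadataVersion(int controllerEpoch, int controllerMetadataEpoch) {
        this.controllerEpoch = controllerEpoch;
        this.controllerMetadataEpoch = controllerMetadataEpoch;
    }

    // Order by controller_epoch first; break ties with controller_metadata_epoch.
    @Override
    public int compareTo(MetadataVersion other) {
        int byEpoch = Integer.compare(controllerEpoch, other.controllerEpoch);
        if (byEpoch != 0) {
            return byEpoch;
        }
        return Integer.compare(controllerMetadataEpoch, other.controllerMetadataEpoch);
    }

    // Returns true when the incoming response is older than the cached metadata,
    // in which case the client would discard it and refresh metadata again.
    static boolean isStale(MetadataVersion cached, MetadataVersion incoming) {
        return incoming.compareTo(cached) < 0;
    }
}
```

With cached metadata at (controller_epoch=5, controller_metadata_epoch=10), an incoming response at (4, 99) or (5, 9) would be treated as stale, while (5, 10) or (6, 0) would be accepted.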
Compatibility, Deprecation, and Migration Plan
This KIP changes the inter-broker protocol, so the migration requires two rolling bounces. In the first rolling bounce we deploy the new code, but brokers still communicate using the existing protocol. In the second rolling bounce we change the config so that brokers start to communicate with each other using the new protocol.
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.