Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current stateUnder Discussion

Discussion thread

JIRAhere

Motivation

Currently client (e.g. producer, consumer) fetches metadata from the least loaded node. Because Kafka Controller sends UpdataMetadataRequest to brokers concurrently and there may be difference in when brokers process the UpdateMetadataRequest, it is possible that client fetches a metadata that is older than the existing metadata in its cache. This can cause OffsetOutOfRangeException in consumer even if there is no log truncation in the Kafka cluster (See KAFKA-6262 for more detail). For MirrorMaker whose offset reset policy is oldest, it can cause MM to rewind back to consume from the oldest offset. This increases the latency of transmitting the data from source to destination cluster and duplicates many data in the destination cluster.

In this KIP we propose to add version field in the MetadataResponse and UpdateMetadataRequest so that client can refresh metadata if the incoming metadata is older than the existing metadata in its cache.

Public Interfaces

1) Add leader_epoch field to MetadataResponse

...

Code Block
OffsetCommitResponse => throttle_time_ms response error_code
  throttle_time_ms => int32
  responses => [OffsetFetchResponseTopic]
  error_code => int16
 
OffsetFetchResponseTopic => topic partition_responses
  topic => str
  partition_responses => [OffsetFetchResponsePartition]
 
OffsetFetchResponsePartition => partition offset metadata error_code
  partition => int32
  offset => int64
  leader_epoch => int32            <-- NEW
  metadata => nullable_str
  error_code => int16
 

 

Proposed Changes

1) Metadata refresh

After client receives MetadataResponse from a broker, it compares with the MetadataResponse with the cached metadata to check whether the MetadataResponse is outdated. The MetadataResponse is outdated if any of the following conditions are true:

...

After consumer receives OffsetFetchResponse, it needs to refresh metadata until the leader_epoch in the cached metadata >= the leader_epoch in OffsetFetchResponse for all partitions in the OffsetFetchResponse.

 

Compatibility, Deprecation, and Migration Plan

The KIP changes the inter-broker protocol. Therefore the migration requires two rolling bounce. In the first rolling bounce we will deploy the new code but broker will still communicate using the existing protocol. In the second rolling bounce we will change the config so that broker will start to communicate with each other using the new protocol.

Rejected Alternatives

- Use a global per-metadata version.

...