
Status

Current state: Under Discussion

Discussion thread

JIRA: here

Motivation

Currently a client (e.g. producer, consumer) fetches metadata from the least loaded node. Because the Kafka controller sends UpdateMetadataRequest to brokers concurrently, and brokers may process the UpdateMetadataRequest at different times, a client may fetch metadata that is older than the metadata already in its cache. This can cause OffsetOutOfRangeException in the consumer even if there is no log truncation in the Kafka cluster (see KAFKA-6262 for more detail). For MirrorMaker, whose offset reset policy is oldest, it can cause MM to rewind and consume from the oldest offset. This increases the latency of transmitting data from the source to the destination cluster and duplicates a large amount of data in the destination cluster.

In this KIP we propose to add version fields to the MetadataResponse and UpdateMetadataRequest so that a client can refresh metadata if the incoming metadata is older than the existing metadata in its cache.

Public Interfaces

 

UpdateMetadataRequest => controller_id controller_epoch partition_states live_brokers
  controller_id => int32
  controller_epoch => int32
  controller_metadata_epoch => int32   <-- New. This is initialized to 0 after a broker becomes controller and monotonically increases over time
  controller_metadata_version => 
  partition_states => [UpdateMetadataRequestPartitionState]
  live_brokers => [UpdateMetadataRequestBroker]
MetadataResponse => throttle_time_ms brokers cluster_id controller_id topic_metadata
  throttle_time_ms => int32
  controller_epoch => int32   <-- New. This is the same as the controller_epoch in UpdateMetadataRequest.
  controller_metadata_epoch => int32   <-- New. This is the same as the controller_metadata_epoch in UpdateMetadataRequest.
  brokers => [MetadataBroker]
  cluster_id => nullable_str
  controller_id => int32
  topic_metadata => TopicMetadata

Proposed Changes

After a client receives a MetadataResponse from a broker, it compares the controller_epoch and controller_metadata_epoch of the currently cached metadata with those of the incoming MetadataResponse. If the controller_epoch of the incoming MetadataResponse is smaller, or if the controller_epoch is the same but the controller_metadata_epoch of the incoming MetadataResponse is smaller, the client will refresh metadata again using the existing retry procedure.
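
The comparison can be sketched roughly as follows. This is a minimal illustration and not the actual client code: the class MetadataVersion and the method shouldRefresh are hypothetical names, and the sketch assumes that "older" means a smaller controller_epoch, or an equal controller_epoch with a smaller controller_metadata_epoch.

```java
/**
 * Hypothetical holder for the two version fields proposed for MetadataResponse.
 * Not part of the Kafka client API; names are for illustration only.
 */
final class MetadataVersion implements Comparable<MetadataVersion> {
    final int controllerEpoch;         // controller_epoch
    final int controllerMetadataEpoch; // controller_metadata_epoch

    MetadataVersion(int controllerEpoch, int controllerMetadataEpoch) {
        this.controllerEpoch = controllerEpoch;
        this.controllerMetadataEpoch = controllerMetadataEpoch;
    }

    /**
     * Orders by controller_epoch first, then by controller_metadata_epoch.
     * The second field restarts at 0 under a new controller, so it is only
     * meaningful when the controller_epoch values are equal.
     */
    @Override
    public int compareTo(MetadataVersion other) {
        int byEpoch = Integer.compare(controllerEpoch, other.controllerEpoch);
        if (byEpoch != 0) {
            return byEpoch;
        }
        return Integer.compare(controllerMetadataEpoch, other.controllerMetadataEpoch);
    }

    /**
     * Returns true when the incoming metadata is older than the cached
     * metadata, in which case the client should refresh metadata again.
     */
    static boolean shouldRefresh(MetadataVersion cached, MetadataVersion incoming) {
        return incoming.compareTo(cached) < 0;
    }
}
```

With a cached version of (controller_epoch=5, controller_metadata_epoch=10), an incoming (5, 9) or (4, 99) would trigger a refresh, while (5, 10) or (6, 0) would be accepted.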

Compatibility, Deprecation, and Migration Plan

This KIP changes the inter-broker protocol, so the migration requires two rolling bounces. In the first rolling bounce we deploy the new code, but brokers still communicate using the existing protocol. In the second rolling bounce we change the config so that brokers start to communicate with each other using the new protocol.
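
As a rough illustration, and assuming the config in question is the broker setting inter.broker.protocol.version (the KIP text does not name it), the two bounces might look like this in server.properties. The version values are placeholders, not actual release numbers.

```properties
# First rolling bounce: new broker code, existing inter-broker protocol.
# <current-version> is a placeholder for the version the cluster already runs.
inter.broker.protocol.version=<current-version>

# Second rolling bounce: switch brokers to the new protocol.
# <new-version> is a placeholder for the release that includes this KIP.
# inter.broker.protocol.version=<new-version>
```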

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
