
In this KIP we propose to add a leader_epoch field to the MetadataResponse, OffsetCommitRequest and OffsetFetchResponse so that a client can refresh its metadata if the incoming metadata is older than the metadata already in its cache.

Public Interfaces

 1) Add leader_epoch field to MetadataResponse

MetadataResponse => throttle_time_ms brokers cluster_id controller_id topic_metadata
  throttle_time_ms => int32
  brokers => [MetadataBroker]
  cluster_id => nullable_str
  controller_id => int32
  topic_metadata => [TopicMetadata]

TopicMetadata => topic_error_code topic is_internal partition_metadata
  topic_error_code => int16
  topic => str
  is_internal => boolean
  partition_metadata => [PartitionMetadata]

PartitionMetadata => partition_error_code partition_id leader replicas isr offline_replicas leader_epoch
  partition_error_code => int16
  partition_id => int32
  leader => int32
  replicas => [int32]
  isr => [int32]
  offline_replicas => [int32]
  leader_epoch => int32          <-- NEW
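As an illustration of how the nested MetadataResponse schema maps onto client-side state, here is a minimal Python sketch. The class and field names mirror the schema, but the use of dataclasses and the helper `index_by_partition` are assumptions for illustration, not part of any Kafka client API.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class PartitionMetadata:
    partition_error_code: int
    partition_id: int
    leader: int
    replicas: List[int]
    isr: List[int]
    offline_replicas: List[int]
    leader_epoch: int  # field added by this KIP


@dataclass
class TopicMetadata:
    topic_error_code: int
    topic: str
    is_internal: bool
    partition_metadata: List[PartitionMetadata]


def index_by_partition(topics: List[TopicMetadata]) -> Dict[Tuple[str, int], int]:
    """Hypothetical helper: map (topic, partition_id) -> leader_epoch."""
    return {
        (t.topic, p.partition_id): p.leader_epoch
        for t in topics
        for p in t.partition_metadata
    }


# Example: one topic with two partitions.
orders = TopicMetadata(
    topic_error_code=0,
    topic="orders",
    is_internal=False,
    partition_metadata=[
        PartitionMetadata(0, 0, 1, [1, 2], [1, 2], [], leader_epoch=4),
        PartitionMetadata(0, 1, 2, [2, 3], [2], [3], leader_epoch=7),
    ],
)
print(index_by_partition([orders]))  # {('orders', 0): 4, ('orders', 1): 7}
```

Flattening to a (topic, partition) to leader_epoch map makes the per-partition comparisons described under Proposed Changes a simple dictionary lookup.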

 

2) Add leader_epoch field to OffsetCommitRequest

OffsetCommitRequest => group_id generation_id member_id retention_time topics
  group_id => str
  generation_id => int32
  member_id => str
  retention_time => int64
  topics => [OffsetCommitRequestTopic]

OffsetCommitRequestTopic => topic partitions
  topic => str
  partitions => [OffsetCommitRequestPartition]

OffsetCommitRequestPartition => partition offset leader_epoch metadata
  partition => int32
  offset => int64
  leader_epoch => int32      <-- NEW
  metadata => nullable_str

 

3) Add leader_epoch field to OffsetFetchResponse

OffsetFetchResponse => throttle_time_ms responses error_code
  throttle_time_ms => int32
  responses => [OffsetFetchResponseTopic]
  error_code => int16

OffsetFetchResponseTopic => topic partition_responses
  topic => str
  partition_responses => [OffsetFetchResponsePartition]

OffsetFetchResponsePartition => partition offset leader_epoch metadata error_code
  partition => int32
  offset => int64
  leader_epoch => int32            <-- NEW
  metadata => nullable_str
  error_code => int16
 

 

Proposed Changes


1) Metadata refresh

After the client receives a MetadataResponse from a broker, it compares the MetadataResponse with the cached metadata to check whether the response is outdated. The MetadataResponse is outdated if either of the following conditions is true:

- Across all partitions that the client is interested in, there exists a partition whose leader_epoch in the MetadataResponse is smaller than its leader_epoch in the cached metadata.
- Across all partitions that the client is interested in, there exists a partition A that is found in the cached metadata but not in the MetadataResponse, and there exists another partition B in the MetadataResponse with B.topic == A.topic.

The client will be forced to refresh metadata again with the existing backoff mechanism if the MetadataResponse is determined to be outdated.

Note that the producer is interested in all partitions, whereas a consumer may be interested only in the partitions it has explicitly subscribed to. Checking only this subset of partitions is an optimization that avoids unnecessary metadata refreshes when the metadata is outdated only for partitions the client does not need.
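The two outdated-metadata conditions, together with the interested-partition optimization, can be sketched as follows. This is a minimal sketch assuming both the cached metadata and the incoming MetadataResponse have already been reduced to a map from (topic, partition) to leader_epoch; the function name `is_outdated` and its `interested` parameter are hypothetical.

```python
from typing import Dict, Optional, Set, Tuple

TopicPartition = Tuple[str, int]


def is_outdated(
    cached: Dict[TopicPartition, int],
    response: Dict[TopicPartition, int],
    interested: Optional[Set[TopicPartition]] = None,
) -> bool:
    """Return True if the response should be treated as outdated.

    Both maps are (topic, partition) -> leader_epoch. `interested` restricts
    the check to the partitions the client needs; None means all partitions.
    """
    response_topics = {topic for topic, _ in response}
    for tp, cached_epoch in cached.items():
        if interested is not None and tp not in interested:
            continue  # optimization: ignore partitions the client does not use
        if tp in response:
            # Condition 1: the response has an older leader_epoch.
            if response[tp] < cached_epoch:
                return True
        elif tp[0] in response_topics:
            # Condition 2: partition A is missing from the response while
            # another partition B of the same topic is present.
            return True
    return False


cached = {("orders", 0): 5, ("orders", 1): 3}
print(is_outdated(cached, {("orders", 0): 5, ("orders", 1): 4}))  # False
print(is_outdated(cached, {("orders", 0): 4, ("orders", 1): 3}))  # True
print(is_outdated(cached, {("orders", 0): 5}))                    # True
```

Passing `interested=None` models a producer, which cares about every partition; a consumer would pass only its subscribed partitions. As the rule is written, a topic that is absent from the response altogether does not by itself trigger the second condition.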

2) Offset commit and fetch

When the consumer commits an offset, it looks up the leader_epoch of the partition in its cached metadata and includes this value in the OffsetCommitRequest.
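A sketch of how a consumer might attach the cached leader_epoch to each partition of an OffsetCommitRequest. The function `build_commit_partitions` and its dict-based inputs are hypothetical. The KIP does not say what happens when a partition is missing from the cache, so this sketch simply raises an error in that case (an assumption, not specified behavior).

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

TopicPartition = Tuple[str, int]


@dataclass
class OffsetCommitRequestPartition:
    partition: int
    offset: int
    leader_epoch: int  # copied from the consumer's cached metadata
    metadata: Optional[str] = None


def build_commit_partitions(
    offsets: Dict[TopicPartition, int],
    cached_epochs: Dict[TopicPartition, int],
) -> Dict[str, List[OffsetCommitRequestPartition]]:
    """Group offsets by topic and attach each partition's cached leader_epoch."""
    by_topic: Dict[str, List[OffsetCommitRequestPartition]] = {}
    for (topic, partition), offset in sorted(offsets.items()):
        if (topic, partition) not in cached_epochs:
            # Assumption: the KIP does not define this case, so fail loudly.
            raise ValueError(f"no cached leader_epoch for {topic}-{partition}")
        by_topic.setdefault(topic, []).append(
            OffsetCommitRequestPartition(
                partition=partition,
                offset=offset,
                leader_epoch=cached_epochs[(topic, partition)],
            )
        )
    return by_topic


request = build_commit_partitions(
    offsets={("orders", 1): 120, ("orders", 0): 95},
    cached_epochs={("orders", 0): 5, ("orders", 1): 3},
)
for p in request["orders"]:
    print(p.partition, p.offset, p.leader_epoch)
# 0 95 5
# 1 120 3
```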

After the consumer receives an OffsetFetchResponse, it refreshes metadata until, for every partition in the OffsetFetchResponse, the leader_epoch in the cached metadata is >= the leader_epoch in the OffsetFetchResponse.
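The refresh-until-caught-up rule might look like the loop below. This is a sketch under stated assumptions: `refresh` is a hypothetical callable that performs one metadata refresh and returns the new (topic, partition) to leader_epoch map, `time.sleep` stands in for the client's existing backoff, and the `max_attempts` bound is added only so the example terminates; the KIP itself just says the consumer refreshes until the condition holds.

```python
import time
from typing import Callable, Dict, Tuple

TopicPartition = Tuple[str, int]


def caught_up(cached: Dict[TopicPartition, int], fetched: Dict[TopicPartition, int]) -> bool:
    """True if every fetched partition has a cached leader_epoch >= the fetched one."""
    return all(tp in cached and cached[tp] >= epoch for tp, epoch in fetched.items())


def wait_for_metadata(
    fetched: Dict[TopicPartition, int],
    cached: Dict[TopicPartition, int],
    refresh: Callable[[], Dict[TopicPartition, int]],
    backoff_s: float = 0.1,
    max_attempts: int = 10,
) -> Dict[TopicPartition, int]:
    """Refresh metadata until it is at least as new as the OffsetFetchResponse."""
    attempts = 0
    while not caught_up(cached, fetched):
        if attempts >= max_attempts:
            raise TimeoutError("metadata still older than OffsetFetchResponse")
        time.sleep(backoff_s)  # stand-in for the client's existing backoff
        cached = refresh()
        attempts += 1
    return cached


# Simulated refreshes: the first is still stale, the second has caught up.
snapshots = iter([{("orders", 0): 4}, {("orders", 0): 6}])
result = wait_for_metadata(
    {("orders", 0): 6}, {("orders", 0): 3}, lambda: next(snapshots), backoff_s=0
)
print(result)  # {('orders', 0): 6}
```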


Compatibility, Deprecation, and Migration Plan

The KIP changes the inter-broker protocol, so the migration requires two rolling bounces. In the first rolling bounce we deploy the new code, but brokers still communicate using the existing protocol. In the second rolling bounce we change the config so that brokers start communicating with each other using the new protocol.

Rejected Alternatives

- Use a global per-metadata version.

This is more complicated because it introduces a new state in Kafka, whereas leader_epoch is an existing state that we already maintain in ZooKeeper. By using the per-partition leader_epoch, the client is forced to refresh metadata only if the MetadataResponse contains outdated metadata for the partitions it is interested in.