Status

Current state: Under Discussion

Discussion thread

JIRA: here

Motivation

Currently a client (e.g. producer, consumer) fetches metadata from the least loaded node. Because the Kafka controller sends UpdateMetadataRequest to brokers concurrently, and brokers may process the UpdateMetadataRequest at different times, a client can fetch metadata that is older than the metadata already in its cache. This can cause OffsetOutOfRangeException in the consumer even if there is no log truncation in the Kafka cluster (see KAFKA-6262 for more detail). For MirrorMaker, whose offset reset policy is oldest, it can cause MM to rewind and consume from the oldest offset. This increases the latency of transmitting data from the source to the destination cluster and duplicates a large amount of data in the destination cluster.

In this KIP we propose to add a version field to the MetadataResponse and UpdateMetadataRequest so that a client can refresh its metadata if the incoming metadata is older than the metadata already in its cache.

Public Interfaces

1) Add leader_epoch field to MetadataResponse

MetadataResponse => throttle_time_ms brokers cluster_id controller_id topic_metadata
  throttle_time_ms => int32
  brokers => [MetadataBroker]
  cluster_id => nullable_str
  controller_id => int32
  topic_metadata => [TopicMetadata]
 
TopicMetadata => topic_error_code topic is_internal partition_metadata
  topic_error_code => int16
  topic => str
  is_internal => boolean
  partition_metadata => [PartitionMetadata]
 
PartitionMetadata => partition_error_code partition_id leader replicas isr offline_replicas leader_epoch
  partition_error_code => int16
  partition_id => int32
  leader => int32
  replicas => [int32]
  isr => [int32]
  offline_replicas => [int32]
  leader_epoch => int32          <-- NEW

 

2) Add leader_epoch field to OffsetCommitRequest

OffsetCommitRequest => group_id generation_id member_id retention_time topics
  group_id => str
  generation_id => int32
  member_id => str
  retention_time => int64
  topics => [OffsetCommitRequestTopic]
 
OffsetCommitRequestTopic => topic partitions
  topic => str
  partitions => [OffsetCommitRequestPartition]
 
OffsetCommitRequestPartition => partition offset leader_epoch metadata
  partition => int32
  offset => int64
  leader_epoch => int32      <-- NEW
  metadata => nullable_str

 

3) Add leader_epoch field to OffsetFetchResponse

OffsetFetchResponse => throttle_time_ms responses error_code
  throttle_time_ms => int32
  responses => [OffsetFetchResponseTopic]
  error_code => int16
 
OffsetFetchResponseTopic => topic partition_responses
  topic => str
  partition_responses => [OffsetFetchResponsePartition]
 
OffsetFetchResponsePartition => partition offset leader_epoch metadata error_code
  partition => int32
  offset => int64
  leader_epoch => int32            <-- NEW
  metadata => nullable_str
  error_code => int16
 

 

4) Add leader_epoch field to the schema of the offset topic value.

 

OFFSET_COMMIT_VALUE_SCHEMA => offset leader_epoch metadata commit_timestamp expire_timestamp 
  offset => int64
  leader_epoch => int32     <-- NEW
  metadata => str
  commit_timestamp => int64
  expire_timestamp => int64

 

5) Add a new error, INVALID_LEADER_EPOCH. This is a non-retriable error that may be thrown from the consumer's API.

Proposed Changes

1) Metadata refresh

After a client receives a MetadataResponse from a broker, it compares the MetadataResponse with the cached metadata to check whether the MetadataResponse is outdated. The MetadataResponse is outdated if any of the following conditions is true:

- Among the partitions that the client is interested in, there exists a partition whose leader_epoch in the MetadataResponse < its leader_epoch in the cached metadata.
- Among the partitions that the client is interested in, there exists a partition A that is found in the cached metadata but not in the MetadataResponse, and there exists another partition B in the MetadataResponse where B.topic == A.topic. This is needed for the partition expansion scenario.

If the MetadataResponse is determined to be outdated, the client is forced to refresh metadata again, using the existing backoff mechanism.

Note that the producer is interested in all partitions. A consumer may be interested only in the partitions that it has explicitly subscribed to. Checking only a subset of partitions is an optimization that aims to avoid unnecessary metadata refreshes when the metadata is outdated only for partitions the client does not need.
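The two conditions above can be sketched as a single check. This is only an illustrative sketch, not the proposed implementation: the class name MetadataFreshness, the method isOutdated, and the representation of metadata as topic -> (partition -> leader_epoch) maps are assumptions made for this example.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

// Hypothetical helper; names and data layout are assumptions for illustration only.
final class MetadataFreshness {

    /**
     * cached and response map topic -> (partition id -> leader_epoch).
     * interested maps topic -> partition ids the client cares about.
     * Returns true if the response should be treated as outdated.
     */
    public static boolean isOutdated(Map<String, Map<Integer, Integer>> cached,
                                     Map<String, Map<Integer, Integer>> response,
                                     Map<String, Set<Integer>> interested) {
        for (Map.Entry<String, Set<Integer>> entry : interested.entrySet()) {
            String topic = entry.getKey();
            Map<Integer, Integer> cachedTopic =
                cached.getOrDefault(topic, Collections.emptyMap());
            Map<Integer, Integer> responseTopic = response.get(topic);
            if (responseTopic == null) {
                continue; // topic absent from the response: neither condition applies
            }
            for (Integer partition : entry.getValue()) {
                Integer cachedEpoch = cachedTopic.get(partition);
                if (cachedEpoch == null) {
                    continue; // nothing cached for this partition to compare against
                }
                Integer newEpoch = responseTopic.get(partition);
                if (newEpoch == null) {
                    // Condition 2: partition A is cached but missing, while another
                    // partition B of the same topic is present in the response.
                    if (!responseTopic.isEmpty()) {
                        return true;
                    }
                } else if (newEpoch < cachedEpoch) {
                    // Condition 1: the response carries an older leader_epoch.
                    return true;
                }
            }
        }
        return false;
    }
}
```

In this sketch a producer would pass every cached partition as interested, while a consumer would pass only the partitions it consumes.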

2) Offset commit

When a consumer commits an offset, it looks up the leader_epoch of the partition in the cached metadata and includes this value in the OffsetCommitRequest. The leader_epoch will be included in the message appended to the offset topic.

When the coordinator receives the OffsetCommitRequest, for each partition in the request it additionally checks whether the leader_epoch in the request >= the leader_epoch in the last commit. If not, the error for that partition in the OffsetCommitResponse will be INVALID_LEADER_EPOCH.
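The coordinator-side check could look roughly like the following. This is a minimal sketch under stated assumptions: OffsetCommitValidator, checkAndRecordEpoch, the Result enum, and the "topic-partition" string key are hypothetical names for this example, and a partition with no previous commit is assumed to have leader_epoch = -1.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical coordinator-side helper; not the actual group coordinator code.
final class OffsetCommitValidator {
    // Assumption: -1 stands for "no leader_epoch recorded".
    static final int NO_LEADER_EPOCH = -1;

    enum Result { NONE, INVALID_LEADER_EPOCH }

    // leader_epoch of the last accepted commit, keyed by "topic-partition".
    private final Map<String, Integer> lastCommittedEpoch = new HashMap<>();

    /** Accepts the commit only if its leader_epoch is >= that of the last commit. */
    public Result checkAndRecordEpoch(String topicPartition, int leaderEpoch) {
        int last = lastCommittedEpoch.getOrDefault(topicPartition, NO_LEADER_EPOCH);
        if (leaderEpoch < last) {
            return Result.INVALID_LEADER_EPOCH;
        }
        lastCommittedEpoch.put(topicPartition, leaderEpoch);
        return Result.NONE;
    }
}
```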

 

3) Offset fetch

After the consumer receives an OffsetFetchResponse, it remembers the leader_epoch for each partition it needs to consume. The consumer then refreshes metadata until the leader_epoch in the cached metadata >= the leader_epoch in the OffsetFetchResponse for all partitions it wants to consume. Note that this logic is hidden from the user, and the leader_epoch will not be exposed via the consumer's public API (e.g. OffsetAndMetadata).

For the existing version of the offset topic, leader_epoch is not available in the value of the offset topic message. We will use leader_epoch = -1 to indicate a missing leader_epoch. In this case the leader_epoch in any MetadataResponse will be larger than -1, so the consumer behavior will be the same as it is now.
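The consumer-side condition, including the leader_epoch = -1 fallback, can be illustrated as follows. This is a sketch only: OffsetFetchEpochCheck, needsMetadataRefresh, and the "topic-partition" string keys are hypothetical, and a partition without cached metadata is assumed here to have leader_epoch = -1.

```java
import java.util.Map;

// Hypothetical consumer-side helper; names are assumptions for illustration only.
final class OffsetFetchEpochCheck {
    static final int NO_LEADER_EPOCH = -1;

    /**
     * committedEpochs: leader_epoch from the OffsetFetchResponse per partition.
     * cachedEpochs: leader_epoch from the cached metadata per partition.
     * Returns true if the consumer must refresh metadata before consuming.
     */
    public static boolean needsMetadataRefresh(Map<String, Integer> committedEpochs,
                                               Map<String, Integer> cachedEpochs) {
        for (Map.Entry<String, Integer> entry : committedEpochs.entrySet()) {
            int cached = cachedEpochs.getOrDefault(entry.getKey(), NO_LEADER_EPOCH);
            if (cached < entry.getValue()) {
                return true; // cached metadata is older than the committed leader_epoch
            }
        }
        return false;
    }
}
```

With a committed leader_epoch of -1 the comparison never asks for a refresh, which corresponds to the unchanged behavior for offsets written in the old format.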

Compatibility, Deprecation, and Migration Plan

The KIP changes the inter-broker protocol. Therefore the migration requires two rolling bounces. In the first rolling bounce we deploy the new code, but brokers still communicate using the existing protocol. In the second rolling bounce we change the config so that brokers start to communicate with each other using the new protocol.

Rejected Alternatives

- Use a global per-metadata version.

This would be somewhat more complicated because it introduces new state in Kafka. leader_epoch is existing state that we already maintain in ZooKeeper. By using the per-partition leader_epoch, the client is forced to refresh metadata only if the MetadataResponse contains outdated metadata for the partitions that the client is interested in.

 
