
Status

Current state: Under Discussion

Discussion thread

JIRA: here

Motivation

Currently a client (e.g. producer, consumer) fetches metadata from the least loaded node. Because the Kafka controller sends UpdateMetadataRequest to brokers concurrently, and brokers may process the UpdateMetadataRequest at different times, a client may fetch metadata that is older than the metadata already in its cache. This can cause OffsetOutOfRangeException in the consumer even if there is no log truncation in the Kafka cluster (see KAFKA-6262 for more detail). For MirrorMaker, whose offset reset policy is oldest, it can cause MM to rewind and consume from the oldest offset. This increases the latency of transmitting data from the source to the destination cluster and duplicates a large amount of data in the destination cluster.

In this KIP we propose to add leader_epoch and partition_epoch fields to the MetadataResponse and UpdateMetadataRequest so that a client can refresh metadata if the incoming metadata is older than the existing metadata in its cache.

Public Interfaces

Zookeeper

1) Add znode /partition_epoch with the following json format

...

2) Update the znodes /brokers/topics/[topic]/partitions/[partition] to use the following json format

Code Block
{
  "version" : int32,
  "partition_epoch" : int32
}

Request/Response protocol

1) Add partition_epoch to UpdateMetadataRequest

Code Block
UpdateMetadataRequest => controller_id controller_epoch max_partition_epoch partition_states live_brokers
  controller_id => int32
  controller_epoch => int32
  max_partition_epoch => int32    <-- NEW
  partition_states => [UpdateMetadataRequestPartitionState]
  live_brokers => [UpdateMetadataRequestBroker]
 
UpdateMetadataRequestPartitionState => topic partition controller_epoch leader leader_epoch partition_epoch isr zk_version replicas offline_replicas
  topic => string
  partition => int32
  controller_epoch => int32
  leader => int32
  leader_epoch => int32
  partition_epoch => int32        <-- NEW
  isr => [int32]
  zk_version => int32
  replicas => [int32]
  offline_replicas => [int32]

2) Add partition_epoch and leader_epoch to MetadataResponse

Code Block
MetadataResponse => throttle_time_ms max_partition_epoch brokers cluster_id controller_id topic_metadata
  throttle_time_ms => int32
  max_partition_epoch => int32        <-- NEW
  brokers => [MetadataBroker]
  cluster_id => nullable_str
  controller_id => int32
  topic_metadata => [TopicMetadata]
 
TopicMetadata => topic_error_code topic is_internal partition_metadata
  topic_error_code => int16
  topic => str
  is_internal => boolean
  partition_metadata => [PartitionMetadata]
 
PartitionMetadata => partition_error_code partition_id leader replicas leader_epoch partition_epoch isr offline_replicas
  partition_error_code => int16
  partition_id => int32
  leader => int32
  replicas => [int32]
  leader_epoch => int32           <-- NEW
  partition_epoch => int32        <-- NEW
  isr => [int32]
  offline_replicas => [int32]

3) Add partition_epoch and leader_epoch to OffsetCommitRequest

Code Block
OffsetCommitRequest => group_id generation_id member_id retention_time topics
  group_id => str
  generation_id => int32
  member_id => str
  retention_time => int64
  topics => [OffsetCommitRequestTopic]
 
OffsetCommitRequestTopic => topic partitions
  topic => str
  partitions => [OffsetCommitRequestPartition]
 
OffsetCommitRequestPartition => partition offset leader_epoch partition_epoch metadata
  partition => int32
  offset => int64
  leader_epoch => int32           <-- NEW
  partition_epoch => int32        <-- NEW
  metadata => nullable_str

4) Add partition_epoch and leader_epoch to OffsetFetchResponse

Code Block
OffsetFetchResponse => throttle_time_ms response error_code
  throttle_time_ms => int32
  responses => [OffsetFetchResponseTopic]
  error_code => int16
 
OffsetFetchResponseTopic => topic partition_responses
  topic => str
  partition_responses => [OffsetFetchResponsePartition]
 
OffsetFetchResponsePartition => partition offset leader_epoch partition_epoch metadata error_code
  partition => int32
  offset => int64
  leader_epoch => int32           <-- NEW
  partition_epoch => int32        <-- NEW
  metadata => nullable_str
  error_code => int16

 

5) Add partition_epoch to LeaderAndIsrRequest

Code Block
LeaderAndIsrRequest => controller_id controller_epoch partition_states live_leaders
  controller_id => int32
  controller_epoch => int32
  partition_states => [LeaderAndIsrRequestPartitionState]
  live_leaders => [LeaderAndIsrRequestLiveLeader]
 
LeaderAndIsrRequestPartitionState => topic partition controller_epoch leader leader_epoch partition_epoch isr zk_version replicas is_new_replica
  topic => str
  partition => int32
  controller_epoch => int32
  leader => int32
  leader_epoch => int32
  partition_epoch => int32        <-- NEW
  isr => [int32]
  zk_version => int32
  replicas => [int32]
  is_new_replica => boolean

6) Add partition_epoch to FetchResponse

Code Block
FetchResponse => throttle_time_ms responses
  throttle_time_ms => int32
  responses => [FetchResponseTopic]
 
FetchResponseTopic => topic partition_responses
  topic => str
  partition_responses => [FetchResponsePartition]
 
FetchResponsePartition => partition_header record_set
  partition_header => FetchResponsePartitionHeader
  record_set => Records
 
FetchResponsePartitionHeader => partition partition_epoch error_code high_watermark last_stable_offset log_start_offset aborted_transactions
  partition => int32
  partition_epoch => int32        <-- NEW
  error_code => int16
  high_watermark => int64
  last_stable_offset => int64
  log_start_offset => int64
  aborted_transactions => [FetchResponseAbortedTransaction]

7) Add leader_epoch and partition_epoch to ListOffsetResponse

Code Block
ListOffsetResponse => throttle_time_ms responses
  throttle_time_ms => int32
  responses => [ListOffsetResponseTopic]
 
ListOffsetResponseTopic => topic partition_responses
  topic => str
  partition_responses => [ListOffsetResponsePartition]
 
ListOffsetResponsePartition => partition leader_epoch partition_epoch error_code timestamp offset
  partition => int32
  leader_epoch => int32           <-- NEW
  partition_epoch => int32        <-- NEW
  error_code => int16
  timestamp => int64
  offset => int64

Offset topic schema

Add partition_epoch and leader_epoch to the schema of the offset topic value.

Code Block
OFFSET_COMMIT_VALUE_SCHEMA => offset leader_epoch partition_epoch metadata commit_timestamp expire_timestamp 
  offset => int64
  leader_epoch => int32           <-- NEW
  partition_epoch => int32        <-- NEW
  metadata => str
  commit_timestamp => int64
  expire_timestamp => int64

Consumer API

1) Create new class OffsetAndOffsetEpoch

Code Block
public class OffsetAndOffsetEpoch {
 
  public OffsetAndOffsetEpoch(long offset, byte[] offsetEpoch);
 
  public byte[] offsetEpoch();
 
  public long offset();
 
}


2) Add the following methods to the interface org.apache.kafka.clients.consumer.Consumer

Code Block
void seek(TopicPartition partition, long offset, byte[] offsetEpoch);

OffsetAndOffsetEpoch positionAndOffsetEpoch(TopicPartition partition);


3) Add field offsetEpoch to the class org.apache.kafka.clients.consumer.OffsetAndMetadata and the class org.apache.kafka.clients.consumer.OffsetAndTimestamp

Code Block
public class OffsetAndMetadata {
  // This field encodes leader_epoch and partition_epoch. User should not need to interpret this value.
  private final byte[] offsetEpoch;
  ...
}
 
public class OffsetAndTimestamp {
  // This field encodes leader_epoch and partition_epoch. User should not need to interpret this value.
  private final byte[] offsetEpoch;
  ...
}
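The KIP deliberately leaves the byte layout of offsetEpoch opaque to users. Purely as an illustration of how leader_epoch and partition_epoch could be packed into a byte array, the sketch below assumes a hypothetical layout (a 2-byte format version followed by two int32 values). The class name OffsetEpochCodec, the version field and the layout are assumptions made for this example and are not part of the proposal.

```java
import java.nio.ByteBuffer;

// Hypothetical codec: the KIP does not specify the byte layout of offsetEpoch.
final class OffsetEpochCodec {
    private static final short VERSION = 0; // assumed format version
    private static final int SIZE = Short.BYTES + 2 * Integer.BYTES;

    // Packs both epochs into an opaque byte array.
    static byte[] encode(int leaderEpoch, int partitionEpoch) {
        return ByteBuffer.allocate(SIZE)
                .putShort(VERSION)
                .putInt(leaderEpoch)
                .putInt(partitionEpoch)
                .array();
    }

    // Returns {leaderEpoch, partitionEpoch}; rejects input it cannot decode.
    static int[] decode(byte[] offsetEpoch) {
        if (offsetEpoch == null || offsetEpoch.length != SIZE) {
            throw new IllegalArgumentException("offsetEpoch has unexpected length");
        }
        ByteBuffer buf = ByteBuffer.wrap(offsetEpoch);
        short version = buf.getShort();
        if (version != VERSION) {
            throw new IllegalArgumentException("unsupported offsetEpoch version " + version);
        }
        int leaderEpoch = buf.getInt();
        int partitionEpoch = buf.getInt();
        return new int[] {leaderEpoch, partitionEpoch};
    }
}
```

Keeping a version prefix in the array is one way to let the client change the encoding later without users having to interpret the bytes.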

 

4) Add new Error INVALID_PARTITION_EPOCH

consumer.poll() will throw InvalidPartitionEpochException if the partition_epoch associated with the consumer's last used offset for a partition differs from the partition_epoch in the FetchResponse. This can happen if the user consumes data with a previously used offset after the topic has been deleted.

Code Block
public class InvalidPartitionEpochException {
 
  InvalidPartitionEpochException(Map<TopicPartition, Long> invalidPartitionEpochs);
 
}

Proposed Changes

1) Topic creation and partition expansion.

When a broker creates a topic or expands partitions, it should increment the global partition_epoch in the znode /partition_epoch by 1 for each new partition to be created. Thus each partition can be identified by a globally unique partition_epoch. The resulting partition -> partition_epoch mapping for each new partition should be written in the znode /brokers/topics/[topic]/partitions/[partition].

2) Topic deletion.

When a topic is deleted, the global partition_epoch in the znode /partition_epoch will be incremented by 1. This can help us recognize the more recent metadata after the topic deletion.
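The epoch allocation rules in items 1) and 2) can be sketched with an in-memory counter standing in for the /partition_epoch znode. The class PartitionEpochAllocator, its method names and the initial value of 0 are assumptions made for this illustration; a real broker would update ZooKeeper, which is not shown here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// In-memory stand-in for the global /partition_epoch znode (hypothetical).
final class PartitionEpochAllocator {
    private final AtomicInteger globalEpoch = new AtomicInteger(0); // assumed initial value

    // Topic creation or partition expansion: one increment per new partition,
    // so every partition gets a globally unique partition_epoch.
    Map<Integer, Integer> onPartitionsCreated(int firstPartition, int count) {
        Map<Integer, Integer> epochs = new LinkedHashMap<>();
        for (int p = firstPartition; p < firstPartition + count; p++) {
            epochs.put(p, globalEpoch.incrementAndGet());
        }
        return epochs;
    }

    // Topic deletion: a single increment, so later metadata carries a larger maximum epoch.
    int onTopicDeleted() {
        return globalEpoch.incrementAndGet();
    }

    int current() {
        return globalEpoch.get();
    }
}
```

With this scheme, a topic that is deleted and re-created with the same name gets partition epochs strictly larger than any epoch handed out before the deletion.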

3) Metadata propagation from controller to brokers and from brokers to clients.

Controller should include the current maximum partition_epoch in the UpdateMetadataRequest. Controller should also include partition_epoch and leader_epoch for each partition in the UpdateMetadataRequest. Broker should similarly include max_partition_epoch, partition_epoch and leader_epoch in the MetadataResponse to clients.

4) Client's metadata refresh

After a client receives a MetadataResponse from a broker, it compares the MetadataResponse with the cached metadata to check whether the MetadataResponse is outdated. The MetadataResponse is outdated if, among the partitions that the client is interested in, there exists a partition such that the following is true:

Code Block
metadataResponse.max_partition_epoch < cachedMetadata.max_partition_epoch ||
metadataResponse[partition].partition_epoch < cachedMetadata[partition].partition_epoch || 
 (metadataResponse[partition].partition_epoch == cachedMetadata[partition].partition_epoch && 
  metadataResponse[partition].leader_epoch < cachedMetadata[partition].leader_epoch)

The client should refresh metadata again with the existing backoff mechanism if the MetadataResponse is determined to be outdated.

Note that a producer is interested in all partitions, while a consumer may be interested only in the partitions it has explicitly subscribed to. Checking only a subset of partitions avoids unnecessary metadata refreshes when the metadata is outdated only for partitions the client does not need. In other words, this is an optimization and is not needed for correctness.
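As a minimal sketch of the outdated-metadata check described in this subsection, the code below compares a simplified view of a MetadataResponse against the cached view. The classes MetadataFreshness, Snapshot and Epochs are hypothetical, partitions are keyed by partition id only (a single topic is assumed), and partitions missing from either side are skipped, which is an assumption the text above does not address.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical helper illustrating the staleness criteria.
final class MetadataFreshness {
    // Per-partition epochs as carried in a MetadataResponse (simplified).
    static final class Epochs {
        final int partitionEpoch;
        final int leaderEpoch;

        Epochs(int partitionEpoch, int leaderEpoch) {
            this.partitionEpoch = partitionEpoch;
            this.leaderEpoch = leaderEpoch;
        }
    }

    // Simplified metadata view: max_partition_epoch plus epochs keyed by partition id.
    static final class Snapshot {
        final int maxPartitionEpoch;
        final Map<Integer, Epochs> partitions;

        Snapshot(int maxPartitionEpoch, Map<Integer, Epochs> partitions) {
            this.maxPartitionEpoch = maxPartitionEpoch;
            this.partitions = partitions;
        }
    }

    // True if the response is older than the cache for any partition of interest.
    static boolean isOutdated(Snapshot response, Snapshot cached, Set<Integer> interested) {
        if (response.maxPartitionEpoch < cached.maxPartitionEpoch) {
            return true;
        }
        for (int partition : interested) {
            Epochs r = response.partitions.get(partition);
            Epochs c = cached.partitions.get(partition);
            if (r == null || c == null) {
                continue; // assumption: nothing to compare against
            }
            if (r.partitionEpoch < c.partitionEpoch) {
                return true;
            }
            if (r.partitionEpoch == c.partitionEpoch && r.leaderEpoch < c.leaderEpoch) {
                return true;
            }
        }
        return false;
    }
}
```

A client following this sketch would trigger another metadata refresh, subject to its existing backoff, whenever isOutdated returns true.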


5) Offset commit

When consumer commits offset, it includes leader_epoch and partition_epoch together with offset in the OffsetCommitRequest. The leader_epoch should be the largest leader_epoch of messages whose offset < the commit offset. If no message has been consumed since consumer initialization, the leader_epoch from seek(...) or OffsetFetchResponse should be used. The partition_epoch should be read from the last FetchResponse corresponding to the given partition and commit offset. The coordinator should extract these values from the OffsetCommitRequest and write them into the offset topic.
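The bookkeeping a consumer needs for one partition in order to fill in these fields can be sketched as follows. CommitEpochTracker and its method names are hypothetical; the sketch assumes the consumer can see the leader_epoch of each consumed message and the partition_epoch of the FetchResponse it came from, and it uses -1 for an unknown epoch.

```java
// Hypothetical per-partition tracker for the values sent in OffsetCommitRequest.
final class CommitEpochTracker {
    private long position;      // next offset to consume, i.e. the offset to commit
    private int leaderEpoch;    // largest leader_epoch among messages below position
    private int partitionEpoch; // partition_epoch from the last FetchResponse

    // Initial values come from seek(...) or OffsetFetchResponse; -1 means unknown.
    CommitEpochTracker(long position, int leaderEpoch, int partitionEpoch) {
        this.position = position;
        this.leaderEpoch = leaderEpoch;
        this.partitionEpoch = partitionEpoch;
    }

    // Called for each consumed message, in offset order.
    void onRecordConsumed(long offset, int recordLeaderEpoch, int fetchPartitionEpoch) {
        position = offset + 1;
        leaderEpoch = Math.max(leaderEpoch, recordLeaderEpoch);
        partitionEpoch = fetchPartitionEpoch;
    }

    long commitOffset() {
        return position;
    }

    int leaderEpoch() {
        return leaderEpoch;
    }

    int partitionEpoch() {
        return partitionEpoch;
    }
}
```

If no message has been consumed, the tracker simply returns the epochs it was initialized with, which matches the fallback described above.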

6) Consumer rebalance or initialization using the offset and epoch from the Kafka offset topic

After the consumer receives an OffsetFetchResponse, it remembers the leader_epoch and the partition_epoch for each partition it needs to consume. The consumer then needs to repeatedly refresh metadata as long as the metadata is considered outdated, which is determined using criteria similar to those introduced in the above subsection "Client's metadata refresh". The only difference is that, instead of using the leader_epoch and partition_epoch from the cached metadata, the consumer uses the values from the OffsetFetchResponse.

For the existing version of the offset topic, leader_epoch and partition_epoch will not be available in the value of the offset topic message. Both the leader_epoch and the partition_epoch are assumed to be -1 and the client will preserve the existing behavior without doing the additional metadata refresh.

7) Consumer initialization if offset is stored externally.

When committing offset, user should use the newly added API positionAndOffsetEpoch(...) to read the offset and offsetEpoch (which encodes leader_epoch and partition_epoch). The leader_epoch should be the largest leader_epoch of those messages whose offset < position. If no message has been consumed since consumer initialization, the leader_epoch from seek(...) or OffsetFetchResponse should be used. The partition_epoch should be read from the last FetchResponse corresponding to the given partition and position offset. Both offset and the offsetEpoch (in the form of byte array) should be written to the external store.

When initializing consumer, user should read the externally stored offsetEpoch (as byte array) together with the offset. Then user should call the newly added API seek(partition, offset, offsetEpoch) to seek to the previous offset. Later, when consumer.poll() is called, the consumer needs to repeatedly refresh metadata as long as the metadata is considered outdated, which is determined using criteria similar to those introduced in the above subsection "Client's metadata refresh". The only difference is that, instead of using the leader_epoch and partition_epoch from the cached metadata, the consumer uses the values extracted from the offsetEpoch provided to seek(...).

If the offsetEpoch passed to seek(...) cannot be decoded by the Kafka client implementation, an IllegalArgumentException is thrown from seek(...).

If user calls the existing API seek(partition, offset) and there is no committed offset/leader_epoch/partition_epoch from the Kafka offset topic, both the leader_epoch and the partition_epoch are assumed to be -1 and the client will preserve the existing behavior without doing the additional check.

8) Consumption after topic deletion

After the consumer receives a FetchResponse from a broker, it should verify that the partition_epoch from the FetchResponse equals the partition_epoch associated with the last used offset. The partition_epoch associated with the last used offset can be obtained from seek(...) or OffsetFetchResponse. If there is no partition_epoch associated with the last used offset, this check is not performed. If there is a partition_epoch associated with the last used offset and its value differs from the partition_epoch in the FetchResponse, consumer.poll() should throw InvalidPartitionEpochException (constructed with the partition -> partition_epoch mapping) for those partitions.
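The check described above can be sketched as a comparison of two maps. PartitionEpochValidator is a hypothetical helper; partitions are represented by plain strings such as "t-0" instead of TopicPartition to keep the example self-contained, -1 stands for "no epoch associated with the last used offset", and reporting the epoch returned by the broker (rather than the expected one) is an assumption, since the text does not say which value the exception carries.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating the partition_epoch check on FetchResponse.
final class PartitionEpochValidator {
    static final int UNKNOWN_EPOCH = -1; // no epoch associated with the last used offset

    // expected: partition -> partition_epoch from seek(...) or OffsetFetchResponse
    // fetched:  partition -> partition_epoch from the FetchResponse
    // Returns the mismatching partitions mapped to the epoch the broker returned.
    static Map<String, Integer> findInvalid(Map<String, Integer> expected,
                                            Map<String, Integer> fetched) {
        Map<String, Integer> invalid = new HashMap<>();
        for (Map.Entry<String, Integer> entry : fetched.entrySet()) {
            int want = expected.getOrDefault(entry.getKey(), UNKNOWN_EPOCH);
            int got = entry.getValue();
            if (want != UNKNOWN_EPOCH && want != got) {
                invalid.put(entry.getKey(), got);
            }
        }
        return invalid;
    }
}
```

A consumer following this sketch would raise the exception from poll() only when the returned map is non-empty.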


Compatibility, Deprecation, and Migration Plan

The KIP changes the inter-broker protocol. Therefore the migration requires two rolling bounces. In the first rolling bounce we will deploy the new code, but brokers will still communicate using the existing protocol. In the second rolling bounce we will change the config so that brokers start to communicate with each other using the new protocol.

Rejected Alternatives

- Use a global per-metadata version.

This would be somewhat more complicated because it introduces a new state in Kafka, whereas leader_epoch is an existing state we already maintain in zookeeper. By using the per-partition leader_epoch, the client will only be forced to refresh metadata if the MetadataResponse contains outdated metadata for those partitions that the client is interested in.

Future work

1) We can use the partition_epoch to detect that the partition used in the seek() has been deleted. Then we can throw a proper exception to the user so that the user can handle it properly.

2) We can use the leader_epoch to better handle the offset reset in case of unclean leader election.

...