Status

Current state: Under Discussion

Discussion thread

JIRA: here

Motivation

Currently a client (e.g. producer or consumer) fetches metadata from the least loaded node. Because the Kafka controller sends UpdateMetadataRequest to brokers concurrently, and brokers may differ in when they process the UpdateMetadataRequest, it is possible for a client to fetch metadata that is older than the metadata already in its cache. This can cause OffsetOutOfRangeException in the consumer even when there is no log truncation in the Kafka cluster (see KAFKA-6262 for more detail). For a MirrorMaker whose offset reset policy is oldest, it can cause MM to rewind back and consume from the oldest offset. This increases the latency of transmitting data from the source to the destination cluster and duplicates much data in the destination cluster.

In this KIP we propose to add leader_epoch and partition_epoch fields to the MetadataResponse so that the client can refresh its metadata if the incoming metadata is older than the metadata in its cache.
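A minimal sketch of the staleness check a client could apply (class and method names are hypothetical, and the ordering rule on (partition_epoch, leader_epoch) is an assumption of this sketch, not the KIP's normative definition):

```java
// Hypothetical sketch: decide whether incoming per-partition metadata is
// outdated relative to the cached metadata, by comparing partition_epoch
// first and leader_epoch second.
class MetadataFreshness {

    // Returns true if the incoming epochs are older than the cached epochs,
    // i.e. the client should refresh metadata instead of accepting the response.
    static boolean isOutdated(int cachedPartitionEpoch, int cachedLeaderEpoch,
                              int newPartitionEpoch, int newLeaderEpoch) {
        if (newPartitionEpoch != cachedPartitionEpoch)
            return newPartitionEpoch < cachedPartitionEpoch; // older partition_epoch => stale
        return newLeaderEpoch < cachedLeaderEpoch;           // same partition, older leader => stale
    }
}
```

A client would run this check per partition on every MetadataResponse and re-fetch metadata whenever any partition it cares about is reported stale.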

Public Interfaces

Zookeeper

1) Add znode /partition_epoch with the following json format

...

2) Update the znodes /brokers/topics/[topic]/partitions/[partition] to use the following json format

Code Block
{
  "version" : int32,
  "partition_epoch" : {
    int32 -> int32   // partition has type int32. This maps each partition to its partition_epoch
    ..
  }
}
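For illustration, a two-partition topic whose partitions were allocated epochs from the global counter might store the following (all epoch values hypothetical):

Code Block
{
  "version" : 1,
  "partition_epoch" : {
    "0" : 7,
    "1" : 8
  }
}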

Request/Response protocol

1) Add partition_epoch to UpdateMetadataRequest

...

Code Block
ListOffsetResponse => throttle_time_ms responses
  throttle_time_ms => int32
  responses => [ListOffsetResponseTopic]
 
ListOffsetResponseTopic => topic partition_responses
  topic => str
  partition_responses => [ListOffsetResponsePartition]
 
ListOffsetResponsePartition => partition leader_epoch partition_epoch error_code timestamp offset
  partition => int32
  leader_epoch => int32           <-- NEW
  partition_epoch => int32        <-- NEW
  error_code => int16
  timestamp => int64
  offset => int64

Offset topic schema

Add partition_epoch and leader_epoch to the schema of the offset topic value.

Code Block
OFFSET_COMMIT_VALUE_SCHEMA => offset leader_epoch partition_epoch metadata commit_timestamp expire_timestamp 
  offset => int64
  leader_epoch => int32           <-- NEW
  partition_epoch => int32        <-- NEW
  metadata => str
  commit_timestamp => int64
  expire_timestamp => int64

Consumer API

1) Create new class OffsetEpoch

We can evolve the OffsetEpoch class as needed to encode more information. This should only be used by Kafka client implementation and user should not need to interpret its value.

OffsetAndOffsetEpoch

Code Block
public class OffsetAndOffsetEpoch {
 
  // This field encodes leader_epoch and partition_epoch. User should not need to interpret this value.
  private final OffsetEpoch offsetEpoch;
 
  public OffsetAndOffsetEpoch(long offset, OffsetEpoch offsetEpoch);
 
  public OffsetEpoch offsetEpoch();
 
  public long offset();
 
}
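Since the KIP leaves OffsetEpoch opaque, one way the client implementation could encode it is to pack the two int32 epochs into a byte array. A minimal sketch (the field layout is an assumption of this sketch, not the KIP's wire format):

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of an OffsetEpoch that packs leader_epoch and
// partition_epoch into an opaque byte array. The actual layout would be an
// implementation detail of the Kafka client; users never interpret it.
class OffsetEpoch {
    private final int leaderEpoch;
    private final int partitionEpoch;

    OffsetEpoch(int leaderEpoch, int partitionEpoch) {
        this.leaderEpoch = leaderEpoch;
        this.partitionEpoch = partitionEpoch;
    }

    // Serialize both epochs into 8 bytes.
    byte[] toBytes() {
        return ByteBuffer.allocate(8).putInt(leaderEpoch).putInt(partitionEpoch).array();
    }

    // Recover the epochs from the opaque encoding.
    static OffsetEpoch fromBytes(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        return new OffsetEpoch(buf.getInt(), buf.getInt());
    }

    int leaderEpoch() { return leaderEpoch; }
    int partitionEpoch() { return partitionEpoch; }
}
```

Keeping the encoding opaque lets the client evolve it (e.g. add a version prefix) without breaking user code.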


2) Add the following methods to the interface org.apache.kafka.clients.consumer.Consumer

Code Block
void seek(TopicPartition partition, long offset, OffsetEpoch offsetEpoch);
 
OffsetAndOffsetEpoch positionAndOffsetEpoch(TopicPartition partition);

...

Code Block
public class OffsetAndMetadata {
  // This field encodes leader_epoch and partition_epoch. User should not need to interpret this value.
  private final OffsetEpoch offsetEpoch;
  ...
}
 
public class OffsetAndTimestamp {
  // This field encodes leader_epoch and partition_epoch. User should not need to interpret this value.
  private final OffsetEpoch offsetEpoch;
  ...
}

 

4) Add new Error INVALID_PARTITION_EPOCH

...

Code Block
public class InvalidPartitionEpochException {
 
  InvalidPartitionEpochException(Map<TopicPartition, Long> invalidPartitionEpochs);
 
}

Proposed Changes

1) Topic creation and partition expansion.

When the broker creates a topic or expands partitions, the broker should increment the global partition_epoch in the znode /partition_epoch by 1 for each new partition to be created. Thus each partition can be identified by a globally unique partition_epoch. The resulting partition -> partition_epoch mapping for the topic should be written in the znode /brokers/topics/[topic]/partitions/[partition].

2) Topic deletion.

When a topic is deleted, the global partition_epoch in the znode /partition_epoch will be incremented by 1. This helps us recognize the more recent metadata after a topic deletion.
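The two rules above can be sketched as a small allocator (a hypothetical model of the controller-side bookkeeping, not actual controller code):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of how the controller could maintain the global
// partition_epoch counter stored in the /partition_epoch znode.
class PartitionEpochAllocator {
    private int globalPartitionEpoch = 0;  // value stored in /partition_epoch
    private final Map<Integer, Integer> topicEpochs = new HashMap<>(); // partition -> partition_epoch

    // Topic creation / partition expansion: increment the global counter once
    // per new partition, so each partition gets a globally unique epoch.
    Map<Integer, Integer> addPartitions(int firstPartition, int count) {
        Map<Integer, Integer> assigned = new HashMap<>();
        for (int p = firstPartition; p < firstPartition + count; p++) {
            globalPartitionEpoch += 1;
            assigned.put(p, globalPartitionEpoch);
        }
        topicEpochs.putAll(assigned);
        return assigned;
    }

    // Topic deletion: bump the global counter by 1 so that metadata generated
    // after the deletion is recognizably newer.
    void deleteTopic() {
        topicEpochs.clear();
        globalPartitionEpoch += 1;
    }

    int globalPartitionEpoch() { return globalPartitionEpoch; }
}
```

Because every allocation moves the single global counter forward, no partition_epoch is ever reused, even across topic recreation.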

...

After the consumer receives a FetchResponse from a broker, the consumer should verify that the partition_epoch from the FetchResponse equals the partition_epoch associated with the last used offset. The partition_epoch associated with the last used offset can be obtained from seek(...) or the OffsetFetchResponse. If there is no partition_epoch associated with the last used offset, this check is not performed. If there is a partition_epoch associated with the last used offset and its value differs from the partition_epoch in the FetchResponse, consumer.poll() should throw InvalidPartitionEpochException (constructed with the partition -> partition_epoch mapping) for those partitions.
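This per-partition verification could look as follows (a self-contained sketch with hypothetical names; partitions are keyed by plain strings here rather than TopicPartition to keep the example standalone):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the check the consumer could run on each
// FetchResponse: compare the partition_epoch in the response against the
// partition_epoch associated with the last used offset, if one is known.
class FetchEpochValidator {

    static class InvalidPartitionEpochException extends RuntimeException {
        final Map<String, Integer> invalidPartitionEpochs; // partition -> partition_epoch
        InvalidPartitionEpochException(Map<String, Integer> invalid) {
            this.invalidPartitionEpochs = invalid;
        }
    }

    // lastUsedEpochs: partition -> partition_epoch of the last used offset
    //                 (absent entries mean no epoch is known; check skipped)
    // fetchedEpochs:  partition -> partition_epoch reported in the FetchResponse
    static void validate(Map<String, Integer> lastUsedEpochs,
                         Map<String, Integer> fetchedEpochs) {
        Map<String, Integer> invalid = new HashMap<>();
        for (Map.Entry<String, Integer> e : fetchedEpochs.entrySet()) {
            Integer expected = lastUsedEpochs.get(e.getKey());
            if (expected != null && !expected.equals(e.getValue()))
                invalid.put(e.getKey(), e.getValue());
        }
        if (!invalid.isEmpty())
            throw new InvalidPartitionEpochException(invalid);
    }
}
```

A mismatch here means the partition the consumer was reading was deleted and possibly recreated, so raising an exception is safer than silently continuing from a meaningless offset.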

Compatibility, Deprecation, and Migration Plan

The KIP changes the inter-broker protocol. Therefore the migration requires two rolling bounces. In the first rolling bounce we deploy the new code, but brokers still communicate using the existing protocol. In the second rolling bounce we change the config so that brokers start to communicate with each other using the new protocol.
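Following Kafka's usual upgrade procedure, the config flipped in the second bounce would be the inter-broker protocol version, e.g. (version value illustrative):

Code Block
# server.properties, updated during the second rolling bounce
inter.broker.protocol.version=1.1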

Rejected Alternatives

- Use a global metadata version.

This would be a bit more complicated, as it introduces a new piece of state in Kafka, whereas leader_epoch is existing state that we already maintain in ZooKeeper. By using the per-partition leader_epoch, the client is only forced to refresh metadata if the MetadataResponse contains outdated metadata for the partitions that the client is interested in.

Future work

1) We can use the partition_epoch to detect that the partition used in seek() has been deleted. Then we can throw a proper exception to the user so that the user can handle it properly.

2) We can use the leader_epoch to better handle offset resets in the case of unclean leader election.

...