You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 25 Next »

Status

Current stateUnder Discussion

Discussion thread

JIRAhere

Motivation

Currently client (e.g. producer, consumer) fetches metadata from the least loaded node. Because Kafka Controller sends UpdataMetadataRequest to brokers concurrently and there may be difference in when brokers process the UpdateMetadataRequest, it is possible that client fetches a metadata that is older than the existing metadata in its cache. This can cause OffsetOutOfRangeException in consumer even if there is no log truncation in the Kafka cluster (See KAFKA-6262 for more detail). For MirrorMaker whose offset reset policy is oldest, it can cause MM to rewind back to consume from the oldest offset. This increases the latency of transmitting the data from source to destination cluster and duplicates many data in the destination cluster.

In this KIP we propose to add leader_epoch and partition_epoch fields in the MetadataResponse so that client can refresh metadata if the incoming metadata is older than the existing metadata in its cache.

Public Interfaces

Zookeeper

1) Add znode /topic_epoch with the following json format

{
  "version" : int32,
  "topic_epoch" : int32
}

 

2) Update the znodes /brokers/topics/[topic] to use the following json format

{
  "version" : int32,
  "topic_epoch" : int32,  // This is newly added
  "partitions" : {
    partition -> [int32]   // partition has type int32. This maps partition to list of replicas
    ..
  }
}

Request/Response protocol

1) Add topic_epoch to UpdateMetadataRequest

UpdateMetadataRequest => controller_id controller_epoch partition_states live_brokers
  controller_id => int32
  controller_epoch => int32
  partition_states => [UpdateMetadataRequestPartitionState]
  live_brokers => [UpdateMetadataRequestBroker]
 
UpdateMetadataRequestPartitionState => topic partition controller_epoch leader leader_epoch topic_epoch isr zk_version replicas offline_replicas
  topic => string
  partition => int32
  controller_epoch => int32
  leader => int32
  leader_epoch => int32
  topic_epoch => int32        <-- NEW
  isr => [int32]
  zk_version => int32
  replicas => [int32]
  offline_replicas => [int32]

 

2) Add topic_epoch and leader_epoch to MetadataResponse

MetadataResponse => throttle_time_ms brokers cluster_id controller_id topic_metadata 
  throttle_time_ms => int32
  brokers => [MetadatBroker]
  cluster_id => nullable_str
  controller_id => int32
  topic_metadata => [TopicMetadata]
 
TopicMetadata => topic_error_code topic is_internal topic_epoch topic_epoch partition_metadata
  topic_error_code => int16
  topic => str
  is_internal => boolean
  topic_epoch => int32            <-- NEW
  partition_metadata => [PartitionMetadata]
 
PartitionMetadata => partition_error_code partition_id leader replicas leader_epoch isr offline_replicas
  partition_error_code => int16
  partition_id => int32
  leader => int32
  replicas => [int32]
  leader_epoch => int32           <-- NEW
  isr => [int32]
  offline_replicas => [int32]

 

3) Add topic_epoch and leader_epoch to OffsetCommitRequest

OffsetCommitRequest => group_id generation_id memeber_id retention_time topics
  group_id => str
  generation_id => int32
  member_id => str
  retention_time => int64
  topics => [OffsetCommitRequestTopic]
 
OffsetCommitRequestTopic => topic topic_epoch partitions
  topic => str
  topic_epoch => int32            <-- NEW
  partitions => [OffsetCommitRequestPartition]
 
OffsetCommitRequestPartition => partition offset leader_epoch metadata
  partition => int32
  offset => int64
  leader_epoch => int32           <-- NEW
  metadata => nullable_str

 

4) Add topic_epoch and leader_epoch to OffsetFetchResponse

OffsetFetchResponse => throttle_time_ms response error_code
  throttle_time_ms => int32
  responses => [OffsetFetchResponseTopic]
  error_code => int16
 
OffsetFetchResponseTopic => topic topic_epoch partition_responses
  topic => str
  topic_epoch => int32            <-- NEW
  partition_responses => [OffsetFetchResponsePartition]
 
OffsetFetchResponsePartition => partition offset leader_epoch metadata error_code
  partition => int32
  offset => int64
  leader_epoch => int32           <-- NEW
  metadata => nullable_str
  error_code => int16
 

Offset topic schema

Add topic_epoch and leader_epoch to the schema of the offset topic value.

OFFSET_COMMIT_VALUE_SCHEMA => offset topic_epoch leader_epoch metadata commit_timestamp expire_timestamp 
  offset => int64
  topic_epoch => int32            <-- NEW
  leader_epoch => int32           <-- NEW
  metadata => str
  commit_timestamp => int64
  expire_timestamp => int64

Consumer API

1) Add the following methods to the interface org.apache.kafka.clients.consumer.Consumer

void seek(TopicPartition partition, long offset, OffsetEpoch offsetEpoch);
 
OffsetAndMetadata positionWithOffsetEpoch(TopicPartition partition);

2) Add field offsetEpoch to the class org.apache.kafka.clients.consumer.OffsetAndMetadata and create new class OffsetEpoch

public class OffsetAndMetadata {
 
  // This field encodes leader_epoch and partition_epoch. User should not need to interpret this value.
  private final OffsetEpoch offsetEpoch;
  .... // Other existing variables and methods
 
}
 
// This class will encode leader_epoch and topic_epoch. It can be evolved to encode more information in the future.
public class OffsetEpoch {

  static OffsetEpoch decode(byte[]);

  public byte[] encode();

  public String toString();
 
  private int topicEpoch();
 
  private int leaderEpoch();
 
  private OffsetEpoch(int topicEpoch, int leaderEpoch);

}

 

We can evolve the format of this field in the future as needed. This should only be used by Kafka client implementation and user should not need to interpret this string.

Proposed Changes

1) Topic creation and partition expansion.

When broker creates topic, broker will increment the global topic_epoch in the znode /topic_epoch by 1 and write this partition epoch in the znode /brokers/topics/[topic] according to its new json format.

2) Topic deletion.

When a topic is deleted, the global topic_epoch in the znode /topic_epoch will be incremented by 1.

3) Metadata propagation from controller to brokers and from brokers to clients.

Controller should include the topic_epoch and leader_epoch for each partition in the UpdateMetadataRequest. Broker should also include topic_epoch and leader_epoch for each partition in the MetadataResponse to clients.


4) Client's metadata refresh

After client receives MetadataResponse from a broker, it compares with the MetadataResponse with the cached metadata to check whether the MetadataResponse is outdated. The MetadataResponse is outdated if, for all those partitions that the client is interested in, there exists a partition such that the following is true:

metadataResponse[partition].topic_epoch < cachedMetadata[partition].topic_epoch || 
 (metadataResponse[partition].topic_epoch == cachedMetadata[partition].topic_epoch && 
  metadataResponse[partition].leader_epoch < cachedMetadata[partition].leader_epoch)

The client will be forced to refresh metadata again with the existing backoff mechanism if the MetadataResponse is determined to be outdated.

Note that producer is interested in all partitions. Consumers can potentially be interested in only partitions that it has explicitly subscribed to. The purpose of checking only a subset of partitions is to avoid unnecessary metadata refresh when the metadata is only outdated for partitions not needed by client. In other words, it is for optimization and it is not needed for correctness.


5) Offset commit

When consumer commits offset, it includes leader_epoch and topic_epoch together with offset in the OffsetCommitRequest. The leader_epoch should be the largest leader_epoch of those already consumed messages whose offset < the commit offset. If no message has been consumed since consumer initialization, the leader_epoch from seek(...) or OffsetFetchResponse should be used. The topic_epoch should be the topic_epoch of the given topic in the latest metadata. The coordinator should extract these values from the OffsetCommitRequest and write them into the offset topic.

 

6) Consumer rebalance or initialization using the offset and epoch from the Kafka offset topic

After consumer receives OffsetFetchResponse, it remembers the leader_epoch and the topic_epoch for each partition it needs to consume. Then the consumer needs to repeatedly refresh metadata if a metadata is considered outdated, which is determined using the similar criteria as introduced in the above subsection "Client's metadata refresh". The only difference is that, instead of using the leader_epoch and topic_epoch from the cached metadata, the consumer uses the values from the OffsetFetchResponse.

For existing version of the offset topic, leader_epoch and topic_epoch will not be available in the value of the offset topic message. Both the leader_epoch and the topic_epoch are assumed to be -1 and the client will preserve the existing behavior without doing the additional metadata refresh.


7) Consumer initialization if offset is stored externally.

When committing offset, user should use the newly added API positionWithOffsetEpoch(...) to read the internal_metadata (which encodes leader_epoch and topic_epoch). This internal metadata encodes the leader_epoch and topic_epoch. The leader_epoch should be the largest leader_epoch of those already consumed messages whose offset < position. If no message has been consumed since consumer initialization, the leader_epoch from seek(...) or OffsetFetchResponse should be used. The topic_epoch should be the topic_epoch of the given topic in the latest metadata. 

When initializing consumer, user should read the externally stored internal_metadata together with the offset. Then user should call the newly added API seek(partition, offset, offsetEpoch) to seek to the previous offset. Later when consumer.poll() is called, the consumer needs to repeatedly refresh metadata if a metadata is considered outdated, which is determined using the similar criteria as introduced in the above subsection "Client's metadata refresh". The only difference is that, instead of using the leader_epoch and topic_epoch from the cached metadata, the consumer uses the values that extracted from the offsetEpoch provided by the seek(...).

If the offsetEpoch from seek(...) can not be decoded by Kafka client implementation, the IllegalArgumentException is thrown from seek(...).

If user calls existing API seek(partition, offset) and there is not committed offset/leader_epoch/topic_epoch from the Kafka offset topic, both the leader_epoch and the topic_epoch are assumed to be -1 and the client will preserve the existing behavior without doing the additional check.

Compatibility, Deprecation, and Migration Plan

The KIP changes the inter-broker protocol. Therefore the migration requires two rolling bounce. In the first rolling bounce we will deploy the new code but broker will still communicate using the existing protocol. In the second rolling bounce we will change the config so that broker will start to communicate with each other using the new protocol.

Rejected Alternatives

- Use a global per-metadata version.

This can be a bit more complicated by introducing a new state in Kafka. leader_epoch is an existing state we already maintain in zookeeper. By using per-partition leader_epoch the client will only be forced to re-fresh metadata if the MetadataResponse contains out-dated metadata for those partitions that the client is interested in.

Future work

1) We can use the topic_epoch to detect that the partition used in the seek() has been deleted. Then we can thrown proper exception to the user so that user can handle it properly.

2) We can use the leader_epoch to better handle the offset reset in case of unclean leader election.

 

 

 

  • No labels