...
In this KIP we propose to add leader_epoch and partition_epoch fields to the MetadataResponse and UpdateMetadataRequest so that the client can refresh metadata if the incoming metadata is older than the metadata already in its cache.
Public Interfaces
Zookeeper
1) Add znode /partition_epoch with the following json format

Code Block
{
  "version" : int32,
  "partition_epoch" : int32
}
2) Update the znodes /brokers/topics/[topic] to use the following json format

Code Block
{
  "version" : int32,
  "partitions" : {
    partition -> [int32]  // partition has type int32. This maps partition to list of replicas
    ...
  },
  "partition_epochs" : {
    partition -> int32    // This is newly added to map partition to partition_epoch
  }
}
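For illustration, the znode of a topic with two partitions might then look like this (all concrete values are hypothetical):

```json
{
  "version" : 2,
  "partitions" : {
    "0" : [0, 1],
    "1" : [1, 2]
  },
  "partition_epochs" : {
    "0" : 1,
    "1" : 2
  }
}
```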
Request/Response protocol
1) Add leader_epoch and partition_epoch to UpdateMetadataRequest

Code Block
UpdateMetadataRequest => controller_id controller_epoch partition_states live_brokers
  controller_id => int32
  controller_epoch => int32
  partition_states => [UpdateMetadataRequestPartitionState]
  live_brokers => [UpdateMetadataRequestBroker]

UpdateMetadataRequestPartitionState => topic partition controller_epoch leader leader_epoch partition_epoch isr zk_version replicas offline_replicas
  topic => string
  partition => int32
  controller_epoch => int32
  leader => int32
  leader_epoch => int32
  partition_epoch => int32  <-- NEW
  isr => [int32]
  zk_version => int32
  replicas => [int32]
  offline_replicas => [int32]
2) Add leader_epoch and partition_epoch to MetadataResponse

Code Block
MetadataResponse => throttle_time_ms brokers cluster_id controller_id topic_metadata
  throttle_time_ms => int32
  brokers => [MetadataBroker]
  cluster_id => nullable_str
  controller_id => int32
  topic_metadata => [TopicMetadata]

TopicMetadata => topic_error_code topic is_internal partition_metadata
  topic_error_code => int16
  topic => str
  is_internal => boolean
  partition_metadata => [PartitionMetadata]

PartitionMetadata => partition_error_code partition_id leader replicas leader_epoch partition_epoch isr offline_replicas
  partition_error_code => int16
  partition_id => int32
  leader => int32
  replicas => [int32]
  leader_epoch => int32  <-- NEW
  partition_epoch => int32  <-- NEW
  isr => [int32]
  offline_replicas => [int32]
3) Add leader_epoch and partition_epoch to OffsetCommitRequest

Code Block
OffsetCommitRequest => group_id generation_id member_id retention_time topics
  group_id => str
  generation_id => int32
  member_id => str
  retention_time => int64
  topics => [OffsetCommitRequestTopic]

OffsetCommitRequestTopic => topic partitions
  topic => str
  partitions => [OffsetCommitRequestPartition]

OffsetCommitRequestPartition => partition offset leader_epoch partition_epoch metadata
  partition => int32
  offset => int64
  leader_epoch => int32  <-- NEW
  partition_epoch => int32  <-- NEW
  metadata => nullable_str
4) Add leader_epoch and partition_epoch to OffsetFetchResponse

Code Block
OffsetFetchResponse => throttle_time_ms responses error_code
  throttle_time_ms => int32
  responses => [OffsetFetchResponseTopic]
  error_code => int16

OffsetFetchResponseTopic => topic partition_responses
  topic => str
  partition_responses => [OffsetFetchResponsePartition]

OffsetFetchResponsePartition => partition offset leader_epoch partition_epoch metadata error_code
  partition => int32
  offset => int64
  leader_epoch => int32  <-- NEW
  partition_epoch => int32  <-- NEW
  metadata => nullable_str
  error_code => int16
...
Offset topic schema
5) Add leader_epoch and partition_epoch to the schema of the offset topic value.

Code Block
OFFSET_COMMIT_VALUE_SCHEMA => offset leader_epoch partition_epoch metadata commit_timestamp expire_timestamp
  offset => int64
  leader_epoch => int32  <-- NEW
  partition_epoch => int32  <-- NEW
  metadata => str
  commit_timestamp => int64
  expire_timestamp => int64
...
Consumer API
1) Add error INVALID_LEADER_EPOCH. This will be a non-retriable error which may be thrown from the consumer's API.
2) Add error INVALID_PARTITION_EPOCH. This will be a non-retriable error which may be thrown from the consumer's API.
3) Add the following method and class to the interface org.apache.kafka.clients.consumer.Consumer
Code Block
void seek(TopicPartition partition, long offset, int leaderEpoch, int partitionEpoch);

OffsetAndEpochs positionWithEpochs(TopicPartition partition);

public class OffsetAndEpochs {
  long offset;
  int leaderEpoch;
  int partitionEpoch;
}
4) Add two fields to the class org.apache.kafka.clients.consumer.OffsetAndMetadata
Code Block
public class OffsetAndMetadata {
  private final int leaderEpoch = -1;
  private final int partitionEpoch = -1;
  ...  // existing variables and methods
}
Proposed Changes
1) Topic creation and partition expansion.
When the broker creates partitions as part of topic creation or partition expansion, it will increment the global partition_epoch in the znode /partition_epoch by 1 and write this partition_epoch in the znode /brokers/topics/[topic] according to its new json format.
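This bookkeeping can be sketched as follows. This is only an illustration, not actual controller code: `PartitionEpochAllocator` is a hypothetical stand-in, the in-memory counter stands in for the value persisted in /partition_epoch, and we assume the counter is incremented once per newly created partition.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of assigning partition_epochs during topic creation
// or partition expansion. The int counter stands in for the global counter
// stored in the /partition_epoch znode.
public class PartitionEpochAllocator {
    private int globalPartitionEpoch = 0;  // persisted in /partition_epoch in practice

    // topic -> (partition -> partition_epoch), i.e. the content that would be
    // written to /brokers/topics/[topic] under the new json format.
    private final Map<String, Map<Integer, Integer>> partitionEpochs = new HashMap<>();

    // Assign a unique, monotonically increasing partition_epoch to each new partition.
    public void createPartitions(String topic, int firstPartition, int count) {
        Map<Integer, Integer> epochs =
            partitionEpochs.computeIfAbsent(topic, t -> new HashMap<>());
        for (int p = firstPartition; p < firstPartition + count; p++) {
            globalPartitionEpoch += 1;            // increment the global counter by 1
            epochs.put(p, globalPartitionEpoch);  // record the epoch for this partition
        }
    }

    public int partitionEpoch(String topic, int partition) {
        return partitionEpochs.get(topic).get(partition);
    }
}
```

Because the counter is global, a partition that is deleted and later re-created receives a different partition_epoch, which is what lets clients tell the two incarnations apart.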
2) Metadata propagation from controller to brokers and from brokers to clients.
The controller should include the partition_epoch and leader_epoch for each partition in the UpdateMetadataRequest. Brokers should likewise include the partition_epoch and leader_epoch for each partition in the MetadataResponse sent to clients.
3) Client's metadata refresh
After the client receives a MetadataResponse from a broker, it compares the MetadataResponse with the cached metadata to check whether the MetadataResponse is outdated. The MetadataResponse is outdated if, across all partitions that the client is interested in, there exists a partition such that the following is true:
Code Block
metadataResponse[partition].partition_epoch < cachedMetadata[partition].partition_epoch ||
(metadataResponse[partition].partition_epoch == cachedMetadata[partition].partition_epoch &&
 metadataResponse[partition].leader_epoch < cachedMetadata[partition].leader_epoch)
The client will be forced to refresh metadata again with the existing backoff mechanism if the MetadataResponse is determined to be outdated.
Note that the producer is interested in all partitions, whereas a consumer may be interested only in the partitions that it has explicitly subscribed to. Checking only a subset of partitions is an optimization that aims to avoid unnecessary metadata refreshes when the metadata is outdated only for partitions not needed by the client; it is not needed for correctness.
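The per-partition comparison can be sketched as below; the `Epochs` holder and the class and method names are hypothetical, not actual client code:

```java
// Minimal sketch of the per-partition staleness check. Epochs is a
// hypothetical holder for one partition's (partition_epoch, leader_epoch) pair.
public class MetadataFreshness {
    public static final class Epochs {
        final int partitionEpoch;
        final int leaderEpoch;
        public Epochs(int partitionEpoch, int leaderEpoch) {
            this.partitionEpoch = partitionEpoch;
            this.leaderEpoch = leaderEpoch;
        }
    }

    // A MetadataResponse entry is outdated relative to the cache if it has a
    // smaller partition_epoch, or the same partition_epoch but a smaller leader_epoch.
    public static boolean isOutdated(Epochs response, Epochs cached) {
        if (response.partitionEpoch != cached.partitionEpoch)
            return response.partitionEpoch < cached.partitionEpoch;
        return response.leaderEpoch < cached.leaderEpoch;
    }
}
```

Comparing the partition_epoch first matters: a re-created partition starts over at leader_epoch 0, so the leader_epoch alone could make fresh metadata look stale.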
4) Offset commit
When the consumer commits offsets, it looks up the leader_epoch and partition_epoch in the cached metadata and includes these values in the OffsetCommitRequest. The leader_epoch and partition_epoch will then be written in the message appended to the offset topic.
When the coordinator receives the OffsetCommitRequest, for each partition in the OffsetCommitRequest it will additionally check the epochs in the request against the values from the last commit. If the partition_epoch from the OffsetCommitRequest is smaller than the last known value, the error for this partition in the OffsetCommitResponse is INVALID_PARTITION_EPOCH. Otherwise, if the partition_epoch is the same but the leader_epoch from the OffsetCommitRequest is smaller than the last known value, the error for this partition in the OffsetCommitResponse is INVALID_LEADER_EPOCH.
If the client/broker library is old, both the leader_epoch and the partition_epoch are assumed to be -1 and the broker will preserve the existing behavior without doing this additional check.
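A sketch of this coordinator-side validation, assuming the last committed epochs are tracked per partition; the class and method are hypothetical, only the error names come from this KIP:

```java
// Hypothetical sketch of the coordinator-side epoch check on OffsetCommitRequest.
// -1 for both epochs means an old client that sent no epochs.
public class OffsetCommitCheck {
    public enum Error { NONE, INVALID_PARTITION_EPOCH, INVALID_LEADER_EPOCH }

    public static Error validate(int reqPartitionEpoch, int reqLeaderEpoch,
                                 int lastPartitionEpoch, int lastLeaderEpoch) {
        // Old clients/brokers: preserve existing behavior, skip the check.
        if (reqPartitionEpoch == -1 && reqLeaderEpoch == -1)
            return Error.NONE;
        // Commit against an older incarnation of the partition.
        if (reqPartitionEpoch < lastPartitionEpoch)
            return Error.INVALID_PARTITION_EPOCH;
        // Same incarnation, but the committer's metadata is behind the last commit.
        if (reqPartitionEpoch == lastPartitionEpoch && reqLeaderEpoch < lastLeaderEpoch)
            return Error.INVALID_LEADER_EPOCH;
        return Error.NONE;
    }
}
```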
5) Offset fetch
After the consumer receives the OffsetFetchResponse, it remembers the leader_epoch and partition_epoch for each partition it needs to consume. The consumer then needs to repeatedly refresh metadata if the metadata is considered outdated, which is determined using criteria similar to those introduced in the subsection "Client's metadata refresh" above. The only difference is that, instead of using the leader_epoch and partition_epoch from the cached metadata, the consumer uses the values from the OffsetFetchResponse.
For the existing version of the offset topic, leader_epoch and partition_epoch will not be available in the value of the offset topic message. In this case both the leader_epoch and the partition_epoch are assumed to be -1 and the client will preserve the existing behavior without doing the additional metadata refresh.
6) Consumer initialization if offset is stored externally.
When committing offsets, the user should use the newly added API positionWithEpochs(...) to read the leader_epoch and partition_epoch as well and store them together with the offset.
When initializing the consumer, the user should read the externally stored leader_epoch and partition_epoch together with the offset and call the newly added API seek(partition, offset, leaderEpoch, partitionEpoch). Later, when consumer.poll() is called, the consumer needs to repeatedly refresh metadata if the metadata is considered outdated, which is determined using criteria similar to those introduced in the subsection "Client's metadata refresh" above. The only difference is that, instead of using the leader_epoch and partition_epoch from the cached metadata, the consumer uses the values that were externally stored.
If the user calls the existing API seek(partition, offset) and there is no committed offset/leader_epoch/partition_epoch in the Kafka offset topic, both the leader_epoch and the partition_epoch are assumed to be -1 and the client will preserve the existing behavior without doing the additional check.
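The save/restore flow can be sketched with a hypothetical external store. `OffsetAndEpochs` mirrors the class proposed in the Consumer API section above, while `ExternalOffsetStore` and its in-memory map are stand-ins for whatever storage the user actually chooses:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of externally storing offsets together with both epochs. On commit the
// user persists what positionWithEpochs(partition) would return; on startup the
// user loads it back and calls seek(partition, offset, leaderEpoch, partitionEpoch).
public class ExternalOffsetStore {
    public static final class OffsetAndEpochs {
        public final long offset;
        public final int leaderEpoch;
        public final int partitionEpoch;
        public OffsetAndEpochs(long offset, int leaderEpoch, int partitionEpoch) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
            this.partitionEpoch = partitionEpoch;
        }
    }

    // Keyed by "topic-partition"; an in-memory stand-in for external storage.
    private final Map<String, OffsetAndEpochs> store = new HashMap<>();

    public void save(String topicPartition, OffsetAndEpochs position) {
        store.put(topicPartition, position);
    }

    // A missing entry maps to epochs of -1, which preserves the existing
    // behavior (no additional metadata refresh or check).
    public OffsetAndEpochs load(String topicPartition) {
        return store.getOrDefault(topicPartition, new OffsetAndEpochs(0L, -1, -1));
    }
}
```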
Compatibility, Deprecation, and Migration Plan
...
This can be a bit more complicated since it introduces a new state in Kafka. leader_epoch is an existing state that we already maintain in zookeeper. By using a per-partition leader_epoch, the client will only be forced to refresh metadata if the MetadataResponse contains outdated metadata for those partitions that the client is interested in.
Future work
1) We can use the partition_epoch to detect that the partition used in seek() has been deleted. Then we can throw a proper exception to the user so that the user can handle it properly.
2) We can use the leader_epoch to better handle the offset reset in case of unclean leader election.