...
Public Interfaces
Zookeeper
1) Add znode /partition_epoch with the following json format
Code Block |
---|
{
  "version" : int32,
  "partition_epoch" : int32
} |
2) Update the znode /brokers/topics/[topic]/partitions to use the following json format
Code Block |
---|
{
  "version" : int32,
  "partitions" : {
    int32 -> int32   // This maps each partition (of type int32) to its partition_epoch
    ..
  }
} |
Request/Response protocol
1) Add max_partition_epoch and partition_epoch to UpdateMetadataRequest
Code Block |
---|
UpdateMetadataRequest => controller_id controller_epoch max_partition_epoch partition_states live_brokers
controller_id => int32
controller_epoch => int32
max_partition_epoch => int32 <-- NEW
partition_states => [UpdateMetadataRequestPartitionState]
live_brokers => [UpdateMetadataRequestBroker]
UpdateMetadataRequestPartitionState => topic partition controller_epoch leader leader_epoch partition_epoch isr zk_version replicas offline_replicas
topic => string
partition => int32
controller_epoch => int32
leader => int32
leader_epoch => int32
partition_epoch => int32 <-- NEW
isr => [int32]
zk_version => int32
replicas => [int32]
offline_replicas => [int32] |
2) Add max_partition_epoch, partition_epoch and leader_epoch to MetadataResponse
Code Block |
---|
MetadataResponse => throttle_time_ms max_partition_epoch brokers cluster_id controller_id topic_metadata
throttle_time_ms => int32
max_partition_epoch => int32 <-- NEW
brokers => [MetadataBroker]
cluster_id => nullable_str
controller_id => int32
topic_metadata => [TopicMetadata]
TopicMetadata => topic_error_code topic is_internal partition_metadata
topic_error_code => int16
topic => str
is_internal => boolean
partition_metadata => [PartitionMetadata]
PartitionMetadata => partition_error_code partition_id leader replicas leader_epoch partition_epoch isr offline_replicas
partition_error_code => int16
partition_id => int32
leader => int32
replicas => [int32]
leader_epoch => int32 <-- NEW
partition_epoch => int32 <-- NEW
isr => [int32]
offline_replicas => [int32] |
3) Add partition_epoch and leader_epoch to OffsetCommitRequest
Code Block |
---|
OffsetCommitRequest => group_id generation_id member_id retention_time topics
group_id => str
generation_id => int32
member_id => str
retention_time => int64
topics => [OffsetCommitRequestTopic]
OffsetCommitRequestTopic => topic partitions
topic => str
partitions => [OffsetCommitRequestPartition]
OffsetCommitRequestPartition => partition offset leader_epoch partition_epoch metadata
partition => int32
offset => int64
leader_epoch => int32 <-- NEW
partition_epoch => int32 <-- NEW
metadata => nullable_str |
4) Add partition_epoch and leader_epoch to OffsetFetchResponse
Code Block |
---|
OffsetFetchResponse => throttle_time_ms responses error_code
throttle_time_ms => int32
responses => [OffsetFetchResponseTopic]
error_code => int16
OffsetFetchResponseTopic => topic partition_responses
topic => str
partition_responses => [OffsetFetchResponsePartition]
OffsetFetchResponsePartition => partition offset leader_epoch partition_epoch metadata error_code
partition => int32
offset => int64
leader_epoch => int32 <-- NEW
partition_epoch => int32 <-- NEW
metadata => nullable_str
error_code => int16 |
5) Add partition_epoch to LeaderAndIsrRequest
Code Block |
---|
LeaderAndIsrRequest => controller_id controller_epoch partition_states live_leaders
controller_id => int32
controller_epoch => int32
partition_states => [LeaderAndIsrRequestPartitionState]
live_leaders => [LeaderAndIsrRequestLiveLeader]
LeaderAndIsrRequestPartitionState => topic partition controller_epoch leader leader_epoch partition_epoch isr zk_version replicas is_new_replica
topic => str
partition => int32
controller_epoch => int32
leader => int32
leader_epoch => int32
partition_epoch => int32 <-- NEW
isr => [int32]
zk_version => int32
replicas => [int32]
is_new_replica => boolean |
6) Add partition_epoch to FetchResponse
Code Block |
---|
FetchResponse => throttle_time_ms responses
throttle_time_ms => int32
responses => [FetchResponseTopic]
FetchResponseTopic => topic partition_responses
topic => str
partition_responses => [FetchResponsePartition]
FetchResponsePartition => partition_header record_set
partition_header => FetchResponsePartitionHeader
record_set => Records
FetchResponsePartitionHeader => partition partition_epoch error_code high_watermark last_stable_offset log_start_offset aborted_transactions
partition => int32
partition_epoch => int32 <-- NEW
error_code => int16
high_watermark => int64
last_stable_offset => int64
log_start_offset => int64
aborted_transactions => [FetchResponseAbortedTransaction] |
7) Add leader_epoch and partition_epoch to ListOffsetResponse
Code Block |
---|
ListOffsetResponse => throttle_time_ms responses
throttle_time_ms => int32
responses => [ListOffsetResponseTopic]
ListOffsetResponseTopic => topic partition_responses
topic => str
partition_responses => [ListOffsetResponsePartition]
ListOffsetResponsePartition => partition leader_epoch partition_epoch error_code timestamp offset
partition => int32
leader_epoch => int32 <-- NEW
partition_epoch => int32 <-- NEW
error_code => int16
timestamp => int64
offset => int64 |
Offset topic schema
Add partition_epoch and leader_epoch to the schema of the offset topic value.
Code Block |
---|
OFFSET_COMMIT_VALUE_SCHEMA => offset leader_epoch partition_epoch metadata commit_timestamp expire_timestamp
offset => int64
leader_epoch => int32 <-- NEW
partition_epoch => int32 <-- NEW
metadata => str
commit_timestamp => int64
expire_timestamp => int64 |
Consumer API
1) Create new class OffsetEpoch
We can evolve the OffsetEpoch class as needed to encode more information. This should only be used by Kafka client implementation and user should not need to interpret its value.
Code Block |
---|
// This class encodes leader_epoch and partition_epoch. It can be evolved to encode more information in the future.
public class OffsetEpoch {
    static OffsetEpoch decode(byte[] bytes);
    public byte[] encode();
    public String toString();
    private int partitionEpoch();
    private int leaderEpoch();
    private OffsetEpoch(int partitionEpoch, int leaderEpoch);
} |
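For illustration only, one possible encoding behind encode()/decode() is a small versioned byte layout; the KIP intentionally leaves the actual format opaque to users so that it can evolve. The class below is a hypothetical sketch, not the proposed API:
Code Block |
---|
import java.nio.ByteBuffer;

// Hypothetical byte layout for OffsetEpoch: a version prefix followed by the
// two int32 epochs. Purely illustrative; the real encoding is unspecified.
class OffsetEpochCodec {
    static byte[] encode(int partitionEpoch, int leaderEpoch) {
        ByteBuffer buf = ByteBuffer.allocate(2 + 4 + 4);
        buf.putShort((short) 0);     // format version, allows future evolution
        buf.putInt(partitionEpoch);
        buf.putInt(leaderEpoch);
        return buf.array();
    }

    static int[] decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        short version = buf.getShort();
        if (version != 0)
            throw new IllegalArgumentException("Unknown OffsetEpoch version: " + version);
        return new int[] { buf.getInt(), buf.getInt() }; // {partition_epoch, leader_epoch}
    }
} |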
2) Add the following methods to the interface org.apache.kafka.clients.consumer.Consumer
Code Block |
---|
void seek(TopicPartition partition, long offset, OffsetEpoch offsetEpoch);
OffsetAndMetadata positionAndOffsetEpoch(TopicPartition partition); |
3) Add field offsetEpoch to the class org.apache.kafka.clients.consumer.OffsetAndMetadata and the class org.apache.kafka.clients.consumer.OffsetAndTimestamp
Code Block |
---|
public class OffsetAndMetadata {
// This field encodes leader_epoch and partition_epoch. User should not need to interpret this value.
private final OffsetEpoch offsetEpoch;
...
}
public class OffsetAndTimestamp {
// This field encodes leader_epoch and partition_epoch. User should not need to interpret this value.
private final OffsetEpoch offsetEpoch;
...
} |
4) Add new Error INVALID_PARTITION_EPOCH
consumer.poll() will throw InvalidPartitionEpochException if the partition_epoch of the given partition differs from the corresponding partition_epoch in the FetchResponse. This can happen if a user consumes data with a previously used offset after the topic has been deleted.
Code Block |
---|
public class InvalidPartitionEpochException {
    InvalidPartitionEpochException(Map<TopicPartition, Long> invalidPartitionEpochs);
} |
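As an illustration of how an application might react, the sketch below catches the proposed exception from poll(); the accessor invalidPartitionEpochs() is an assumed getter mirroring the constructor argument, and the recovery policy is left to the application:
Code Block |
---|
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

// Hypothetical handling sketch; assumes InvalidPartitionEpochException is an
// unchecked exception and exposes invalidPartitionEpochs() as a getter.
class PollWithEpochCheck {
    static ConsumerRecords<byte[], byte[]> pollOnce(Consumer<byte[], byte[]> consumer, long timeoutMs) {
        try {
            return consumer.poll(timeoutMs);
        } catch (InvalidPartitionEpochException e) {
            for (Map.Entry<TopicPartition, Long> entry : e.invalidPartitionEpochs().entrySet()) {
                // The committed offset refers to a partition that no longer
                // exists (e.g. the topic was deleted); reset or skip it.
                System.err.println("Stale partition_epoch for " + entry.getKey());
            }
            throw e; // or reset offsets and retry, depending on the application
        }
    }
} |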
Proposed Changes
1) Topic creation and partition expansion.
When a broker creates a topic or expands its partitions, the broker should increment the global partition_epoch in the znode /partition_epoch by 1 for each new partition to be created, so that each partition can be identified by a globally unique partition_epoch. The resulting partition -> partition_epoch mapping for this topic should be written to the znode /brokers/topics/[topic]/partitions, as sketched below.
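A minimal sketch of this assignment logic (illustrative only; the in-memory counter stands in for the actual ZooKeeper reads and writes, and this is not real controller code):
Code Block |
---|
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of assigning partition_epochs during topic creation or
// partition expansion; the counter mirrors the value in the /partition_epoch znode.
class PartitionEpochAssigner {
    private int globalPartitionEpoch;

    Map<Integer, Integer> assignEpochs(int firstNewPartition, int newPartitionCount) {
        Map<Integer, Integer> partitionToEpoch = new HashMap<>();
        for (int i = 0; i < newPartitionCount; i++) {
            globalPartitionEpoch += 1; // increment once per new partition
            partitionToEpoch.put(firstNewPartition + i, globalPartitionEpoch);
        }
        // This map is what gets written to /brokers/topics/[topic]/partitions.
        return partitionToEpoch;
    }
} |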
2) Topic deletion.
When a topic is deleted, the global partition_epoch in the znode /partition_epoch will be incremented by 1. This helps clients recognize the more recent metadata after the topic deletion.
3) Metadata propagation from controller to brokers and from brokers to clients.
Controller should include the current maximum partition_epoch in the UpdateMetadataRequest. Controller should also include partition_epoch and leader_epoch for each partition in the UpdateMetadataRequest. Broker should similarly include max_partition_epoch, partition_epoch and leader_epoch in the MetadataResponse to clients.
4) Client's metadata refresh
After the client receives a MetadataResponse from a broker, it compares the MetadataResponse with the cached metadata to check whether the MetadataResponse is outdated. The MetadataResponse is outdated if, among those partitions that the client is interested in, there exists a partition such that the following is true:
Code Block |
---|
metadataResponse.max_partition_epoch < cachedMetadata.max_partition_epoch ||
metadataResponse[partition].partition_epoch < cachedMetadata[partition].partition_epoch ||
(metadataResponse[partition].partition_epoch == cachedMetadata[partition].partition_epoch &&
metadataResponse[partition].leader_epoch < cachedMetadata[partition].leader_epoch) |
The client should refresh metadata again with the existing backoff mechanism if the MetadataResponse is determined to be outdated.
Note that the producer is interested in all partitions, while a consumer may be interested only in the partitions it has explicitly subscribed to. The purpose of checking only a subset of partitions is to avoid unnecessary metadata refreshes when the metadata is outdated only for partitions not needed by the client. In other words, this is an optimization and is not needed for correctness.
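The comparison can be sketched as follows (a minimal illustration; Epochs and the maps keyed by partition are hypothetical holders of the values parsed from a MetadataResponse, not actual Kafka classes):
Code Block |
---|
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

// Hypothetical holder of the per-partition epochs from a MetadataResponse.
class Epochs {
    final int partitionEpoch;
    final int leaderEpoch;
    Epochs(int partitionEpoch, int leaderEpoch) {
        this.partitionEpoch = partitionEpoch;
        this.leaderEpoch = leaderEpoch;
    }
}

class MetadataFreshnessCheck {
    // Returns true if the received MetadataResponse is outdated relative to the
    // cached metadata, checking only the partitions the client is interested in.
    static boolean isOutdated(int responseMaxPartitionEpoch, Map<TopicPartition, Epochs> response,
                              int cachedMaxPartitionEpoch, Map<TopicPartition, Epochs> cached,
                              Set<TopicPartition> interestedPartitions) {
        if (responseMaxPartitionEpoch < cachedMaxPartitionEpoch)
            return true;
        for (TopicPartition tp : interestedPartitions) {
            Epochs fresh = response.get(tp);
            Epochs old = cached.get(tp);
            if (fresh == null || old == null)
                continue; // nothing to compare for this partition
            if (fresh.partitionEpoch < old.partitionEpoch)
                return true;
            if (fresh.partitionEpoch == old.partitionEpoch && fresh.leaderEpoch < old.leaderEpoch)
                return true;
        }
        return false;
    }
} |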
5) Offset commit
When the consumer commits an offset, it includes the leader_epoch and partition_epoch together with the offset in the OffsetCommitRequest. The leader_epoch should be the largest leader_epoch of messages whose offset < the commit offset. If no message has been consumed since consumer initialization, the leader_epoch from seek(...) or the OffsetFetchResponse should be used. The partition_epoch should be read from the last FetchResponse corresponding to the given partition and commit offset. The coordinator should extract these values from the OffsetCommitRequest and write them into the offset topic.
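A simplified sketch of this leader_epoch bookkeeping, assuming records are consumed in offset order so that every tracked record has offset < the next commit offset (the class and its methods are illustrative, not consumer internals):
Code Block |
---|
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

// Illustrative sketch: track the largest leader_epoch among consumed records
// so it can be attached to the next OffsetCommitRequest.
class CommitEpochTracker {
    private final Map<TopicPartition, Integer> maxLeaderEpoch = new HashMap<>();

    void onRecordConsumed(TopicPartition tp, int leaderEpoch) {
        maxLeaderEpoch.merge(tp, leaderEpoch, Math::max);
    }

    // leader_epoch for the commit; falls back to the epoch from seek(...) or
    // the OffsetFetchResponse when nothing has been consumed yet.
    int leaderEpochForCommit(TopicPartition tp, int epochFromSeekOrOffsetFetch) {
        return maxLeaderEpoch.getOrDefault(tp, epochFromSeekOrOffsetFetch);
    }
} |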
6) Consumer rebalance or initialization using the offset and epoch from the Kafka offset topic
After the consumer receives the OffsetFetchResponse, it remembers the leader_epoch and the partition_epoch for each partition it needs to consume. The consumer then repeatedly refreshes metadata while the metadata is considered outdated, determined using criteria similar to those introduced in the subsection "Client's metadata refresh" above. The only difference is that, instead of using the leader_epoch and partition_epoch from the cached metadata, the consumer uses the values from the OffsetFetchResponse.
For the existing version of the offset topic, leader_epoch and partition_epoch will not be available in the value of the offset topic message. Both the leader_epoch and the partition_epoch are then assumed to be -1, and the client preserves the existing behavior without doing the additional metadata refresh.
7) Consumer initialization if offset is stored externally.
When committing an offset, the user should use the newly added API positionAndOffsetEpoch(...) to read the offset and the offsetEpoch (which encodes leader_epoch and partition_epoch). The leader_epoch should be the largest leader_epoch of those messages whose offset < position. If no message has been consumed since consumer initialization, the leader_epoch from seek(...) or the OffsetFetchResponse should be used. The partition_epoch should be read from the last FetchResponse corresponding to the given partition and position. Both the offset and the offsetEpoch (as a byte array) should be written to the external store.
When initializing the consumer, the user should read the externally stored offsetEpoch (as a byte array) together with the offset, and call the newly added API seek(partition, offset, offsetEpoch) to seek to the previous offset. Later, when consumer.poll() is called, the consumer repeatedly refreshes metadata while the metadata is considered outdated, determined using criteria similar to those introduced in the subsection "Client's metadata refresh" above. The only difference is that, instead of using the leader_epoch and partition_epoch from the cached metadata, the consumer uses the values extracted from the offsetEpoch provided to seek(...).
If the offsetEpoch from seek(...) cannot be decoded by the Kafka client implementation, seek(...) throws IllegalArgumentException.
If the user calls the existing API seek(partition, offset) and there is no committed offset/leader_epoch/partition_epoch in the Kafka offset topic, both the leader_epoch and the partition_epoch are assumed to be -1 and the client preserves the existing behavior without doing the additional check.
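Putting the above together, a hypothetical end-to-end usage might look as follows; ExternalStore and the offsetEpoch() accessor on OffsetAndMetadata are illustrative assumptions rather than proposed APIs:
Code Block |
---|
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Stand-in for whatever storage the application uses for offsets.
interface ExternalStore {
    void save(String key, long offset, byte[] encodedEpoch);
    long loadOffset(String key);
    byte[] loadEpoch(String key);
}

class ExternalOffsetStorage {
    // Committing externally: persist the offset together with the encoded epochs.
    static void saveProgress(Consumer<byte[], byte[]> consumer, TopicPartition tp, ExternalStore store) {
        OffsetAndMetadata pos = consumer.positionAndOffsetEpoch(tp); // proposed API
        store.save(tp.toString(), pos.offset(), pos.offsetEpoch().encode());
    }

    // Initializing: restore the offset and epochs with the proposed seek(...).
    static void restoreProgress(Consumer<byte[], byte[]> consumer, TopicPartition tp, ExternalStore store) {
        byte[] encoded = store.loadEpoch(tp.toString());
        // seek(...) throws IllegalArgumentException if the bytes cannot be decoded.
        consumer.seek(tp, store.loadOffset(tp.toString()), OffsetEpoch.decode(encoded));
    }
} |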
8) Consumption after topic deletion
After the consumer receives a FetchResponse from a broker, it should verify that the partition_epoch from the FetchResponse equals the partition_epoch associated with the last used offset. The partition_epoch associated with the last used offset can be obtained from seek(...) or the OffsetFetchResponse. If there is no partition_epoch associated with the last used offset, this check is not performed. If the two values differ, consumer.poll() should throw InvalidPartitionEpochException, constructed with the partition -> partition_epoch mapping of the affected partitions. This can happen if the user consumes data with a previously used offset after the topic has been deleted.
Compatibility, Deprecation, and Migration Plan
...
Future work
1) We can use the partition_epoch to detect that the partition used in seek() has been deleted, and then throw a proper exception to the user so that the user can handle it properly.
...