THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
{ "version" : int32, "topic_epoch" : int32, // This is newly added "partitions" : { partition -> [int32] // partition has type int32. This maps partition to list of replicas .. }, "topic_epoch" : int32 // This is newly added } |
Request/Response protocol
...
Code Block |
---|
UpdateMetadataRequest => controller_id controller_epoch partition_states live_brokers controller_id => int32 controller_epoch => int32 partition_states => [UpdateMetadataRequestPartitionState] live_brokers => [UpdateMetadataRequestBroker] UpdateMetadataRequestPartitionState => topic partition controller_epoch leader leader_epoch topic_epoch isr zk_version replicas offline_replicas topic => string partition => int32 controller_epoch => int32 leader => int32 leader_epoch => int32 topic_epoch => int32 <-- NEW isr => [int32] zk_version => int32 replicas => [int32] offline_replicas => [int32] |
2) Add leaderAdd topic_epoch and topicand leader_epoch to MetadataResponseepoch to MetadataResponse
Code Block |
---|
MetadataResponse => throttle_time_ms brokers cluster_id controller_id topic_metadata throttle_time_ms => int32 brokers => [MetadatBroker] cluster_id => nullable_str controller_id => int32 topic_metadata => [TopicMetadata] TopicMetadata => topic_error_code topic is_internal topic_epoch topic_epoch partition_metadata topic_error_code => int16 topic => str is_internal => boolean topic_epoch => int32 <-- NEW partition_metadata => [PartitionMetadata] PartitionMetadata => partition_error_code partition_id leader replicas leader_epoch topic_epoch isr offline_replicas partition_error_code => int16 partition_id => int32 leader => int32 replicas => [int32] leader_epoch => int32 <-- NEW isr => [int32] offline_replicas => [int32] |
3) Add leadertopic_epoch field and topic_epoch to and leader_epoch to OffsetCommitRequest
Code Block |
---|
OffsetCommitRequest => group_id generation_id memeber_id retention_time topics group_id => str generation_id => int32 member_id => str retention_time => int64 topics => [OffsetCommitRequestTopic] OffsetCommitRequestTopic => topic topic_epoch partitions topic => str topic_epoch => int32 <-- NEW partitions => [OffsetCommitRequestPartition] OffsetCommitRequestPartition => partition offset leader_epoch topic_epoch metadata partition => int32 offset => int64 leader_epoch => int32 <-- NEW metadata => nullable_str |
4) Add leadertopic_epoch and topicand leader_epoch to OffsetFetchResponse
Code Block |
---|
OffsetFetchResponse => throttle_time_ms response error_code throttle_time_ms => int32 responses => [OffsetFetchResponseTopic] error_code => int16 OffsetFetchResponseTopic => topic topic_epoch partition_responses topic => str topic_epoch => int32 <-- NEW partition_responses => [OffsetFetchResponsePartition] OffsetFetchResponsePartition => partition offset leader_epoch topic_epoch metadata error_code partition => int32 offset => int64 leader_epoch => int32 <-- NEW metadata => nullable_str error_code => int16 |
Offset topic schema
Add leadertopic_epoch and topicleader_epoch to the schema of the offset topic value.
Code Block |
---|
OFFSET_COMMIT_VALUE_SCHEMA => offset leadertopic_epoch topicleader_epoch metadata commit_timestamp expire_timestamp offset => int64 leadertopic_epoch => int32 <-- NEW topicleader_epoch => int32 <-- NEW metadata => str commit_timestamp => int64 expire_timestamp => int64 |
...