THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Stage name | Goal | Dependency |
---|---|---|
Individual commit | Create a generic offset commit model beyond current partition → offset mapping. | No |
Key filtering based fetch on topic level | Add capability to FetchRequest with specific hashed key range or specific keys | No |
Rebalance support for concurrent assignment | Add assignor support to allow splitting single topic partition with different hash range | Key based filtering |
Transactional support | Incorporate individual commit into the transaction processing model | Key based filtering |
Support cooperative fetch on broker level | Round robin assign data to cooperative consumer fetches when there is no key-range specified | Individual commitRebalance support |
High Level Design
The design will be broken down, aligning with the roadmap defined above.
...
Code Block |
---|
OffsetFetchResponse => ThrottleTimeMs Topics ErrorCode ThrottleTimeMs => int32 Topics => List<OffsetFetchResponseTopic> ErrorCode => int16 OffsetFetchResponseTopic => Name Partitions Name => int32 Partitions => List<OffsetFetchResponsePartition> OffsetFetchResponsePartition => PartitionIndex CommittedOffset CommittedLeaderEpoch Metadata ErrorCode PartitionIndex => int32 CommittedOffset => int64 CommittedLeaderEpoch => int32 Metadata => string ErrorCode => int16 CommittedRangeOffsets => List<Tuple<int64, int64>> // NEW |
To leverage the key based filtering in a standalone mode, we also define a new consumer API in standalone mode:
...
Code Block | ||||
---|---|---|---|---|
| ||||
public Consumer {
...
/**
* Manually assign a list of partitions with specific key ranges to this consumer. If a partition maps to an empty list,
* that means a full ownership of the partition.
*/
void assign(Map<TopicPartition, List<Tuple<int64, int64>>> partitionWithKeyRanges);
} |
For better visibility, we would also include
Code Block | ||||
---|---|---|---|---|
| ||||
public Consumer { ... /** * Manually assign a list of partitions with specific key ranges to this consumer. If a partition maps to an empty list, * that means a full ownership of the partition. */ void assign(Map<TopicPartition, List<Tuple<int64, int64>>> partitionWithKeyRanges); } |
...