...

Stage name | Goal | Dependency
Individual commit | Create a generic offset commit model beyond current partition → offset mapping. | No
Key filtering based fetch on topic level | Add capability to FetchRequest with specific hashed key range or specific keys | No
Rebalance support for concurrent assignment | Add assignor support to allow splitting single topic partition with different hash range | Key based filtering
Transactional support | Incorporate individual commit into the transaction processing model | Key based filtering
Support cooperative fetch on broker level | Round robin assign data to cooperative consumer fetches when there is no key-range specified | Individual commit, Rebalance support

High Level Design

The design is broken down into stages that follow the roadmap above.

...

Code Block
OffsetFetchResponse => ThrottleTimeMs Topics ErrorCode
  ThrottleTimeMs => int32
  Topics         => List<OffsetFetchResponseTopic>
  ErrorCode      => int16

OffsetFetchResponseTopic => Name Partitions
  Name           => string
  Partitions     => List<OffsetFetchResponsePartition>
  
OffsetFetchResponsePartition => PartitionIndex CommittedOffset CommittedLeaderEpoch Metadata ErrorCode CommittedRangeOffsets
  PartitionIndex => int32
  CommittedOffset => int64
  CommittedLeaderEpoch => int32
  Metadata => string
  ErrorCode => int16
  CommittedRangeOffsets => List<Tuple<int64, int64>> // NEW
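
To illustrate how a client might interpret the new CommittedRangeOffsets field, here is a minimal sketch. It rests on assumptions the proposal does not spell out: each tuple is treated as an inclusive [start, end] offset range that was committed individually, and CommittedOffset follows the usual Kafka convention of pointing at the next offset to consume. The class and method names (`CommittedRanges`, `Range`, `isCommitted`, `advance`) are hypothetical and exist only for this example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class CommittedRanges {

    /** Hypothetical stand-in for Tuple<int64, int64>; assumed to be an inclusive offset range. */
    public static final class Range {
        final long start;
        final long end;

        public Range(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    /**
     * Returns true if the offset is covered either by the contiguous committed
     * offset (everything below committedOffset) or by an individually committed range.
     */
    public static boolean isCommitted(long committedOffset, List<Range> ranges, long offset) {
        if (offset < committedOffset) {
            return true;
        }
        for (Range r : ranges) {
            if (offset >= r.start && offset <= r.end) {
                return true;
            }
        }
        return false;
    }

    /**
     * Folds every range that is contiguous with committedOffset into it and
     * returns the new committed offset. Stops at the first gap.
     */
    public static long advance(long committedOffset, List<Range> ranges) {
        List<Range> sorted = new ArrayList<>(ranges);
        sorted.sort(Comparator.comparingLong((Range r) -> r.start));
        long next = committedOffset;
        for (Range r : sorted) {
            if (r.start > next) {
                break; // gap: offsets in [next, r.start) are still uncommitted
            }
            next = Math.max(next, r.end + 1);
        }
        return next;
    }
}
```

Under these assumptions, a committed offset of 10 with ranges (10, 10) and (12, 14) leaves offset 11 uncommitted, so `advance` can only move the committed offset to 11 until that gap is filled.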


To leverage key-based filtering in standalone mode, we also define a new consumer API:

...

Code Block (language: java, title: Consumer.java)
public interface Consumer {
  ...

  /**
   * Manually assign a list of partitions with specific key ranges to this consumer. If a partition maps to an empty list,
   * the consumer takes full ownership of that partition.
   */
  void assign(Map<TopicPartition, List<Tuple<Long, Long>>> partitionWithKeyRanges);
}
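
The ownership rule described in the Javadoc above can be sketched as follows. This is only an illustration under stated assumptions: key ranges are taken to be inclusive, the key hash is passed in as an already-computed `long` (the proposal does not fix a hash function here), and the names `KeyRangeFilter`, `KeyRange`, and `owns` are hypothetical rather than part of the proposed API.

```java
import java.util.List;

public class KeyRangeFilter {

    /** Hypothetical stand-in for Tuple<Long, Long>; assumed to be an inclusive hashed-key range. */
    public static final class KeyRange {
        final long start;
        final long end;

        public KeyRange(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    /**
     * Decides whether a record with the given key hash belongs to a consumer
     * that was assigned the given key ranges for a partition.
     * An empty list means full ownership of the partition.
     */
    public static boolean owns(List<KeyRange> assignedRanges, long keyHash) {
        if (assignedRanges.isEmpty()) {
            return true; // empty list: the consumer owns every key in the partition
        }
        for (KeyRange r : assignedRanges) {
            if (keyHash >= r.start && keyHash <= r.end) {
                return true;
            }
        }
        return false;
    }
}
```

With this shape, two standalone consumers could share one partition by assigning themselves disjoint ranges, for example (0, 499) and (500, 999), while a consumer that passes an empty list keeps the current whole-partition behavior.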

For better visibility, we would also include


...