This would be a two-phase change:
Add a new RequestKeys type, "OffsetCommitRequest", with key = 6 (or whatever the next unused key is).
OffsetCommitRequest => ConsumerGroup [TopicName [Partition Offset LastOffset]]
  ConsumerGroup => string
  TopicName => string
  Partition => int32
  Offset => int64
  LastOffset => int64
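To make the request layout concrete, here is a minimal sketch of how the request body could be serialized. It assumes the conventions of the existing wire protocol (big-endian integers, strings as an int16 length followed by UTF-8 bytes, arrays as an int32 element count) and omits the common request header. The function names are hypothetical, and a caller that wants to skip the LastOffset check would need some agreed encoding for "absent", for example a hypothetical -1 sentinel.

```python
import struct


def encode_string(s: str) -> bytes:
    # Assumed convention: int16 byte length, then the UTF-8 bytes.
    data = s.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_offset_commit_request(group: str, topics: dict) -> bytes:
    """Hypothetical encoder for the OffsetCommitRequest body.

    `topics` maps a topic name to a list of (partition, offset, last_offset).
    """
    out = [encode_string(group), struct.pack(">i", len(topics))]  # group, topic count
    for topic, partitions in topics.items():
        out.append(encode_string(topic))
        out.append(struct.pack(">i", len(partitions)))  # partition count
        for partition, offset, last_offset in partitions:
            # Partition (int32), Offset (int64), LastOffset (int64), no padding.
            out.append(struct.pack(">iqq", partition, offset, last_offset))
    return b"".join(out)
```

For a single partition this yields 35 bytes: 4 for the group "g1", 4 for the topic count, 3 for the topic "t", 4 for the partition count and 20 for the partition entry.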
Optionally including LastOffset, the last offset read, allows for some basic concurrency control: the offset committer updates the stored offset only if its current value equals LastOffset.
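The conditional update described above is a check-and-set. A minimal sketch, assuming offsets live in an in-memory dictionary keyed by (group, topic, partition) and that an omitted LastOffset is represented as `None`; the function name is hypothetical.

```python
from typing import Dict, Optional, Tuple

Key = Tuple[str, str, int]  # (consumer group, topic, partition)


def check_and_set(offsets: Dict[Key, int], key: Key, offset: int,
                  last_offset: Optional[int] = None) -> bool:
    """Store `offset` unless LastOffset was supplied and does not match.

    Returns True if the commit was applied, False if it was rejected.
    """
    if last_offset is not None and offsets.get(key) != last_offset:
        return False  # the stored offset changed since this consumer read it
    offsets[key] = offset
    return True
```

On its own this function is not safe to call from several threads at once, since another commit could land between the comparison and the write. Funnelling all commits through one thread removes that race without any locking.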
I would suggest having only a single offset committer thread per broker, as this avoids complex concurrency issues; ZooKeeper should be able to keep up with the resulting write throughput. The broker can keep a write-through cache of offsets for each group+topic+partition, so it never has to read from ZooKeeper to learn the current value. This lets the broker perform the check-and-set described above against its cache, so each commit costs a single ZooKeeper write rather than a read plus a write.
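A sketch of the single committer thread with a write-through cache follows. It assumes commits arrive through a queue, that an omitted LastOffset is `None`, and that `zk_write` is a hypothetical callable standing in for the broker's actual ZooKeeper write. Error handling is omitted: if `zk_write` raises, the thread stops.

```python
import queue
import threading


class OffsetCommitter:
    """One thread owns the offset cache, so check-and-set needs no locks."""

    def __init__(self, zk_write):
        self._zk_write = zk_write  # hypothetical: persists one offset to ZooKeeper
        self._cache = {}           # (group, topic, partition) -> offset
        self._requests = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def submit(self, group, topic, partition, offset, last_offset=None):
        """Enqueue a commit; returns a one-slot queue that receives True/False."""
        reply = queue.Queue(maxsize=1)
        self._requests.put(((group, topic, partition), offset, last_offset, reply))
        return reply

    def _run(self):
        while True:
            item = self._requests.get()
            if item is None:  # shutdown signal from close()
                break
            key, offset, last_offset, reply = item
            # Compare against the cache only; no ZooKeeper read is needed.
            if last_offset is not None and self._cache.get(key) != last_offset:
                reply.put(False)
                continue
            self._zk_write(key, offset)  # write through to ZooKeeper first,
            self._cache[key] = offset    # then update the cache
            reply.put(True)

    def close(self):
        self._requests.put(None)
        self._thread.join()
```

Writing to ZooKeeper before updating the cache means the cache never holds an offset that was not persisted.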
For a start, it makes sense to keep the current ZooKeeper structure in place:
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value
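The existing layout maps each (group, topic, broker, partition) to one znode. A small sketch of building that path and encoding its value; the helper names are hypothetical, and storing the offset as a decimal ASCII string is an assumption about the znode contents.

```python
def offset_path(group_id: str, topic: str, broker_id: int, partition_id: int) -> str:
    """Build the existing ZooKeeper path for one consumer offset."""
    return f"/consumers/{group_id}/offsets/{topic}/{broker_id}-{partition_id}"


def encode_offset(offset: int) -> bytes:
    # Assumption: the znode stores the offset as a decimal ASCII string.
    return str(offset).encode("ascii")


def decode_offset(data: bytes) -> int:
    return int(data.decode("ascii"))
```

For example, `offset_path("g1", "clicks", 3, 0)` gives `/consumers/g1/offsets/clicks/3-0`.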
However, since offset management is being centralized in the brokers, there is no need to keep track of offsets in ZooKeeper (right?). They could just as well be kept in a single file or a lightweight database (BDB or the like).
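As an illustration of the single-file option, here is a minimal sketch that keeps every committed offset in one JSON file. The class name and file format are hypothetical. Each commit rewrites the whole file to a temporary path, fsyncs it, and atomically renames it over the old one with `os.replace`, so a crash leaves either the old or the new contents.

```python
import json
import os
import tempfile


class FileOffsetStore:
    """Hypothetical single-file offset store (alternative to ZooKeeper)."""

    def __init__(self, path):
        self._path = path
        self._offsets = {}  # (group, topic, partition) -> offset
        if os.path.exists(path):
            with open(path) as f:
                for e in json.load(f):
                    self._offsets[(e["group"], e["topic"], e["partition"])] = e["offset"]

    def get(self, group, topic, partition):
        return self._offsets.get((group, topic, partition))

    def put(self, group, topic, partition, offset):
        self._offsets[(group, topic, partition)] = offset
        self._flush()

    def _flush(self):
        entries = [{"group": g, "topic": t, "partition": p, "offset": o}
                   for (g, t, p), o in self._offsets.items()]
        directory = os.path.dirname(os.path.abspath(self._path))
        fd, tmp = tempfile.mkstemp(dir=directory)  # same dir so rename stays atomic
        with os.fdopen(fd, "w") as f:
            json.dump(entries, f)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp, self._path)  # atomic swap of old file for new
```

Rewriting the whole file costs time proportional to the number of offsets on every commit; an embedded database such as BDB would instead update single records in place.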