runLoop()
ConsumerRebalanceListener
StreamPartitionAssignor
Create StreamThread
+ Consumer(s)
+ Producer
Close StreamThread
if(!running)
add all records to Task RecordQueues
records fetched
process records task by task
(processing order based on timestamp within each task)
-> advance "stream time"
maybePunctuate()
maybeCommit()
onPartitionsRevoked()
suspend all tasks
- closeTopology()
-> keep state locks
- flush and commit
rebalance
subscription()
add metadata of currently assigned tasks
Brokers
assign() (Consumer Group Leader)
compute new partition/task assignment (using PartitionGrouper)
onAssignment()
store assignment
store rebalance metadata in __consumer_offsets
<groupId, (subscriptionMetadata,assignMetadata)>
onPartitionsAssigned()
- close() revoked tasks (release state locks)
- resume reassigned tasks -> initTopology()
- create newly assigned tasks -> initTopology()
-> maybe state recovery for new tasks (get state locks)
rebalance finished
return to poll() and fetch records
while(running)
Consumer#poll()
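
The "process records task by task" step, with processing order based on timestamp within each task, can be illustrated with a small standalone sketch. It does not use the Kafka Streams classes: `TaskQueueSketch`, `Rec`, `add`, `processNext`, and `drain` are hypothetical names made up for illustration. It also assumes that "stream time" is the largest timestamp processed so far, which may not match the definition used by a given Kafka Streams version.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one task and its per-partition record queues.
public class TaskQueueSketch {

    // Minimal record: source partition name, timestamp, payload.
    public static final class Rec {
        final String partition;
        final long timestamp;
        final String value;

        public Rec(String partition, long timestamp, String value) {
            this.partition = partition;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // One FIFO queue per input partition of this task.
    private final Map<String, Deque<Rec>> queues = new LinkedHashMap<>();

    // Assumption: stream time is the largest timestamp processed so far.
    private long streamTime = Long.MIN_VALUE;

    // Corresponds to "add all records to Task RecordQueues".
    public void add(Rec r) {
        queues.computeIfAbsent(r.partition, p -> new ArrayDeque<>()).addLast(r);
    }

    // Take the record whose queue head has the smallest timestamp,
    // then advance stream time. Returns null when all queues are empty.
    public Rec processNext() {
        Deque<Rec> best = null;
        for (Deque<Rec> q : queues.values()) {
            if (q.isEmpty()) {
                continue;
            }
            if (best == null || q.peekFirst().timestamp < best.peekFirst().timestamp) {
                best = q;
            }
        }
        if (best == null) {
            return null;
        }
        Rec r = best.pollFirst();
        streamTime = Math.max(streamTime, r.timestamp);
        return r;
    }

    public long streamTime() {
        return streamTime;
    }

    // Process everything that is buffered and return the payloads in processing order.
    public static List<String> drain(TaskQueueSketch task) {
        List<String> order = new ArrayList<>();
        Rec r;
        while ((r = task.processNext()) != null) {
            order.add(r.value);
        }
        return order;
    }

    public static void main(String[] args) {
        TaskQueueSketch task = new TaskQueueSketch();
        task.add(new Rec("A", 1L, "a1"));
        task.add(new Rec("A", 5L, "a5"));
        task.add(new Rec("B", 2L, "b2"));
        task.add(new Rec("B", 3L, "b3"));
        System.out.println(drain(task));
        System.out.println(task.streamTime());
    }
}
```

In this sketch only the head of each partition queue is compared, so records within one partition keep their offset order while records across partitions are interleaved by timestamp.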