runLoop()

ConsumerRebalanceListener

StreamPartitionAssignor

Create StreamThread

+ Consumer(s)

+ Producer

Close StreamThread

if(!running)

add all records to Task RecordQueues

records fetched

process records task by task
(processing order based on timestamp within each task)

-> advance "stream time"
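The per-task ordering can be sketched as follows. This is a simplified, self-contained model, not the Kafka Streams implementation: `TaskSketch`, `Rec`, `addQueue`, and `processNext` are hypothetical names, each partition queue is assumed to already be ordered by timestamp, and "stream time" is modeled here as the largest timestamp processed so far (an assumption; the actual definition may differ between versions).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of one task; not the Kafka Streams classes.
class TaskSketch {
    // A record reduced to the two fields this sketch needs.
    static final class Rec {
        final String partition;
        final long timestamp;
        Rec(String partition, long timestamp) {
            this.partition = partition;
            this.timestamp = timestamp;
        }
    }

    private final List<Deque<Rec>> queues = new ArrayList<>();
    private long streamTime = Long.MIN_VALUE;

    // Register one buffered record queue per assigned input partition.
    Deque<Rec> addQueue() {
        Deque<Rec> q = new ArrayDeque<>();
        queues.add(q);
        return q;
    }

    // Pick the queue whose head record has the smallest timestamp,
    // process that record, and advance stream time (never backwards).
    Rec processNext() {
        Deque<Rec> best = null;
        for (Deque<Rec> q : queues) {
            if (q.isEmpty()) continue;
            if (best == null || q.peek().timestamp < best.peek().timestamp) {
                best = q;
            }
        }
        if (best == null) return null; // nothing buffered for this task
        Rec next = best.poll();
        streamTime = Math.max(streamTime, next.timestamp);
        return next;
    }

    long streamTime() { return streamTime; }
}
```

With two queues holding timestamps (5, 20) and (10), repeated calls to `processNext()` return the records in the order 5, 10, 20, regardless of which partition they came from.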

maybePunctuate()

maybeCommit()

onPartitionsRevoked()

 

suspend all tasks

 - flush and commit

 - closeTopology()

rebalance

subscription()

 

add metadata of currently assigned tasks

Brokers

assign() (Consumer Group Leader)

 

compute new partition/task assignment (using PartitionGrouper)

onAssignment()

 

store assignment  

 

store rebalance metadata in __consumer_offsets

<groupId, (subscriptionMetadata,assignMetadata)>

onPartitionsAssigned()

- close() revoked tasks

- resume reassigned tasks -> initTopology()

- create newly assigned tasks -> initTopology() 

  -> maybe State recovery for new tasks
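The task bookkeeping across a rebalance can be sketched as follows. This is a minimal model under stated assumptions, not the Kafka Streams code: `RebalanceSketch` and its fields are hypothetical, tasks are represented only by string ids, and the flush, commit, topology, and state-recovery steps are reduced to comments.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical bookkeeping of task ids across a rebalance; not the Kafka Streams classes.
class RebalanceSketch {
    final Set<String> active = new TreeSet<>();
    final Set<String> suspended = new TreeSet<>();
    final Set<String> closed = new TreeSet<>();
    final Set<String> created = new TreeSet<>();

    // Before the rebalance: suspend all tasks (flush and commit, closeTopology()).
    void onPartitionsRevoked() {
        suspended.addAll(active);
        active.clear();
    }

    // After the rebalance: reconcile the suspended tasks with the new assignment.
    void onPartitionsAssigned(Set<String> assigned) {
        for (String id : suspended) {
            if (assigned.contains(id)) {
                active.add(id);   // resume reassigned task -> initTopology()
            } else {
                closed.add(id);   // close() task that moved to another thread
            }
        }
        suspended.clear();
        for (String id : assigned) {
            // add() returns true only for ids that were not resumed above
            if (active.add(id)) {
                created.add(id);  // new task -> initTopology(), maybe state recovery
            }
        }
    }
}
```

Keeping tasks suspended rather than closing them immediately means a task that is reassigned to the same thread can be resumed without being rebuilt.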

rebalance finished
return to poll() and fetch records

while(running)

    Consumer#poll()