runLoop()

ConsumerRebalanceListener

StreamPartitionAssinger

Create StreamThread

+ Consumer(s)

+ Producer

Close StreamThread

if(!running)

add all records to Task RecordQueues

records fetched

process records task by task
(processing order based on timestamp within each task)

-> advance "stream time"

maybePunctuate()

maybeCommit()

onPartitionsRevoke()

 

suspend all tasks

 - closeTopology()

 -> keep state locks
 - flush and commit

 

 

rebalance

subcription()

 

add metadata of currently assigned tasks

Brokers

assign() (Consumer Group Leader)

 

compute new partition/task assignment (using PartitionGrouper)

onAssign()

 

store assignment  

 

store rebalance metadata in __consumer_offsets

<groupId, (subscriptionMetadata,assignMetadata)>

onPartitionsAssign()

 

- close() revoked tasks (release state locks)

- resume reassigned tasks -> initTopolog()

- create newly assigned tasks -> initTopology() 

  -> maybe state recovery for new tasks (get state locks)

rebalance finshed
return to poll() and fetch records

while(running)

    Consumer#poll()