Create StreamThread

+ Consumer(s)

+ Producer

runLoop()

while (running)
  Consumer#poll()

Close StreamThread

if (!running)

add all records to Task RecordQueues

process records task by task
(processing order based on timestamp within each task)

This advances "stream time".

maybePunctuate()

maybeCommit()
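
The loop body above (buffer the polled records, process them by timestamp, then commit) can be sketched roughly as follows. This is a simplified model, not the Kafka Streams implementation: `RunLoopSketch`, `Rec`, `Task`, and `runOnce` are hypothetical names, a single priority queue stands in for the per-partition RecordQueues, punctuation is left out, and the commit step is reduced to a counter.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical, simplified model of one run-loop iteration; these are not
// the real Kafka Streams classes.
class RunLoopSketch {

    // Minimal stand-in for a consumed record.
    static final class Rec {
        final long timestamp;
        final String value;

        Rec(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // A task buffers records and processes them in timestamp order.
    static final class Task {
        private final PriorityQueue<Rec> queue =
            new PriorityQueue<>(Comparator.comparingLong((Rec r) -> r.timestamp));
        final List<String> processed = new ArrayList<>();
        long streamTime = -1L;   // highest timestamp processed so far
        int commits = 0;         // counts calls to the commit stand-in

        void add(Rec r) {
            queue.add(r);
        }

        // Drain the buffer, lowest timestamp first; stream time never moves backwards.
        void processAll() {
            while (!queue.isEmpty()) {
                Rec r = queue.poll();
                streamTime = Math.max(streamTime, r.timestamp);
                processed.add(r.value);
            }
        }
    }

    // One pass of the loop body for a single task.
    static void runOnce(List<Rec> polled, Task task) {
        for (Rec r : polled) {
            task.add(r);      // add all records to the task's queue
        }
        task.processAll();    // process by timestamp; this advances stream time
        // maybePunctuate() is omitted in this sketch
        task.commits++;       // stand-in for maybeCommit(); this sketch always commits
    }
}
```

Calling `runOnce` with records timestamped 30, 10, 20 leaves the task with the values processed in timestamp order and a stream time of 30.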

onPartitionsRevoked()

 

suspend all tasks

 - flush and commit

 - closeTopology()

rebalance

subscription()

 

add metadata of currently assigned tasks

Brokers

assign() (Leader only)

 

compute new partition/task assignment (using PartitionGrouper)

onAssignment()

 

store assignment  

 

store rebalance metadata

<groupId, (subscriptionMetadata, assignMetadata)>

onPartitionsAssigned()

- close() revoked tasks

- resume reassigned tasks -> initTopology()

- create newly assigned tasks -> initTopology() 

  -> restore state for new tasks if needed
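
The three cases handled after a rebalance (close, resume, create) amount to a set comparison between the tasks suspended on revocation and the tasks in the new assignment. A minimal sketch of that comparison, assuming tasks are identified by plain string ids; `AssignmentDiffSketch` and its fields are hypothetical names, not Kafka Streams classes:

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: splits task ids into the three cases handled
// after a rebalance. Task ids are plain strings in this sketch.
class AssignmentDiffSketch {
    final Set<String> toClose;   // suspended earlier, not reassigned
    final Set<String> toResume;  // suspended earlier, assigned again
    final Set<String> toCreate;  // newly assigned, may need state recovery

    AssignmentDiffSketch(Set<String> suspended, Set<String> assigned) {
        toClose = new TreeSet<>(suspended);
        toClose.removeAll(assigned);

        toResume = new TreeSet<>(suspended);
        toResume.retainAll(assigned);

        toCreate = new TreeSet<>(assigned);
        toCreate.removeAll(suspended);
    }
}
```

For example, with suspended tasks `0_0` and `0_1` and a new assignment of `0_1` and `0_2`, the sketch closes `0_0`, resumes `0_1`, and creates `0_2`.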