Create StreamThread
+ Consumer(s)
+ Producer
runLoop()
while (running)
Consumer#poll()
Close StreamThread
if(!running)
add all records to Task RecordQueues
process records task by task
(processing order based on timestamp within each task)
This advances "stream time".
maybePunctuate()
maybeCommit()
onPartitionsRevoke()
suspend all tasks
- flush and commit
- closeTopology()
rebalance
subcription()
add metadata of currently assigned tasks
Brokers
assign() (Leader only)
compute new partition/task assignment (using PartitionGrouper)
onAssign()
store assignment
store rebalance metadata
<groupId, (subscriptionMetadata,assignMetadata)>
onPartitionsAssing()
- close() revoked tasks
- resume reassigned tasks -> initTopolog()
- create newly assigned tasks -> initTopology()
-> maybe State recovery for new tasks