new StreamTask(Producer)
- initStateStores(): get state locks (maybe state recovery)
- initTopology()
task.initTopology()
- if EoS: initTransactions()
beginTransaction()
- for each node: node.init()
task.closeStateManager()
(release state locks)
task.flushState()
- StateStoreManager.flush()
- RecordCollector.flush() -> Producer.flush()
task.close()
- closeTopology()
ConsumerRebalanceListener.onPartitionsAssigned()
- close all suspended tasks that were not reassigned
- close task
- close state manager
- resume reassigned tasks
- create newly assigned tasks
if EoS: create new Producer per task
else: share single Producer over all tasks
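The producer rule above can be sketched as follows. This is a minimal illustration only: `ProducerAssignment` and `FakeProducer` are hypothetical names standing in for the real client, and the task ids are made up.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ProducerAssignment {
    /** Hypothetical stand-in for a producer client; not the Kafka class. */
    public static final class FakeProducer {
        public final String id;

        public FakeProducer(String id) {
            this.id = id;
        }
    }

    /**
     * Maps each task id to a producer. With EoS every task gets its own
     * instance; otherwise all tasks map to one shared instance.
     */
    public static Map<String, FakeProducer> assign(List<String> taskIds, boolean eos) {
        Map<String, FakeProducer> byTask = new LinkedHashMap<>();
        FakeProducer shared = eos ? null : new FakeProducer("shared");
        for (String taskId : taskIds) {
            byTask.put(taskId, eos ? new FakeProducer("producer-" + taskId) : shared);
        }
        return byTask;
    }
}
```

A producer per task lets each task run its own transaction independently, which is presumably why the EoS branch does not share one.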
ConsumerRebalanceListener.onPartitionsRevoked()
suspend all tasks
- closeTopology
- flush
- commit
task.closeTopology()
- for each node: node.close()
Create StreamThread
while(running)
poll()
// close() or on error
shutdown()
for each task
task.closeProducer()
(EoS only)
task.commitOffsets()
- if EoS: Producer.sendOffsetsToTransaction()
Producer.commitTransaction()
[Producer.beginTransaction(): only if called from loop]
else: Consumer.commitSync()
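The branching in task.commitOffsets() can be sketched as below. The class is a hypothetical recorder and calls no client. The recorded strings are labels only: `commitTransaction` and `commitSync` are the Kafka client methods that the outline's commit steps are assumed to correspond to.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitOffsetsSketch {
    private final boolean eos;
    private final List<String> calls = new ArrayList<>();

    public CommitOffsetsSketch(boolean eos) {
        this.eos = eos;
    }

    /**
     * Records the client calls a commit would make.
     *
     * @param calledFromLoop true when committing from the processing loop,
     *                       in which case EoS starts the next transaction
     */
    public void commitOffsets(boolean calledFromLoop) {
        if (eos) {
            // Offsets travel inside the transaction, then the transaction commits.
            calls.add("producer.sendOffsetsToTransaction");
            calls.add("producer.commitTransaction");
            if (calledFromLoop) {
                calls.add("producer.beginTransaction");
            }
        } else {
            calls.add("consumer.commitSync");
        }
    }

    public List<String> calls() {
        return calls;
    }
}
```

On shutdown or suspend the task processes no further records, so the sketch opens no new transaction in that case.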
StreamThread.shutdown()
- close all tasks
- flush all state stores
- close all state managers
- if (no error) commit all offsets
- close clients
if EoS: for each task close producer
else: close global thread producer
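The shutdown ordering above can be sketched as a function that returns the steps in order. `ShutdownSketch` and its step labels are hypothetical. The sketch assumes each "all tasks" phase completes for every task before the next phase starts, and that closing producers is part of closing the clients.

```java
import java.util.ArrayList;
import java.util.List;

public class ShutdownSketch {
    /** Returns the ordered shutdown steps for the given tasks. */
    public static List<String> shutdown(List<String> taskIds, boolean eos, boolean error) {
        List<String> steps = new ArrayList<>();
        for (String t : taskIds) {
            steps.add("close task " + t);
        }
        for (String t : taskIds) {
            steps.add("flush state " + t);
        }
        for (String t : taskIds) {
            steps.add("close state manager " + t);
        }
        // Offsets are committed only after a clean run.
        if (!error) {
            for (String t : taskIds) {
                steps.add("commit offsets " + t);
            }
        }
        // Closing clients: one producer per task under EoS, else the shared one.
        if (eos) {
            for (String t : taskIds) {
                steps.add("close producer " + t);
            }
        } else {
            steps.add("close global thread producer");
        }
        return steps;
    }
}
```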
if (!running) or on error