new StreamTask(Producer)

 - initStateStores(): acquire state locks (and restore state if needed)

 - initTopology()

task.initTopology()

 - if EoS: initTransactions()

                beginTransaction()

 - for each node: node.init()

task.closeStateManager()

(release state locks) 

task.flushState()

 - StateStoreManager.flush()

 - RecordCollector.flush() -> Producer.flush()

task.close()

 - closeTopology()

ConsumerRebalanceListener.onPartitionsAssigned()

 

- close all suspended tasks that were not reassigned

  - close task

  - close state manager

- resume reassigned tasks

- create newly assigned tasks

  if EoS: create new Producer per task

  else: share single Producer over all tasks
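
The producer choice for newly assigned tasks can be sketched as follows. This is a minimal illustration, not the actual implementation: `ProducerAssignmentSketch`, `FakeProducer`, and `assignProducers` are hypothetical names, and `FakeProducer` merely stands in for a real producer client.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: FakeProducer stands in for a real producer client.
final class ProducerAssignmentSketch {
    static final class FakeProducer {
        final String name;
        FakeProducer(String name) { this.name = name; }
    }

    /** Maps each newly assigned task id to the producer it would write through. */
    static Map<String, FakeProducer> assignProducers(List<String> taskIds, boolean eosEnabled) {
        Map<String, FakeProducer> producers = new LinkedHashMap<>();
        // Without EoS, one producer is shared by every task.
        FakeProducer shared = eosEnabled ? null : new FakeProducer("shared-producer");
        for (String taskId : taskIds) {
            // With EoS, each task gets its own producer instance.
            producers.put(taskId, eosEnabled ? new FakeProducer("producer-" + taskId) : shared);
        }
        return producers;
    }
}
```

With EoS enabled, every task id maps to a distinct instance; otherwise all task ids map to the same one.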

ConsumerRebalanceListener.onPartitionsRevoked()

 

suspend all tasks

 - closeTopology

 - flush

 - commit

task.closeTopology()

 - for each node: node.close()

Create StreamThread

 

while(running)

  poll()

 

// close() or on error

shutdown()

for each task

task.closeProducer()

(EoS only) 

task.commitOffsets()

 - if EoS: Producer.sendOffsetsToTransaction()

               Producer.commitTransaction()

               [Producer.beginTransaction(): only if called from loop]

 - else: Consumer.commit()
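
The commit path above can be sketched as a function that records which client calls one commit would make, and in what order. This is a hedged illustration only: `CommitSketch` and `commitOffsets` are hypothetical names, no real clients are involved, and the transactional commit step is written as `commitTransaction`, on the assumption that this is the producer method meant.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: records the order of client calls instead of invoking real clients.
final class CommitSketch {
    /** Returns the client calls made by one task commit, in order. */
    static List<String> commitOffsets(boolean eosEnabled, boolean calledFromLoop) {
        List<String> calls = new ArrayList<>();
        if (eosEnabled) {
            // Offsets are committed as part of the producer's transaction.
            calls.add("Producer.sendOffsetsToTransaction");
            calls.add("Producer.commitTransaction");
            if (calledFromLoop) {
                // Processing continues, so the next transaction is opened right away.
                calls.add("Producer.beginTransaction");
            }
        } else {
            // Without EoS the consumer commits the offsets itself.
            calls.add("Consumer.commit");
        }
        return calls;
    }
}
```

A commit issued outside the processing loop (for example, during suspend or shutdown) skips the final `beginTransaction` step.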


StreamThread.shutdown()

 

- close all tasks

- flush all state stores

- close all state managers

- if (no error) commit all offsets

- close clients

  if EoS: for each task close producer

  else: close global thread producer
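
The shutdown sequence listed above can be sketched as an ordered list of steps. This is an illustrative sketch under stated assumptions: `ShutdownSketch` and `shutdownSteps` are hypothetical names, each phase is assumed to run over all tasks before the next phase starts, and the steps are plain strings rather than real calls.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: lists the shutdown steps in the order given above.
final class ShutdownSketch {
    static List<String> shutdownSteps(List<String> taskIds, boolean noError, boolean eosEnabled) {
        List<String> steps = new ArrayList<>();
        for (String id : taskIds) steps.add("close task " + id);
        for (String id : taskIds) steps.add("flush state " + id);
        for (String id : taskIds) steps.add("close state manager " + id);
        if (noError) {
            // Offsets are committed only when shutdown was not triggered by an error.
            for (String id : taskIds) steps.add("commit offsets " + id);
        }
        if (eosEnabled) {
            // EoS: one producer per task, so each one is closed individually.
            for (String id : taskIds) steps.add("close producer " + id);
        } else {
            // Non-EoS: a single producer shared by all tasks is closed once.
            steps.add("close thread producer");
        }
        return steps;
    }
}
```

On an error-triggered shutdown the commit phase is skipped entirely, so no offsets are committed for work that may not have completed.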

if (!running) or on error
