new StreamTask(Producer)

 - initStateStores()

 - if EoS: Producer.initTransactions()

 - initTopology()

task.resume()

 - if EoS: Producer.beginTransaction()

 - initTopology()

task.close()

 - suspend()

 - closeStateManager()

   - if (clean) write checkpoint file

     (release state locks) 

 - if EoS: Producer.close()

ConsumerRebalanceListener.onPartitionsAssign()

 

- revoked all non-assigned suspended tasks

  - close task

- resume reassigned tasks

- create newly assigned tasks

  if EoS: create new Producer per task

  else: share single Producer over all tasks

ConsumerRebalanceListener.onPartitionsRevoke()

 

suspend all tasks

 - suspend()

task.suspend()

 - closeTopology()

    - for each node: node.close()

 - commit()

Create StreamThread

 

while(running)

  poll()

 

// close() or on error

shutdown()

for each task

for each task

for each task

StreamThred.shutdown()

 

close all tasks

  - close()    

if (!running)

or
on error

for each task

task.commit()

 - StateStoreManager.flush()

 - RecordCollector.flush() -> Producer.flush()

 - if (!eos) write checkpoint file

 - commitOffsets:

   - if EoS: Producer.sendOffsetsToTransaction()

                 Producer.commit()

                 [Producer.beginTransaction(): only if called from loop]

     else Consumer.commit()

new StandbyTask()

 - initStateStores()

task.resume()

 - updateOffsetLimits()

initTopology()

 - for each node: node.init()

initStateStores()

 - updateOffsetLimits()

 - get state locks

 - maybe state recovery

task.commit()

 - StateStoreManager.flush()

 - if (!eos) write checkpoint file

 - updateOffsetsLimits()

task.close()

 - suspend()

 - closeStateManager()

   - if (clean) write checkpoint file

     (release state locks) 

task.suspend()

 - task.commit()