Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Step 1: StartpointManager#writeStartpoint(StartpointKey, Startpoint) stores Startpoint in the storage layer. 

Step 2: ExternalSource triggers SamzaProcessor to restart

Step 3: SamzaProcessor starts up and each task instance reads Checkpoints and also reads Startpoints via StartpointManager#readStartpoint(StartpointKey, removeUponRead)
Startpoints are removed from storage layerStorageLayer at the first checkpoint offset commit. If SamzaProcessor has checkpoints disabled (*.samza.reset.offset=true), then Startpoints are removed immediately after being read.

Step 4: SamzaProcessor task instances iterate through their SSPs and
   i) retrieves Startpoint from StartpointManager#readStartpoint(StartpointKey) call SystemConsumer#register(SystemStreamPartition, Startpoint). If a Startpoint exists for the key with TaskName+SSP, that takes higher precedence than key with only SSP.
  ii) else, call the existing SystemConsumer#register(SystemStreamPartition, checkpointOffset:String)

...