...
- Startpoint
- Checkpoint
- SystemStreamPartition(SSP)
- StartpointEntry(Startpoint + optional TaskName) - See Storing Requested Startpoint
Step 1: StartpointManager#writeStartpoint(SSP, StartpointEntry) stores Startpoint in the storage layer.
Step 2: ExternalSource triggers SamzaProcessor to restart.
Step 3: SamzaProcessor starts up and each task instance reads Checkpoints and also reads Startpoints via StartpointManager#readStartpoint(SSP, removeUponRead).
As each SamzaProcessor task instance iterates through its SSPs:
a) if there is a Startpoint for the requested SSP, call SystemConsumer#register(SystemStreamPartition, Startpoint).
b) else, call the existing SystemConsumer#register(SystemStreamPartition, checkpointOffset:String). Eventually, both interface methods will be unified since the interface implementations should be agnostic of where the starting offsets come from and should only be concerned with setting the starting offset, whether it comes from a Checkpoint or a Startpoint. This will be explored during implementation.
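The choice between a) and b) can be sketched as below. This is a minimal illustration and not the proposed implementation: SSPs and checkpoint offsets are modeled as plain strings, the nested Startpoint class is a simplified stand-in for the model, and the class name StartpointPrecedenceSketch and method resolveRegistrations are hypothetical. Instead of calling a real SystemConsumer, the sketch only records which kind of register call each SSP would receive.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: none of these names are part of Samza's API.
class StartpointPrecedenceSketch {

  /** Simplified stand-in for the Startpoint model: a position type and a position value. */
  static final class Startpoint {
    final String positionType;
    final String position;

    Startpoint(String positionType, String position) {
      this.positionType = positionType;
      this.position = position;
    }
  }

  /**
   * For each SSP, records which register call a task instance would make:
   * the Startpoint variant if a Startpoint exists, otherwise the checkpoint variant.
   */
  static Map<String, String> resolveRegistrations(
      List<String> ssps,
      Map<String, Startpoint> startpoints,
      Map<String, String> checkpointOffsets) {
    Map<String, String> calls = new LinkedHashMap<>();
    for (String ssp : ssps) {
      Startpoint startpoint = startpoints.get(ssp);
      if (startpoint != null) {
        // Case a): a Startpoint was requested for this SSP, so it wins.
        calls.put(ssp, "startpoint " + startpoint.positionType + "=" + startpoint.position);
      } else {
        // Case b): fall back to the checkpointed offset.
        calls.put(ssp, "checkpoint offset=" + checkpointOffsets.get(ssp));
      }
    }
    return calls;
  }

  public static void main(String[] args) {
    Map<String, Startpoint> startpoints = new HashMap<>();
    startpoints.put("kafka.pageviews.0", new Startpoint("TIMESTAMP", "1546300800000"));

    Map<String, String> checkpoints = new HashMap<>();
    checkpoints.put("kafka.pageviews.0", "100");
    checkpoints.put("kafka.pageviews.1", "250");

    List<String> ssps = Arrays.asList("kafka.pageviews.0", "kafka.pageviews.1");
    resolveRegistrations(ssps, startpoints, checkpoints)
        .forEach((ssp, call) -> System.out.println(ssp + " -> " + call));
  }
}
```

In this sketch, partition 0 has both a Startpoint and a checkpoint, and the Startpoint is chosen; partition 1 has only a checkpoint, so its checkpointed offset is used.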
Startpoints are removed from the storage layer at the first checkpoint offset commit. This ensures that the Startpoint takes precedence until the next Checkpoint is committed, specifically in situations where the SamzaProcessor restarts before the commit.
If the SamzaProcessor has checkpoints disabled (*.samza.reset.offset=true), then Startpoints are removed immediately after being read.
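The removal rules can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not the actual StartpointManager: the storage layer is a HashMap, SSPs and Startpoints are plain strings, and the class name StartpointLifecycleSketch, the checkpointsEnabled flag, and the onTaskStart and onCheckpointCommit hooks are hypothetical names introduced only for illustration. Only writeStartpoint and readStartpoint(SSP, removeUponRead) mirror method names from the steps above, and their signatures here are simplified.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical in-memory model of the Startpoint removal rules.
class StartpointLifecycleSketch {
  private final Map<String, String> storage = new HashMap<>();
  private final boolean checkpointsEnabled;

  StartpointLifecycleSketch(boolean checkpointsEnabled) {
    this.checkpointsEnabled = checkpointsEnabled;
  }

  /** Step 1: store the requested Startpoint for an SSP. */
  void writeStartpoint(String ssp, String startpoint) {
    storage.put(ssp, startpoint);
  }

  /** Read a Startpoint, optionally deleting it as part of the read. */
  Optional<String> readStartpoint(String ssp, boolean removeUponRead) {
    String startpoint = removeUponRead ? storage.remove(ssp) : storage.get(ssp);
    return Optional.ofNullable(startpoint);
  }

  /** Step 3: with checkpoints disabled no commit will follow, so remove on read. */
  Optional<String> onTaskStart(String ssp) {
    return readStartpoint(ssp, !checkpointsEnabled);
  }

  /** First checkpoint commit: from here on the Checkpoint takes precedence. */
  void onCheckpointCommit(String ssp) {
    storage.remove(ssp);
  }

  public static void main(String[] args) {
    StartpointLifecycleSketch manager = new StartpointLifecycleSketch(true);
    manager.writeStartpoint("kafka.pageviews.0", "EARLIEST");

    // A restart before the first commit still sees the Startpoint.
    System.out.println(manager.onTaskStart("kafka.pageviews.0").isPresent());
    System.out.println(manager.onTaskStart("kafka.pageviews.0").isPresent());

    // After the first commit the Startpoint is gone.
    manager.onCheckpointCommit("kafka.pageviews.0");
    System.out.println(manager.onTaskStart("kafka.pageviews.0").isPresent());
  }
}
```

Keeping the Startpoint until the first commit means a crash between startup and commit does not silently fall back to the old Checkpoint. When checkpoints are disabled, no such commit ever arrives, which is why the sketch removes the entry during the read.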
Startpoint Model
```java
/**
 * StartpointEntry pairs a Startpoint with an optional task name.
 */
public class StartpointEntry {
  private final String taskName;
  private final Startpoint startpoint;

  // Full implementation not shown for brevity
}

/**
 * Startpoint represents a position in a stream by {@link PositionType}
 */
public class Startpoint {
  private final PositionType positionType;
  private final String position;

  // Full implementation not shown for brevity
}

public enum PositionType {
  SPECIFIC_OFFSET,
  TIMESTAMP,
  EARLIEST,
  LATEST,
  BOOTSTRAP
}
```
...