...

Step 1: StartpointManager#writeStartpoint(SSP, StartpointEntry) stores the Startpoint in the storage layer.
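A minimal sketch of how the write in Step 1 and the later read in Step 3 could interact with storage, assuming an in-memory map as a stand-in for the storage layer. SSPs are reduced to plain string keys, and SimpleStartpoint, SimpleStartpointEntry and InMemoryStartpointManager are hypothetical simplifications for illustration, not the actual Samza API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical simplified model types: an SSP is a plain string key such as
// "kafka.PageViewEvent.0", and a Startpoint is reduced to a type name plus a position.
record SimpleStartpoint(String positionType, String position) {}

record SimpleStartpointEntry(String taskName, SimpleStartpoint startpoint) {}

/** Hypothetical StartpointManager; a HashMap stands in for the storage layer. */
class InMemoryStartpointManager {
  private final Map<String, SimpleStartpointEntry> storage = new HashMap<>();

  /** Step 1: store the Startpoint for an SSP, replacing any earlier one. */
  void writeStartpoint(String ssp, SimpleStartpointEntry entry) {
    storage.put(ssp, entry);
  }

  /** Step 3: look up the Startpoint for an SSP, deleting it when removeUponRead is true. */
  Optional<SimpleStartpointEntry> readStartpoint(String ssp, boolean removeUponRead) {
    SimpleStartpointEntry entry = removeUponRead ? storage.remove(ssp) : storage.get(ssp);
    return Optional.ofNullable(entry);
  }
}
```

Because a later write for the same SSP simply replaces the stored entry, the most recent Startpoint wins in this sketch.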

Step 2: An ExternalSource triggers the SamzaProcessor to restart.

Step 3: The SamzaProcessor starts up, and each task instance reads its Checkpoints and also reads Startpoints via StartpointManager#readStartpoint(SSP, removeUponRead).
As each SamzaProcessor task instance iterates through its SSPs:
    a) if there is a Startpoint for the SSP, call SystemConsumer#register(SystemStreamPartition, Startpoint).
    b) otherwise, call the existing SystemConsumer#register(SystemStreamPartition, checkpointOffset:String).
Eventually, both interface methods will be unified: implementations should be agnostic of where the starting offset comes from (a Checkpoint or a Startpoint) and should only be concerned with setting it. This will be explored during implementation.
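The per-SSP decision in Step 3 can be sketched as follows. This is a simplified illustration under stated assumptions: SketchStartpoint, SketchSystemConsumer and TaskRegistration are hypothetical names, SSPs are plain strings, and the two register overloads only mirror the two SystemConsumer#register variants described above rather than reproduce Samza's interface.

```java
import java.util.List;
import java.util.Map;

// Hypothetical simplified Startpoint: a type name plus a position.
record SketchStartpoint(String positionType, String position) {}

/** Hypothetical consumer exposing both registration paths from Step 3. */
interface SketchSystemConsumer {
  void register(String ssp, SketchStartpoint startpoint); // path (a)
  void register(String ssp, String checkpointOffset);     // path (b)
}

final class TaskRegistration {
  /**
   * For each SSP owned by a task, registers the Startpoint when one exists,
   * otherwise falls back to the checkpointed offset.
   */
  static void registerAll(List<String> ssps,
                          Map<String, SketchStartpoint> startpoints,
                          Map<String, String> checkpointOffsets,
                          SketchSystemConsumer consumer) {
    for (String ssp : ssps) {
      SketchStartpoint startpoint = startpoints.get(ssp);
      if (startpoint != null) {
        // (a) a Startpoint takes precedence over the checkpoint
        consumer.register(ssp, startpoint);
      } else {
        // (b) existing path: resume from the checkpointed offset
        consumer.register(ssp, checkpointOffsets.get(ssp));
      }
    }
  }
}
```

Keeping the choice in one loop makes the later unification straightforward: once both overloads collapse into a single method, only the argument passed to register changes.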

Startpoints are removed from the storage layer at the first checkpoint offset commit. This ensures the Startpoint keeps precedence until the next Checkpoint is committed, in particular when the SamzaProcessor restarts before that commit.
If the SamzaProcessor has checkpoints disabled (*.samza.reset.offset=true), Startpoints are removed immediately after being read.
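The two deletion rules can be sketched as a small lifecycle class. This is a hypothetical, single-threaded illustration: StartpointLifecycle and onCheckpointCommitted are invented names, a HashMap stands in for the storage layer, and the resetOffset flag mirrors the *.samza.reset.offset=true case where no checkpoint is ever committed.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of when Startpoints are deleted (single-threaded). */
final class StartpointLifecycle {
  private final Map<String, String> storage = new HashMap<>(); // SSP -> Startpoint position
  private final Set<String> pendingDeletion = new HashSet<>();
  private final boolean resetOffset; // true when checkpoints are disabled

  StartpointLifecycle(boolean resetOffset) {
    this.resetOffset = resetOffset;
  }

  void write(String ssp, String position) {
    storage.put(ssp, position);
  }

  /** Reads a Startpoint at startup; returns null when none exists. */
  String read(String ssp) {
    if (resetOffset) {
      // No checkpoint will be committed, so delete immediately after reading.
      return storage.remove(ssp);
    }
    String position = storage.get(ssp);
    if (position != null) {
      // Keep it stored so it still wins if the processor restarts before the commit.
      pendingDeletion.add(ssp);
    }
    return position;
  }

  /** Called on the first checkpoint offset commit: drop the Startpoints that were read. */
  void onCheckpointCommitted() {
    pendingDeletion.forEach(storage::remove);
    pendingDeletion.clear();
  }

  boolean isStored(String ssp) {
    return storage.containsKey(ssp);
  }
}
```

One caveat of this simplified version: a Startpoint rewritten for the same SSP between the read and the commit would also be deleted, which a real implementation would need to guard against.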

Startpoint Model


```java
/**
 * StartpointEntry pairs a {@link Startpoint} with the name of the task it applies to.
 */
public class StartpointEntry {
  private final String taskName;
  private final Startpoint startpoint;

  // Full implementation not shown for brevity
}

/**
 * Startpoint represents a position in a stream, interpreted according to its {@link PositionType}.
 */
public class Startpoint {
  private final PositionType positionType;
  private final String position;

  // Full implementation not shown for brevity
}

public enum PositionType {
  SPECIFIC_OFFSET,
  TIMESTAMP,
  EARLIEST,
  LATEST,
  BOOTSTRAP
}
```

...