...

  • SamzaProcessor - For the sake of brevity, this represents SamzaContainer, TaskInstance, CheckpointManager and everything else in the framework that is not listed here.
  • SystemConsumer
  • StartpointManager
  • JobCoordinator
  • StorageLayer - See Storing Requested Startpoint
  • ExternalSource - e.g. a checkpoint tool, REST backend, etc.

...

Step 1: StartpointManager#writeStartpoint(SSP, Startpoint) stores the Startpoint in the storage layer. StartpointManager#writeStartpointForTask(SSP, TaskName, Startpoint) can also be used in special cases to target only a specific task.

Step 2: ExternalSource triggers SamzaProcessor to restart

Upon startup, the JobCoordinator calls StartpointManager#groupStartpointsPerTask(SSP, SystemStreamPartitionGrouper) to re-key Startpoints that are keyed only by SSP so that they are keyed by SSP+TaskName. Startpoints already keyed by SSP+TaskName are not overwritten. This remapping is required because a consumed Startpoint must be deleted at the next Checkpoint commit (see Note below), and Checkpoint commits happen per task.
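The remapping can be sketched with plain Java collections. This is a minimal, hypothetical model rather than the StartpointManager implementation: the metadata store is a `Map<String, String>`, SSPs and task names are strings, keys are `ssp` or `ssp|taskName` (an assumed scheme), and the grouper's task-to-SSP assignment is passed in as a map. Removing the SSP-only entry after fanning it out is our reading of "re-keyed", not something the design states.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical model of the JobCoordinator's remapping step over an in-memory store. */
final class StartpointRegrouper {
  // Hypothetical key scheme: "ssp" for SSP-only entries, "ssp|taskName" for per-task entries.
  static String taskKey(String ssp, String taskName) {
    return ssp + "|" + taskName;
  }

  /**
   * Copies the SSP-only Startpoint to every task the grouper assigned the SSP to, keeping
   * any Startpoint already written for a specific task, then removes the SSP-only entry.
   * Returns the names of the tasks that consume the SSP.
   */
  static Set<String> groupStartpointsPerTask(
      Map<String, String> store, String ssp, Map<String, Set<String>> taskToSsps) {
    Set<String> tasks = new TreeSet<>();
    for (Map.Entry<String, Set<String>> assignment : taskToSsps.entrySet()) {
      if (assignment.getValue().contains(ssp)) {
        tasks.add(assignment.getKey());
      }
    }
    String startpoint = store.get(ssp);
    if (startpoint != null) {
      for (String task : tasks) {
        store.putIfAbsent(taskKey(ssp, task), startpoint); // never overwrite a per-task entry
      }
      store.remove(ssp);
    }
    return tasks;
  }
}
```

With a regular grouper an SSP lands in exactly one task, so the fan-out produces a single entry; for a broadcast input the same SSP appears in every task and each task gets its own copy, which it can then delete independently at its own checkpoint commit.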

Step 3: SamzaProcessor starts up and each task instance reads Checkpoints and also reads Startpoints via StartpointManager#readStartpointForTask(SSP, TaskName)
As each SamzaProcessor task instance iterates through its SSPs:
    a) if there is a Startpoint for the requested SSP, call SystemConsumer#register(SystemStreamPartition, Startpoint). 
    b) else, call the existing SystemConsumer#register(SystemStreamPartition, checkpointOffset:String). Eventually, both interface methods will be unified since the interface implementations should be agnostic of where the starting offsets come from and should only be concerned with setting the starting offset, whether it comes from a Checkpoint or a Startpoint. This will be explored during implementation.
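The per-SSP choice in a) and b) amounts to a lookup with a fallback. The sketch below is hypothetical: `RegisteringConsumer` stands in for SystemConsumer, with the two register overloads given distinct names because both take strings in this simplified model, and Startpoints and checkpoint offsets are plain strings already read for this task.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical model of task startup: choose a Startpoint or the checkpoint for each SSP. */
final class TaskStartup {
  /** Stand-in for the two SystemConsumer#register overloads. */
  interface RegisteringConsumer {
    void registerStartpoint(String ssp, String startpoint);

    void registerCheckpointOffset(String ssp, String checkpointOffset);
  }

  static void registerAll(List<String> ssps, Map<String, String> startpointsForTask,
      Map<String, String> checkpointOffsets, RegisteringConsumer consumer) {
    for (String ssp : ssps) {
      String startpoint = startpointsForTask.get(ssp);
      if (startpoint != null) {
        consumer.registerStartpoint(ssp, startpoint); // a) a Startpoint takes precedence
      } else {
        consumer.registerCheckpointOffset(ssp, checkpointOffsets.get(ssp)); // b) fall back
      }
    }
  }
}
```

In the real interface the parameter types (Startpoint versus String) already distinguish the overloads, which is what makes the eventual unification into a single register method possible.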

Note: Startpoints are removed from the StorageLayer at the first checkpoint offset commit. This ensures the Startpoint keeps precedence until the next Checkpoint is committed, in particular when the SamzaProcessor restarts before that commit.
If the SamzaProcessor has checkpoints disabled (*.samza.reset.offset=true), Startpoints are removed immediately after being read.
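The two deletion rules can be modeled for a single task. In this hypothetical sketch the store is a map keyed by `ssp|taskName`, `readForTask` is what a task instance calls at startup, and `onCheckpointCommit` is what runs at the task's first checkpoint commit; the `resetOffset` flag stands in for `*.samza.reset.offset=true`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of when a per-task Startpoint is deleted. */
final class StartpointLifecycle {
  // Hypothetical metadata store contents, keyed by "ssp|taskName".
  final Map<String, String> store = new HashMap<>();
  private final boolean resetOffset; // models *.samza.reset.offset=true (checkpoints disabled)

  StartpointLifecycle(boolean resetOffset) {
    this.resetOffset = resetOffset;
  }

  /** Called by a task instance at startup for one of its SSPs. */
  String readForTask(String ssp, String task) {
    String key = ssp + "|" + task;
    String startpoint = store.get(key);
    if (startpoint != null && resetOffset) {
      store.remove(key); // no checkpoint commit will delete it later, so delete on read
    }
    return startpoint;
  }

  /** Called when the task commits its first checkpoint after consuming the Startpoint. */
  void onCheckpointCommit(String ssp, String task) {
    store.remove(ssp + "|" + task);
  }
}
```

Reading twice without a commit in between (a restart before the first commit) returns the same Startpoint, which is exactly the precedence guarantee described in the Note.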

Startpoint Model


```java
/**
 * StartpointEntry pairs a {@link Startpoint} with an optional task name, as stored in the
 * metadata store (see Storing Requested Startpoint).
 */
public class StartpointEntry {
  private final String taskName;
  private final Startpoint startpoint;

// Full implementation not shown for brevity
}

/**
 * Startpoint represents a position in a stream by {@link PositionType}
 */
public class Startpoint {
  private final Long storedAt;
  private final PositionType positionType;
  private final String position;

  public static Startpoint withOffset(String offset) {...}

  public static Startpoint withTimestamp(long timestamp) {...}

  public static Startpoint withEarliest() {...}

  public static Startpoint withLatest() {...}

  public static Startpoint withBootstrap(String bootstrapInfo) {...}

// Full implementation not shown for brevity
}

public enum PositionType {
  SPECIFIC_OFFSET,
  TIMESTAMP,
  EARLIEST,
  LATEST,
  BOOTSTRAP
}
```
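For experimentation, the elided factory methods can be filled in as a self-contained value type. This is a sketch under our own assumptions, not the design's implementation: the position is kept as a string (a timestamp as its decimal form, no position for EARLIEST and LATEST), and `storedAt` is taken to be the creation time in epoch milliseconds.

```java
import java.util.Objects;

enum PositionType { SPECIFIC_OFFSET, TIMESTAMP, EARLIEST, LATEST, BOOTSTRAP }

/** Hypothetical, self-contained version of the Startpoint model above. */
final class Startpoint {
  private final Long storedAt;
  private final PositionType positionType;
  private final String position; // null when the type needs no position

  private Startpoint(PositionType positionType, String position) {
    this.storedAt = System.currentTimeMillis(); // assumption: creation time in epoch millis
    this.positionType = positionType;
    this.position = position;
  }

  static Startpoint withOffset(String offset) {
    return new Startpoint(PositionType.SPECIFIC_OFFSET, Objects.requireNonNull(offset));
  }

  static Startpoint withTimestamp(long timestamp) {
    return new Startpoint(PositionType.TIMESTAMP, Long.toString(timestamp)); // assumed encoding
  }

  static Startpoint withEarliest() {
    return new Startpoint(PositionType.EARLIEST, null);
  }

  static Startpoint withLatest() {
    return new Startpoint(PositionType.LATEST, null);
  }

  static Startpoint withBootstrap(String bootstrapInfo) {
    return new Startpoint(PositionType.BOOTSTRAP, bootstrapInfo);
  }

  Long getStoredAt() { return storedAt; }

  PositionType getPositionType() { return positionType; }

  String getPosition() { return position; }
}
```

Static factories with a private constructor keep each PositionType paired with a position of the right shape, so callers cannot build, say, a TIMESTAMP Startpoint without a timestamp.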

SystemConsumer Registering Startpoint

The SystemConsumer implementations provided by the Samza framework will implement the new register method below. This is analogous to the existing overloaded #register(SystemStreamPartition, String) method used to recover from checkpoint offsets. The plan is to eventually deprecate #register(SystemStreamPartition, String) and unify the values in the Startpoint metadata store and the Checkpoint stream to use the same Startpoint format.


```java
public interface SystemConsumer {
  /**
   * Register a SystemStreamPartition to this SystemConsumer by an externally defined
   * {@link Startpoint}, only if one exists for this SystemStreamPartition. If none exists,
   * {@link #register(SystemStreamPartition, String)} will be called instead.
   * If a SystemConsumer does not support a particular {@link PositionType}, an exception
   * should be thrown. Otherwise, the SystemConsumer is expected to seek to the requested position.
   *
   * @param systemStreamPartition
   *          The SystemStreamPartition object representing the Samza
   *          SystemStreamPartition to receive messages from.
   * @param startpoint
   *          {@link Startpoint} representing the position in the stream to start
   *          reading events from.
   */
  default void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
    // The default only handles SPECIFIC_OFFSET, by delegating to the existing offset-based method.
    if (startpoint.getPositionType() != PositionType.SPECIFIC_OFFSET) {
      throw new UnsupportedOperationException("Not implemented");
    }
    register(systemStreamPartition, startpoint.getPosition());
  }

  @Deprecated // Unify to eventually use the register method above only.
  void register(SystemStreamPartition systemStreamPartition, String offset);

// Excluding existing interface methods for brevity
}

// Below is pseudocode for system specific implementations on how to handle TIMESTAMP position types

public class KafkaSystemConsumer implements SystemConsumer {
  @Override
  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
    if (startpoint.getPositionType() == PositionType.TIMESTAMP) {
      // Call underlying Kafka Consumer#offsetsForTimes().
      // Call underlying Kafka Consumer#seek() with offsets returned from offsetsForTimes()
    } else {
      // ... handle the remaining PositionTypes
    }
  }
}
```
 
```java
public class EventHubSystemConsumer implements SystemConsumer {
  @Override
  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
    if (startpoint.getPositionType() == PositionType.TIMESTAMP) {
      // eventPosition = EventPosition.fromEnqueuedTime(toDateTime(startpoint.getPosition()))
      // eventHubClientManager.getEventHubClient().createReceiverSync(consumerGroup, systemStreamPartition.getPartition().getPartitionId(), eventPosition)
    } else {
      // ... handle the remaining PositionTypes
    }
  }
}
```
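The interplay between the default register method and a system-specific override can be exercised without Samza or Kafka. In this hypothetical sketch, `PositionKind`, `StartRequest`, `StartpointAwareConsumer` and `InMemoryLogConsumer` are illustrative stand-ins for PositionType, Startpoint, SystemConsumer and KafkaSystemConsumer. The TIMESTAMP lookup follows the documented rule of Kafka's `Consumer#offsetsForTimes` (the earliest offset whose timestamp is greater than or equal to the target); starting at the log end when no such offset exists is our assumption.

```java
import java.util.Arrays;

enum PositionKind { SPECIFIC_OFFSET, TIMESTAMP, EARLIEST, LATEST }

final class StartRequest {
  final PositionKind type;
  final String value;

  StartRequest(PositionKind type, String value) {
    this.type = type;
    this.value = value;
  }
}

interface StartpointAwareConsumer {
  /** Existing checkpoint-based registration. */
  void register(String ssp, String offset);

  /** New overload: by default only SPECIFIC_OFFSET is understood, delegating to the old method. */
  default void register(String ssp, StartRequest startpoint) {
    if (startpoint.type != PositionKind.SPECIFIC_OFFSET) {
      throw new UnsupportedOperationException("Not implemented: " + startpoint.type);
    }
    register(ssp, startpoint.value);
  }
}

/** Toy consumer over an in-memory log where timestamps[i] is the timestamp of offset i. */
final class InMemoryLogConsumer implements StartpointAwareConsumer {
  private final long[] timestamps;
  long nextOffset = -1;

  InMemoryLogConsumer(long... timestamps) {
    this.timestamps = Arrays.copyOf(timestamps, timestamps.length);
  }

  @Override
  public void register(String ssp, String offset) {
    nextOffset = Long.parseLong(offset);
  }

  @Override
  public void register(String ssp, StartRequest startpoint) {
    if (startpoint.type == PositionKind.TIMESTAMP) {
      // Same rule as offsetsForTimes(): earliest offset whose timestamp >= target.
      long target = Long.parseLong(startpoint.value);
      nextOffset = timestamps.length; // assumption: no match means start at the log end
      for (int i = 0; i < timestamps.length; i++) {
        if (timestamps[i] >= target) {
          nextOffset = i;
          break;
        }
      }
    } else {
      StartpointAwareConsumer.super.register(ssp, startpoint); // keep the default behavior
    }
  }
}
```

Because the override defers to `StartpointAwareConsumer.super.register(...)` for every other kind, SPECIFIC_OFFSET still flows through the existing offset-based method and unsupported kinds still raise `UnsupportedOperationException`, matching the contract in the interface Javadoc.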

Storing Requested Startpoint

Out-of-band Metadata Store

The out-of-band metadata store is the one described by the metadata store abstraction feature (SAMZA-1786) from SEP-11. Startpoints are stored in their own namespace in the metadata store configured by `metadata.store.factory`.

Abstractly, we think of the metadata store as a KV store and Startpoints are stored as:

SSP→StartpointEntry(Startpoint, TaskName)

TaskName is optional and is primarily for broadcast inputs where the same SSP spans across multiple tasks.

StartpointManager

StartpointManager is the main API to read and write Startpoints and is composed alongside the CheckpointManager in the OffsetManager. The StartpointManager is system implementation agnostic and handles the serialization and deserialization of Startpoints into the metadata store.

```java
/**
 * The StartpointManager reads and writes {@link Startpoint} to the metadata store defined by
 * the configuration task.startpoint.metadata.store.factory.
 */
public class StartpointManager {
  /**
   * Writes a {@link Startpoint} that defines the start position for a {@link SystemStreamPartition}.
   *
   * @param ssp The {@link SystemStreamPartition} to map the {@link Startpoint} against.
   * @param startpoint Reference to a Startpoint object.
   */
  public void writeStartpoint(SystemStreamPartition ssp, Startpoint startpoint) {...}

  /**
   * Writes a {@link Startpoint} that defines the start position for a specified {@link SystemStreamPartition} for a specific {@link TaskName}.
   *
   * @param ssp The {@link SystemStreamPartition} to map the {@link Startpoint} against.
   * @param taskName The {@link TaskName} to map the {@link Startpoint} against.
   * @param startpoint Reference to a Startpoint object.
   */
  public void writeStartpointForTask(SystemStreamPartition ssp, TaskName taskName, Startpoint startpoint) {...}

  /**
   * Returns the last {@link Startpoint} that defines the start position for a {@link SystemStreamPartition}.
   *
   * @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for.
   * @return {@link Startpoint} for the {@link SystemStreamPartition}, or null if it does not exist.
   */
  public Startpoint readStartpoint(SystemStreamPartition ssp) {...}

  /**
   * Returns the last {@link Startpoint} for a specified {@link SystemStreamPartition} for a specific {@link TaskName}.
   *
   * @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for.
   * @param taskName The {@link TaskName} to fetch the {@link Startpoint} for.
   * @return {@link Startpoint} for the {@link SystemStreamPartition}, or null if it does not exist.
   */
  public Startpoint readStartpointForTask(SystemStreamPartition ssp, TaskName taskName) {...}

  /**
   * Deletes the {@link Startpoint} for the specified {@link SystemStreamPartition}.
   *
   * @param ssp The {@link SystemStreamPartition} to delete the {@link Startpoint} for.
   */
  public void deleteStartpoint(SystemStreamPartition ssp) {...}

  /**
   * Deletes the {@link Startpoint} for the specified {@link SystemStreamPartition} and {@link TaskName}.
   *
   * @param ssp The {@link SystemStreamPartition} to delete the {@link Startpoint} for.
   * @param taskName The {@link TaskName} to delete the {@link Startpoint} for.
   */
  public void deleteStartpointForTask(SystemStreamPartition ssp, TaskName taskName) {...}

  /**
   * For {@link Startpoint}s keyed only by {@link SystemStreamPartition}, this method remaps the Startpoints for the specified
   * SystemStreamPartition to SystemStreamPartition+{@link TaskName} for all tasks provided by the {@link SystemStreamPartitionGrouper}.
   *
   * @implNote This method is not atomic or thread-safe. The intent is for the Samza Processor's coordinator to use this
   * method to assign the Startpoints to the appropriate tasks.
   * @param ssp The {@link SystemStreamPartition} that a {@link Startpoint} is keyed to.
   * @param grouper The {@link SystemStreamPartitionGrouper} used to determine what the task names are.
   * @return The set of existing {@link TaskName}s.
   */
  public Set<TaskName> groupStartpointsPerTask(SystemStreamPartition ssp, SystemStreamPartitionGrouper grouper) {...}

// Full implementation not shown for brevity
}
```
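The read, write and delete half of this API, together with the serialization the StartpointManager is responsible for, can be sketched in memory. Everything here is hypothetical: a `HashMap` stands in for the Startpoint namespace of the MetadataStore, SSPs and task names are strings, keys follow an assumed `ssp` / `ssp|taskName` scheme, and `TYPE:position` is an illustrative wire format, not the one Samza uses. Grouping is left out here since it only copies SSP-only entries to per-task keys.

```java
import java.util.HashMap;
import java.util.Map;

/** Minimal Startpoint stand-in with a hypothetical "TYPE:position" string serde. */
final class StartpointValue {
  enum Type { SPECIFIC_OFFSET, TIMESTAMP, EARLIEST, LATEST, BOOTSTRAP }

  final Type type;
  final String position; // null when the type needs no position (EARLIEST, LATEST)

  StartpointValue(Type type, String position) {
    this.type = type;
    this.position = position;
  }

  String serialize() {
    return type.name() + ":" + (position == null ? "" : position);
  }

  static StartpointValue deserialize(String serialized) {
    int sep = serialized.indexOf(':');
    String position = serialized.substring(sep + 1); // keeps any ':' inside the position
    return new StartpointValue(Type.valueOf(serialized.substring(0, sep)),
        position.isEmpty() ? null : position);
  }
}

/** Hypothetical in-memory StartpointManager; the HashMap stands in for its MetadataStore namespace. */
final class InMemoryStartpointManager {
  private final Map<String, String> store = new HashMap<>();

  // Hypothetical key scheme: "ssp" for SSP-only entries and "ssp|taskName" for per-task entries.
  private static String taskKey(String ssp, String taskName) {
    return ssp + "|" + taskName;
  }

  void writeStartpoint(String ssp, StartpointValue startpoint) {
    store.put(ssp, startpoint.serialize());
  }

  void writeStartpointForTask(String ssp, String taskName, StartpointValue startpoint) {
    store.put(taskKey(ssp, taskName), startpoint.serialize());
  }

  StartpointValue readStartpoint(String ssp) {
    return deserializeOrNull(store.get(ssp));
  }

  StartpointValue readStartpointForTask(String ssp, String taskName) {
    return deserializeOrNull(store.get(taskKey(ssp, taskName)));
  }

  void deleteStartpoint(String ssp) {
    store.remove(ssp);
  }

  void deleteStartpointForTask(String ssp, String taskName) {
    store.remove(taskKey(ssp, taskName));
  }

  private static StartpointValue deserializeOrNull(String serialized) {
    return serialized == null ? null : StartpointValue.deserialize(serialized);
  }
}
```

SSP-only and per-task entries live side by side, so deleting a task's copy after its checkpoint commit leaves other tasks' copies, and any not-yet-grouped SSP-only entry, untouched. A production key scheme would need an unambiguous encoding, since an SSP or task name containing `|` would collide here.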

...