Comment: Update design based on review from https://github.com/apache/samza/pull/860

...

  • SamzaProcessor - For the sake of brevity, this represents SamzaContainer, TaskInstance, CheckpointManager and everything else in the framework that is not listed here.
  • JobCoordinator
  • SystemConsumer (implements interface StartpointVisitor)
  • StartpointManager
  • StorageLayer - See Storing Requested Startpoint
  • ExternalSource - e.g. Checkpoint tool, REST backend, etc.

...

Step 1: StartpointManager#writeStartpoint(SSP, Startpoint) stores the Startpoint in the storage layer. StartpointManager#writeStartpoint(SSP, TaskName, Startpoint) can also be used in special cases to target only a specific task.

Step 2: ExternalSource triggers SamzaProcessor to restart

Step 3: Upon startup, the JobCoordinator calls StartpointManager#fanOutStartpointsToTasks(JobModel) to re-key Startpoints that are keyed only by SSP so that they are keyed by SSP+TaskName. Startpoints already keyed by SSP+TaskName are not overwritten. This remapping is required because, once a Startpoint is consumed, it must be deleted at the next Checkpoint commit (see Note below), and Checkpoint commits happen at a per-task level.

Step 4: SamzaProcessor starts up and each task instance reads Checkpoints and also reads Startpoints via StartpointManager#readStartpoint(SSP, TaskName).
As each SamzaProcessor task instance iterates through its SSPs:
    a) if there is a Startpoint for the requested SSP, call Startpoint#apply(SystemStreamPartition, StartpointVisitor), which invokes the matching StartpointVisitor#visit overload implemented by the SystemConsumer.
    b) else, call the existing SystemConsumer#register(SystemStreamPartition, checkpointOffset:String). Eventually, both methods will be unified, since implementations should be agnostic of where the starting offset comes from and should only be concerned with setting it, whether it comes from a Checkpoint or a Startpoint. This will be explored during implementation.
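As a rough illustration of the per-task branching in Step 4, here is a minimal, self-contained Java sketch. It is not Samza code: SSPs and Startpoints are plain strings, and `TaskSsp`, `StartingOffsetConsumer`, `TaskStartup` and `RecordingConsumer` are hypothetical stand-ins for the StartpointManager lookup, StartpointVisitor#visit and SystemConsumer#register.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical composite key: an SSP (as a plain string) plus the task that consumes it. */
record TaskSsp(String taskName, String ssp) {}

/** Hypothetical stand-in for the two SystemConsumer entry points used in Step 4. */
interface StartingOffsetConsumer {
  void visitStartpoint(String ssp, String startpoint);          // models StartpointVisitor#visit
  void registerCheckpoint(String ssp, String checkpointOffset); // models SystemConsumer#register(SSP, String)
}

final class TaskStartup {
  /** For each SSP of the task, a Startpoint (if any) wins over the checkpointed offset. */
  static void registerInputs(String taskName, List<String> ssps,
                             Map<TaskSsp, String> startpoints,
                             Map<String, String> checkpointOffsets,
                             StartingOffsetConsumer consumer) {
    for (String ssp : ssps) {
      String startpoint = startpoints.get(new TaskSsp(taskName, ssp));
      if (startpoint != null) {
        consumer.visitStartpoint(ssp, startpoint);                    // case a)
      } else {
        consumer.registerCheckpoint(ssp, checkpointOffsets.get(ssp)); // case b); null if never checkpointed
      }
    }
  }
}

/** Records which entry point was used for each SSP. */
final class RecordingConsumer implements StartingOffsetConsumer {
  final List<String> calls = new ArrayList<>();

  @Override
  public void visitStartpoint(String ssp, String startpoint) {
    calls.add("startpoint:" + ssp + "=" + startpoint);
  }

  @Override
  public void registerCheckpoint(String ssp, String checkpointOffset) {
    calls.add("checkpoint:" + ssp + "=" + checkpointOffset);
  }
}
```

The lookup is per SSP+TaskName because, after Step 3, every Startpoint a task can see has already been re-keyed to include its TaskName.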

Note: Startpoints are removed from the StorageLayer at the first checkpoint offset commit. This ensures the Startpoint takes precedence until the next Checkpoint is committed, specifically in situations where the SamzaProcessor restarts before the commit.
If the SamzaProcessor has checkpoints disabled (*.samza.reset.offset=true), Startpoints are removed immediately after being read.

Startpoint

...

Models


Code Block
languagejava
titleStartpoint
/**
 * Startpoint represents a position in a stream partition.
 */
public abstract class Startpoint {
  private final long creationTimestamp;

  /**
   * Apply the {@link StartpointVisitor}'s visit method that matches the concrete type of this {@link Startpoint}
   * class.
   * @param systemStreamPartition The {@link SystemStreamPartition} needed to register with the {@link StartpointVisitor}
   * @param startpointVisitor The visitor to register with.
   */
  public abstract void apply(SystemStreamPartition systemStreamPartition, StartpointVisitor startpointVisitor);

// Full implementation not shown for brevity
}




/**
 * A {@link Startpoint} that represents a specific offset in a stream partition.
 */
public final class StartpointSpecific extends Startpoint { ... }


/**
 * A {@link Startpoint} that represents a timestamp offset in a stream partition.
 */
public final class StartpointTimestamp extends Startpoint { ... }


/**
 * A {@link Startpoint} that represents the earliest offset in a stream partition.
 */
public final class StartpointOldest extends Startpoint { ... }


/**
 * A {@link Startpoint} that represents the latest offset in a stream partition.
 */
public final class StartpointUpcoming extends Startpoint { ... }


/**
 * A {@link Startpoint} that represents a custom startpoint. This is for systems that have a non-generic option
 * for setting offsets. Startpoints are serialized to JSON in the {@link org.apache.samza.metadatastore.MetadataStore}
 * and it is recommended to maintain the subclass of this {@link StartpointCustom} as a simple POJO.
 */
public abstract class StartpointCustom extends Startpoint { ... }
 ...
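The subclasses above reach a SystemConsumer through Startpoint#apply and the StartpointVisitor overloads described in the next section, which is classic double dispatch. A minimal sketch under simplifying assumptions: `Ssp` replaces SystemStreamPartition, only two subclasses are modeled, and the fields `specificOffset` and `timestampOffset` are hypothetical names, not taken from this design.

```java
/** Simplified stand-in for SystemStreamPartition. */
record Ssp(String system, String stream, int partition) {}

/** Simplified Startpoint: each subclass routes itself to the matching visitor overload. */
abstract class Startpoint {
  private final long creationTimestamp;

  Startpoint(long creationTimestamp) {
    this.creationTimestamp = creationTimestamp;
  }

  long getCreationTimestamp() {
    return creationTimestamp;
  }

  abstract void apply(Ssp ssp, StartpointVisitor visitor);
}

final class StartpointSpecific extends Startpoint {
  private final String specificOffset; // hypothetical field name

  StartpointSpecific(long creationTimestamp, String specificOffset) {
    super(creationTimestamp);
    this.specificOffset = specificOffset;
  }

  String getSpecificOffset() {
    return specificOffset;
  }

  @Override
  void apply(Ssp ssp, StartpointVisitor visitor) {
    visitor.visit(ssp, this); // static type of `this` selects visit(Ssp, StartpointSpecific)
  }
}

final class StartpointTimestamp extends Startpoint {
  private final long timestampOffset; // hypothetical field name

  StartpointTimestamp(long creationTimestamp, long timestampOffset) {
    super(creationTimestamp);
    this.timestampOffset = timestampOffset;
  }

  long getTimestampOffset() {
    return timestampOffset;
  }

  @Override
  void apply(Ssp ssp, StartpointVisitor visitor) {
    visitor.visit(ssp, this); // selects visit(Ssp, StartpointTimestamp)
  }
}

interface StartpointVisitor {
  void visit(Ssp ssp, StartpointSpecific startpoint);

  default void visit(Ssp ssp, StartpointTimestamp startpoint) {
    throw new UnsupportedOperationException("StartpointTimestamp is not supported.");
  }
}

/** A consumer that only supports specific offsets. */
final class SpecificOnlyConsumer implements StartpointVisitor {
  String lastSeek;

  @Override
  public void visit(Ssp ssp, StartpointSpecific startpoint) {
    lastSeek = ssp.stream() + "@" + startpoint.getSpecificOffset();
  }
}
```

Because overload selection happens at compile time inside each subclass's apply, the caller only needs a Startpoint reference, and a consumer that does not override an overload falls through to the default that throws UnsupportedOperationException.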

StartpointVisitor



Code Block
languagejava
titleStartpointVisitor
/**
 * Visitor interface for system consumers to implement to support {@link Startpoint}s.
 */
public interface StartpointVisitor {

  /**
   * Seek to specific offset represented by {@link StartpointSpecific}
   * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
   * @param startpointSpecific The {@link Startpoint} that represents the specific offset.
   */
  void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific);

  /**
   * Seek to timestamp offset represented by {@link StartpointTimestamp}
   * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
   * @param startpointTimestamp The {@link Startpoint} that represents the timestamp offset.
   */
  default void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
    throw new UnsupportedOperationException("StartpointTimestamp is not supported.");
  }

  /**
   * Seek to earliest offset represented by {@link StartpointOldest}
   * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
   * @param startpointOldest The {@link Startpoint} that represents the earliest offset.
   */
  default void visit(SystemStreamPartition systemStreamPartition, StartpointOldest startpointOldest) {
    throw new UnsupportedOperationException("StartpointOldest is not supported.");
  }

  /**
   * Seek to latest offset represented by {@link StartpointUpcoming}
   * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
   * @param startpointUpcoming The {@link Startpoint} that represents the latest offset.
   */
  default void visit(SystemStreamPartition systemStreamPartition, StartpointUpcoming startpointUpcoming) {
    throw new UnsupportedOperationException("StartpointUpcoming is not supported.");
  }

  /**
   * Seek to a custom position represented by {@link StartpointCustom}
   * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
   * @param startpointCustom The {@link Startpoint} that represents the custom position.
   */
  default void visit(SystemStreamPartition systemStreamPartition, StartpointCustom startpointCustom) {
    throw new UnsupportedOperationException(String.format("%s is not supported.", startpointCustom.getClass().getSimpleName()));
  }
}
 
// Below is example pseudocode for system-specific implementations showing how to handle StartpointTimestamp. Other Startpoint
// types (StartpointSpecific, StartpointOldest, StartpointUpcoming) are left out for brevity.

public class KafkaSystemConsumer implements SystemConsumer, StartpointVisitor {
  @Override
  public void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
    // Call underlying Kafka Consumer#offsetsForTimes() with the timestamp held by startpointTimestamp.
    // Call underlying Kafka Consumer#seek() with the offset returned from offsetsForTimes().
  }
}

public class EventHubSystemConsumer implements SystemConsumer, StartpointVisitor {
  @Override
  public void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
    // eventPosition = EventPosition.fromEnqueuedTime(<timestamp held by startpointTimestamp, as an Instant>)
    // eventHubClientManager.getEventHubClient().createReceiverSync(consumerGroup,
    //     String.valueOf(systemStreamPartition.getPartition().getPartitionId()), eventPosition)
  }
}
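To make the Kafka TIMESTAMP case concrete without a broker, the following sketch models a single partition in memory. `InMemoryPartition#offsetForTime` mimics the documented behavior of Kafka's `Consumer#offsetsForTimes` for one partition (the earliest offset whose timestamp is greater than or equal to the target). Seeking to the end of the partition when no such offset exists is an assumption of this sketch, not something this design specifies, and all class names are hypothetical.

```java
import java.util.List;
import java.util.OptionalLong;

/** One message in a partition: its offset and its timestamp. */
record LogEntry(long offset, long timestamp) {}

/** Hypothetical in-memory partition standing in for a Kafka topic partition. */
final class InMemoryPartition {
  private final List<LogEntry> entries; // ordered by offset

  InMemoryPartition(List<LogEntry> entries) {
    this.entries = List.copyOf(entries);
  }

  /** Earliest offset whose timestamp is >= target, like Consumer#offsetsForTimes for one partition. */
  OptionalLong offsetForTime(long targetTimestamp) {
    for (LogEntry entry : entries) {
      if (entry.timestamp() >= targetTimestamp) {
        return OptionalLong.of(entry.offset());
      }
    }
    return OptionalLong.empty();
  }

  /** Offset that the next appended message would get. */
  long endOffset() {
    return entries.isEmpty() ? 0L : entries.get(entries.size() - 1).offset() + 1;
  }
}

/** Models the visit(SSP, StartpointTimestamp) branch: look up the offset, then seek. */
final class TimestampSeekingConsumer {
  private final InMemoryPartition partition;
  private long position = -1L; // -1 means "not registered yet"

  TimestampSeekingConsumer(InMemoryPartition partition) {
    this.partition = partition;
  }

  void visitTimestamp(long timestamp) {
    // Assumption of this sketch: with no message at or after the timestamp, start at the end.
    position = partition.offsetForTime(timestamp).orElse(partition.endOffset());
  }

  long position() {
    return position;
  }
}
```

The lookup is a linear scan rather than a binary search because message timestamps are not guaranteed to be monotonic across offsets.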

Storing Requested Startpoint

Out-of-band Metadata Store

The out-of-band metadata store used is described by the metadata store abstraction feature (SAMZA-1786) from SEP-11. The Startpoints are stored within their own namespace in the metadata store configured by `metadata.store.factory`.

Abstractly, we think of the metadata store as a KV store and Startpoints are stored as:

SSP→Startpoint or SSP+TaskName→Startpoint

TaskName is optional and is primarily for broadcast inputs where the same SSP spans across multiple tasks. However, when a job starts, all Startpoints keyed only by SSP will be remapped to SSP+TaskName. See General Workflow above for details.
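The remapping mentioned above can be sketched over an in-memory map. `StartpointKey` here is a hypothetical record (a null `taskName` means the key is SSP-only), and `taskToSsps` stands in for the task assignment that the JobCoordinator derives from the JobModel. The sketch assumes the SSP-only entry is deleted once it has been copied to every task; existing SSP+TaskName entries are never overwritten, matching Step 3 of the workflow.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical key for the Startpoint namespace; a null taskName means "keyed by SSP only". */
record StartpointKey(String ssp, String taskName) {
  static StartpointKey ofSsp(String ssp) {
    return new StartpointKey(ssp, null);
  }

  static StartpointKey ofTask(String ssp, String taskName) {
    return new StartpointKey(ssp, taskName);
  }
}

final class StartpointFanOut {
  /**
   * Copies each SSP-only Startpoint to SSP+TaskName for every task consuming that SSP,
   * never overwriting an existing task-specific Startpoint, then removes the SSP-only entry.
   *
   * @param store      the Startpoint namespace, modeled as a mutable map
   * @param taskToSsps task name to the SSPs assigned to it (a stand-in for the JobModel)
   * @return the SSPs whose Startpoints were fanned out
   */
  static <T> Set<String> fanOut(Map<StartpointKey, T> store, Map<String, List<String>> taskToSsps) {
    Set<String> fannedOut = new HashSet<>();
    for (Map.Entry<String, List<String>> task : taskToSsps.entrySet()) {
      for (String ssp : task.getValue()) {
        T startpoint = store.get(StartpointKey.ofSsp(ssp));
        if (startpoint != null) {
          store.putIfAbsent(StartpointKey.ofTask(ssp, task.getKey()), startpoint);
          fannedOut.add(ssp);
        }
      }
    }
    // Delete SSP-only entries last: a broadcast SSP may be assigned to several tasks.
    for (String ssp : fannedOut) {
      store.remove(StartpointKey.ofSsp(ssp));
    }
    return fannedOut;
  }
}
```

Deferring the deletes until every task has been visited is what lets a single SSP-only Startpoint on a broadcast input reach all of the tasks that share it.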

StartpointManager

StartpointManager is the main API to read and write Startpoints and is composed alongside the CheckpointManager in the OffsetManager. The StartpointManager is agnostic of the system implementation and handles the serialization and deserialization of Startpoints to and from the metadata store.

Code Block
languagejava
titleStartpointManager
/**
 * The StartpointManager reads and writes {@link Startpoint} to the metadata store defined by
 * the configuration task.startpoint.metadata.store.factory.
 */
public class StartpointManager {
  /**
   * Writes a {@link Startpoint} that defines the start position for a {@link SystemStreamPartition}.
   *
   * @param ssp The {@link SystemStreamPartition} to map the {@link Startpoint} against.
   * @param startpoint Reference to a Startpoint object.
   */
  public void writeStartpoint(SystemStreamPartition ssp, Startpoint startpoint) {...}

  /**
   * Writes a {@link Startpoint} that defines the start position for a specified {@link SystemStreamPartition} for a specific {@link TaskName}.
   *
   * @param ssp The {@link SystemStreamPartition} to map the {@link Startpoint} against.
   * @param taskName The {@link TaskName} to map the {@link Startpoint} against.
   * @param startpoint Reference to a Startpoint object.
   */
  public void writeStartpoint(SystemStreamPartition ssp, TaskName taskName, Startpoint startpoint) {...}

  /**
   * Returns the last {@link Startpoint} that defines the start position for a {@link SystemStreamPartition}.
   *
   * @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for.
   * @return {@link Startpoint} for the {@link SystemStreamPartition}, or null if it does not exist.
   */
  public Startpoint readStartpoint(SystemStreamPartition ssp) {...}

  /**
   * Returns the last {@link Startpoint} that defines the start position for a specified {@link SystemStreamPartition} for a specific {@link TaskName}.
   *
   * @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for.
   * @param taskName The {@link TaskName} to fetch the {@link Startpoint} for.
   * @return {@link Startpoint} for the {@link SystemStreamPartition} and {@link TaskName}, or null if it does not exist.
   */
  public Startpoint readStartpoint(SystemStreamPartition ssp, TaskName taskName) {...}

  /**
   * Deletes the {@link Startpoint} for the specified {@link SystemStreamPartition}.
   *
   * @param ssp The {@link SystemStreamPartition} to delete the {@link Startpoint} for.
   */
  public void deleteStartpoint(SystemStreamPartition ssp) {...}

  /**
   * Deletes the {@link Startpoint} for the specified {@link SystemStreamPartition} for a specific {@link TaskName}.
   *
   * @param ssp The {@link SystemStreamPartition} to delete the {@link Startpoint} for.
   * @param taskName The {@link TaskName} to delete the {@link Startpoint} for.
   */
  public void deleteStartpoint(SystemStreamPartition ssp, TaskName taskName) {...}

  /**
   * For {@link Startpoint}s keyed only by {@link SystemStreamPartition}, this method re-maps the Startpoints from
   * SystemStreamPartition to SystemStreamPartition+{@link TaskName} for all tasks provided by the {@link JobModel}.
   * This method is not atomic or thread-safe. The intent is for the Samza Processor's coordinator to use this
   * method to assign the Startpoints to the appropriate tasks.
   *
   * @param jobModel The {@link JobModel} is used to determine which {@link TaskName} each {@link SystemStreamPartition} maps to.
   * @return The set of {@link SystemStreamPartition}s that were fanned out to SystemStreamPartition+TaskName.
   */
  public Set<SystemStreamPartition> fanOutStartpointsToTasks(JobModel jobModel) {...}

// Full implementation not shown for brevity
}
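A toy in-memory version of the read/write/delete overloads shows why per-task deletion matters for broadcast inputs: deleting the Startpoint for one task leaves the copies for other tasks untouched. Startpoints are plain strings here and the class name `InMemoryStartpointManager` is hypothetical; the StartpointManager described above serializes Startpoint objects into the metadata store instead.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical in-memory model of the StartpointManager overloads. Startpoints are plain strings
 * here; the real class serializes Startpoint objects into the metadata store.
 */
final class InMemoryStartpointManager {
  /** taskName == null means the entry is keyed by SSP only. */
  private record Key(String ssp, String taskName) {}

  private final Map<Key, String> store = new HashMap<>();

  void writeStartpoint(String ssp, String startpoint) {
    store.put(new Key(ssp, null), Objects.requireNonNull(startpoint));
  }

  void writeStartpoint(String ssp, String taskName, String startpoint) {
    store.put(new Key(ssp, Objects.requireNonNull(taskName)), Objects.requireNonNull(startpoint));
  }

  /** Returns null if no SSP-only Startpoint exists. */
  String readStartpoint(String ssp) {
    return store.get(new Key(ssp, null));
  }

  /** Returns null if no Startpoint exists for this SSP+TaskName. */
  String readStartpoint(String ssp, String taskName) {
    return store.get(new Key(ssp, taskName));
  }

  void deleteStartpoint(String ssp) {
    store.remove(new Key(ssp, null));
  }

  void deleteStartpoint(String ssp, String taskName) {
    store.remove(new Key(ssp, taskName));
  }
}
```

SSP-only and SSP+TaskName entries live side by side under distinct keys, so writing one never clobbers the other.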

Changes to OffsetManager

Code Block
languagejava
titleOffsetManager
// OffsetManager is written in Scala, but is translated to Java here for consistency in this design document.
public class OffsetManager {
  private final CheckpointManager checkpointManager;
  private final StartpointManager startpointManager;

  // Will add similar doc as StartpointManager#readStartpoint
  public Startpoint getStartpoint(TaskName taskName, SystemStreamPartition systemStreamPartition) {
    return startpointManager.readStartpoint(systemStreamPartition, taskName);
  }

  // Renamed from getStartingOffset to self-document where the offset comes from.
  public String getStartingOffsetFromCheckpoint(TaskName taskName, SystemStreamPartition systemStreamPartition) {
    ...
  }

  // Alias old name for backwards-compatibility.
  @Deprecated // set as legacy and remove in next major version.
  public String getStartingOffset(TaskName taskName, SystemStreamPartition systemStreamPartition) {
    return getStartingOffsetFromCheckpoint(taskName, systemStreamPartition);
  }

  public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint) {
    ...
    checkpointManager.writeCheckpoint(taskName, checkpoint);
    ...
    // The checkpoint is committed, so the Startpoints for the SSPs in this task are no longer needed.
    for (SystemStreamPartition ssp : checkpoint.getOffsets().keySet()) {
      startpointManager.deleteStartpoint(ssp, taskName);
    }
    ...
  }

// Excluding members and methods not pertinent to this design document for brevity.
}
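The ordering in writeCheckpoint is what gives a Startpoint precedence until the next commit, as described in the Note of the General Workflow. The hypothetical `CommitSketch` class below models only that ordering, using strings for SSPs, task names, offsets and Startpoints; it is a sketch, not the OffsetManager implementation.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of the commit ordering in OffsetManager#writeCheckpoint.
 * SSPs, task names, offsets and Startpoints are all plain strings.
 */
final class CommitSketch {
  final Map<String, Map<String, String>> checkpoints = new HashMap<>(); // task -> (SSP -> offset)
  final Map<List<String>, String> startpoints = new HashMap<>();        // [SSP, task] -> Startpoint

  /** On (re)start, a Startpoint that has not been cleared yet takes precedence over the checkpoint. */
  String startingPosition(String taskName, String ssp) {
    String startpoint = startpoints.get(List.of(ssp, taskName));
    if (startpoint != null) {
      return startpoint;
    }
    return checkpoints.getOrDefault(taskName, Map.of()).get(ssp);
  }

  /** Persist the checkpoint first, then delete the task's Startpoints for the checkpointed SSPs. */
  void writeCheckpoint(String taskName, Map<String, String> offsets) {
    checkpoints.put(taskName, Map.copyOf(offsets)); // stands in for checkpointManager.writeCheckpoint(...)
    for (String ssp : offsets.keySet()) {
      startpoints.remove(List.of(ssp, taskName));   // stands in for startpointManager.deleteStartpoint(ssp, taskName)
    }
  }
}
```

A restart before writeCheckpoint runs still resumes from the Startpoint; once the checkpoint is persisted, the Startpoint is gone and the checkpointed offset takes over.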

...

All changes are backwards compatible. This is an add-on feature and no changes to the existing checkpoint operations are required.

The future plan is to use the newer SystemConsumer#register(SystemStreamPartition, Startpoint) interface and deprecate the existing SystemConsumer#register(SystemStreamPartition, String). This will allow a single point of entry for setting the starting offset on the SystemConsumer, whether the starting offset comes from a checkpoint or from a startpoint.

Rejected Alternatives

Previously explored solutions involved modifying the checkpoint offsets directly. Operationally, the proposed solution is safer: checkpoints are used for fault-tolerance and should not be open to user intervention, to prevent human error. The ability to set starting offsets out-of-band provides that safety layer.

...