...

A requested Startpoint will be stored in a metadata store. This store is decoupled from the actual checkpoint offsets in the checkpoint stream.

...

General Workflow

Entities

  • SamzaProcessor - For the sake of brevity, this represents SamzaContainer, TaskInstance, CheckpointManager and everything else in the framework that is not listed here.
  • SystemConsumer
  • StartpointManager
  • StorageLayer - See Storing Requested Startpoint
  • ExternalSource - e.g. Checkpoint tool, REST backend, etc.

Objects

  • Startpoint
  • Checkpoint
  • SystemStreamPartition (SSP)
  • StartpointKey

Step 1: StartpointManager#writeStartpoint(SSP, StartpointEntry) stores the Startpoint in the storage layer.

Step 2: ExternalSource triggers SamzaProcessor to restart.

Step 3: SamzaProcessor starts up, and each task instance reads Checkpoints and also reads Startpoints via StartpointManager#readStartpoint(SSP, removeUponRead).
As each SamzaProcessor task instance iterates through its SSPs:
    a) if there is a Startpoint for the requested SSP, call SystemConsumer#register(SystemStreamPartition, Startpoint).
    b) else, call the existing SystemConsumer#register(SystemStreamPartition, checkpointOffset:String). Eventually, both interface methods will be unified, since the interface implementations should be agnostic of where the starting offset comes from and should only be concerned with setting it, whether it comes from a Checkpoint or a Startpoint. This will be explored during implementation.

Startpoints are removed from StorageLayer at the first checkpoint offset commit. This ensures that the Startpoint takes precedence until the next Checkpoint is committed.
If SamzaProcessor has checkpoints disabled (*.samza.reset.offset=true), then Startpoints are removed immediately after being read.
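
The steps above can be sketched in a few lines of Java. This is a minimal illustration under stated assumptions, not the framework's implementation: `StartupWorkflowSketch`, `InMemoryStartpointStore`, and `RecordingConsumer` are hypothetical stand-ins, SSPs are reduced to plain strings, and the store is an in-memory map rather than the real storage layer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the startup workflow; none of these types are Samza classes.
final class StartupWorkflowSketch {
  enum PositionType { SPECIFIC_OFFSET, TIMESTAMP, EARLIEST, LATEST, BOOTSTRAP }

  static final class Startpoint {
    final PositionType positionType;
    final String position;

    Startpoint(PositionType positionType, String position) {
      this.positionType = positionType;
      this.position = position;
    }
  }

  // Stand-in for the storage layer behind StartpointManager (SSPs are plain strings here).
  static final class InMemoryStartpointStore {
    private final Map<String, Startpoint> bySsp = new HashMap<>();

    void writeStartpoint(String ssp, Startpoint startpoint) {
      bySsp.put(ssp, startpoint);
    }

    Startpoint readStartpoint(String ssp, boolean removeUponRead) {
      return removeUponRead ? bySsp.remove(ssp) : bySsp.get(ssp);
    }

    void removeStartpoint(String ssp) {
      bySsp.remove(ssp);
    }
  }

  // Records which register overload was chosen for each SSP.
  static final class RecordingConsumer {
    final List<String> calls = new ArrayList<>();

    void register(String ssp, Startpoint startpoint) {
      calls.add(ssp + " <- startpoint " + startpoint.positionType + ":" + startpoint.position);
    }

    void register(String ssp, String checkpointOffset) {
      calls.add(ssp + " <- checkpoint " + checkpointOffset);
    }
  }

  // Step 3: a Startpoint, when present, takes precedence over the checkpointed offset.
  static void registerAll(List<String> ssps, Map<String, String> checkpoints,
      InMemoryStartpointStore store, RecordingConsumer consumer, boolean checkpointsDisabled) {
    for (String ssp : ssps) {
      // With checkpoints disabled, the Startpoint is removed as soon as it is read.
      Startpoint startpoint = store.readStartpoint(ssp, checkpointsDisabled);
      if (startpoint != null) {
        consumer.register(ssp, startpoint);
      } else {
        String checkpointOffset = checkpoints.get(ssp);
        consumer.register(ssp, checkpointOffset);
      }
    }
  }

  // First checkpoint commit after startup: the stored Startpoints are removed.
  static void onCheckpointCommit(List<String> ssps, InMemoryStartpointStore store) {
    for (String ssp : ssps) {
      store.removeStartpoint(ssp);
    }
  }
}
```

Because the Startpoint stays in the store until `onCheckpointCommit` runs, a processor that restarts before its first commit would pick up the same Startpoint again, which is the precedence rule described above.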

Startpoint Model


Code Block
languagejava
titleStartpoint
/**
 * StartpointEntry pairs a {@link Startpoint} with the optional name of the task it applies to.
 */
public class StartpointEntry {
  private final String taskName;
  private final Startpoint startpoint;

// Full implementation not shown for brevity
}

/**
 * Startpoint represents a position in a stream by {@link PositionType}
 */
public class Startpoint {
  private final PositionType positionType;
  private final String position;

// Full implementation not shown for brevity
}

public enum PositionType {
  SPECIFIC_OFFSET,
  TIMESTAMP,
  EARLIEST,
  LATEST,
  BOOTSTRAP
}

SystemConsumer Registering Startpoint

The SystemConsumer implementations provided by the Samza framework will implement the new register method below. It is analogous to the existing overloaded #register(SystemStreamPartition, String) method used to recover from checkpoint offsets. The plan is to eventually deprecate #register(SystemStreamPartition, String) and unify the values in the Startpoint metadata store and the Checkpoint stream to use the same Startpoint format.


Code Block
languagejava
titleSystemConsumer
public interface SystemConsumer {
  /**
   * Register a SystemStreamPartition to this SystemConsumer by an externally defined
   * {@link Startpoint}, only if one exists for this SystemStreamPartition. If none exists,
   * {@link #register(SystemStreamPartition, String)} will be called instead.
   * If a SystemConsumer does not support a particular {@link PositionType}, an exception
   * should be thrown. Otherwise, the SystemConsumer is expected to seek to the offset position.
   *
   * @param systemStreamPartition
   *          The SystemStreamPartition object representing the Samza
   *          SystemStreamPartition to receive messages from.
   * @param startpoint
   *          {@link Startpoint} representing position in the stream to start
   *          reading events from.
   */
  default void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
    // By default only a specific offset is supported; it is delegated to the String overload.
    if (startpoint.getPositionType() != PositionType.SPECIFIC_OFFSET) {
      throw new UnsupportedOperationException("Not implemented");
    }
    register(systemStreamPartition, startpoint.getPosition());
  }

  @Deprecated // Unify to eventually use the register method above only.
  void register(SystemStreamPartition systemStreamPartition, String offset);

// Excluding existing interface methods for brevity
}

// Below is pseudocode for system specific implementations on how to handle TIMESTAMP position types

public class KafkaSystemConsumer implements SystemConsumer {
  @Override
  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
    if (startpoint.getPositionType() == PositionType.TIMESTAMP) {
      // Call underlying Kafka Consumer#offsetsForTimes().
      // Call underlying Kafka Consumer#seek() with offsets returned from offsetsForTimes()
    }
    // else if ... (remaining position types)
  }
}

public class EventHubSystemConsumer implements SystemConsumer {
  @Override
  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
    if (startpoint.getPositionType() == PositionType.TIMESTAMP) {
      // eventPosition = EventPosition.fromEnqueuedTime(toDateTime(startpoint.getPosition()))
      // eventHubClientManager.getEventHubClient().createReceiverSync(consumerGroup, systemStreamPartition.getPartitionId(), eventPosition)
    }
    // else if ... (remaining position types)
  }
}
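
How a TIMESTAMP Startpoint might resolve to a concrete offset can be illustrated without a broker. The sketch below is a hypothetical stand-in: `TimestampSeekSketch` and its in-memory index are not part of Samza or Kafka. It mimics the lookup rule of Kafka's Consumer#offsetsForTimes, which returns the earliest offset whose timestamp is greater than or equal to the target. It assumes records are appended in non-decreasing timestamp order, and falling back to the end of the partition when no record matches is an assumption of this sketch, not something the design specifies.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory partition used to illustrate position-type handling.
final class TimestampSeekSketch {
  enum PositionType { SPECIFIC_OFFSET, TIMESTAMP, EARLIEST, LATEST, BOOTSTRAP }

  // Record timestamp (epoch millis) -> offset of the first record carrying that timestamp.
  private final NavigableMap<Long, Long> offsetsByTimestamp = new TreeMap<>();
  private long nextOffset = 0L;

  // Appends one record; assumes timestamps arrive in non-decreasing order.
  void append(long timestampMillis) {
    offsetsByTimestamp.putIfAbsent(timestampMillis, nextOffset);
    nextOffset++;
  }

  // Resolves a Startpoint-like (type, position) pair to the offset to start reading from.
  long resolve(PositionType type, String position) {
    switch (type) {
      case SPECIFIC_OFFSET:
        return Long.parseLong(position);
      case TIMESTAMP: {
        // Earliest record whose timestamp is at or after the requested one.
        Map.Entry<Long, Long> hit = offsetsByTimestamp.ceilingEntry(Long.parseLong(position));
        // Assumption: with no such record, start from the end of the partition.
        return hit != null ? hit.getValue() : nextOffset;
      }
      case EARLIEST:
        return 0L;
      case LATEST:
        return nextOffset;
      default:
        // Mirrors the design: unsupported position types are rejected with an exception.
        throw new UnsupportedOperationException("Unsupported position type: " + type);
    }
  }
}
```

A real consumer would replace the map lookup with the system's own API, as the pseudocode for Kafka and EventHubs above indicates.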

Storing Requested Startpoint

Out-of-band Metadata Store

The out-of-band metadata store used is described by the metadata store abstraction feature (SAMZA-1786) from SEP-11. Startpoints are stored within their own namespace in the metadata store configured by `metadata.store.factory`.

Abstractly, we think of the metadata store as a KV store and Startpoints are stored as:

SSP→StartpointEntry(Startpoint, TaskName)

TaskName is optional and is primarily for broadcast inputs where the same SSP spans across multiple tasks.
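
The mapping above can be pictured with a small in-memory sketch. Everything in it is hypothetical and chosen for illustration only: the `StartpointStoreSketch` class, the `samza-startpoint` key prefix standing in for the namespace, string-typed SSPs and Startpoints, and the lookup rule that an entry without a TaskName matches any task.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of SSP -> StartpointEntry(Startpoint, TaskName) in a namespaced KV store.
final class StartpointStoreSketch {
  // Assumed key prefix that keeps Startpoints apart from other metadata in the same store.
  static final String NAMESPACE = "samza-startpoint";

  static final class StartpointEntry {
    final String startpoint;          // serialized Startpoint, e.g. "SPECIFIC_OFFSET:100"
    final Optional<String> taskName;  // empty when the entry is not tied to a single task

    StartpointEntry(String startpoint, String taskNameOrNull) {
      this.startpoint = startpoint;
      this.taskName = Optional.ofNullable(taskNameOrNull);
    }
  }

  // Stand-in for the KV metadata store.
  private final Map<String, StartpointEntry> kv = new HashMap<>();

  static String key(String ssp) {
    return NAMESPACE + "/" + ssp;
  }

  void put(String ssp, StartpointEntry entry) {
    kv.put(key(ssp), entry);
  }

  // Assumed rule: an entry without a TaskName matches any task; otherwise the names must match.
  Optional<StartpointEntry> get(String ssp, String taskName) {
    return Optional.ofNullable(kv.get(key(ssp)))
        .filter(e -> !e.taskName.isPresent() || e.taskName.get().equals(taskName));
  }
}
```

For a broadcast input, storing the entry with a TaskName lets one task honor the Startpoint while the other tasks reading the same SSP continue from their checkpoints.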

StartpointManager

StartpointManager is the main API to read and write Startpoints and is composed alongside the CheckpointManager in the OffsetManager. The StartpointManager is system implementation agnostic and handles the serialization and deserialization of Startpoints into the metadata store.

Code Block
languagejava
titleStartpointManager
/**
 * The StartpointManager reads and writes {@link Startpoint} to the metadata store defined by
 * the configuration task.startpoint.metadata.store.factory.
 */
public class StartpointManager {
  /**
   * Construct the StartpointManager with the {@link MetadataStore} to use.
   */
  public StartpointManager(MetadataStore metadataStore) { ... }

  /**
   * Writes a {@link Startpoint} that defines the start position for stream partitions.
   * @param ssp The {@link SystemStreamPartition} to map the {@link Startpoint} against.
   * @param startpointEntry Reference to a StartpointEntry object.
   */
  public void writeStartpoint(SystemStreamPartition ssp, StartpointEntry startpointEntry) { ... }

  /**
   * Returns the last {@link Startpoint} for a specified {@link SystemStreamPartition}.
   * @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for.
   * @param removeUponRead If true, removes the Startpoint once read.
   * @return {@link StartpointEntry} for the {@link SystemStreamPartition}, or null if it does not exist.
   */
  public StartpointEntry readStartpoint(SystemStreamPartition ssp, boolean removeUponRead) { ... }

  /**
   * Returns all {@link Startpoint} in the {@link MetadataStore}.
   * @return map of existing {@link SystemStreamPartition} to {@link StartpointEntry}.
   */
  public Map<SystemStreamPartition, StartpointEntry> readAllStartpoints() { ... }

// Full implementation not shown for brevity
}

Changes to OffsetManager

Code Block
languagejava
titleOffsetManager
// OffsetManager is written in Scala, but is translated to Java here for consistency in this design document.
public class OffsetManager {
  private final CheckpointManager checkpointManager;
  private final StartpointManager startpointManager;

  // Will add similar doc as StartpointManager#readStartpoint
  public Startpoint getStartpoint(StartpointKey startpointKey) {
    return startpointManager.readStartpoint(startpointKey);
  }

  // Will add similar doc as StartpointManager#writeStartpoint
  public void setStartpoint(StartpointKey startpointKey, Startpoint startpoint) {
    startpointManager.writeStartpoint(startpointKey, startpoint);
  }
  
  // Renamed from getStartingOffset so that the name documents where the offset comes from.
  public String getStartingOffsetFromCheckpoint(TaskName taskName, SystemStreamPartition systemStreamPartition) { 
    ... 
  }

  // Alias old name for backwards-compatibility.
  @Deprecated // set as legacy and remove in next major version.
  public String getStartingOffset(TaskName taskName, SystemStreamPartition systemStreamPartition) {
    return getStartingOffsetFromCheckpoint(taskName, systemStreamPartition);
  }

// Excluding members and methods not pertinent to this design document for brevity.
}

Compatibility, Deprecation, and Migration Plan

...