...

In the current Samza framework, manually setting the starting offsets for an input stream requires stopping the Samza processor and using a system-specific checkpoint tool to modify the checkpoint offsets directly in the checkpoint stream. The current tooling is tedious and error-prone: it requires proper security access, potentially involves editing many offsets in a configuration file, and in some cases can cause a Samza processor to lose its checkpoints. In addition to the dangerous nature of modifying checkpoints directly, the checkpoint offsets are arbitrary strings with system-specific formats, which requires a different checkpoint tool for each system type (e.g. Kafka, Eventhub, Kinesis).

Terminology

SSP - System stream partition. For example, for a Kafka stream called SomeEvent in the tracking cluster, the system is tracking, the stream is SomeEvent, and the partition is a partition ID in the stream.
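
For illustration, the SSP in this example can be constructed as follows (partition 0 is an arbitrary choice):

Code Block
languagejava
titleSystemStreamPartition example
// The SSP from the example above: system "tracking", stream "SomeEvent", partition 0.
SystemStreamPartition ssp = new SystemStreamPartition("tracking", "SomeEvent", new Partition(0));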

JC - Job coordinator.

Requirements

Goals

Provide a simple and generic interface to manipulate starting offsets per input stream partition instead of relying on system-specific checkpoint tools and services. This allows flexibility to build REST API layers and tools on top of the common interface.

Allow defining starting offsets on an input stream either by SSP across all tasks or by SSP per task, and not only with specific offsets but also with other position types, such as timestamps.

Framework-level support for various position types, such as specific offsets and timestamp-based offsets.

Maintain backwards compatibility.

...

Simplicity. Make it easy for developers and users to create tools and services that set the starting offsets of a given Samza job.

Non-goals

Rewinding or fast-forwarding state store changelogs in relation to the input streams. The challenge is that the changelog stream is typically log compacted.

...

Different systems in Samza have different formats for checkpoint offsets and lack any contract that describes the offset format. To maintain backwards compatibility and to improve the operability of setting starting offsets, this solution introduces the concept of Startpoints and utilizes the abstract metadata storage layer.

A Startpoint indicates the offset position a particular SSP should start consuming from. The Startpoint takes precedence over Checkpoints and defines a position type together with a position value for that type. For example, if the Startpoint position type is TIMESTAMP, then the position value is an epoch value in milliseconds. The Startpoint enforces a stricter contract for external tools and services to follow, as opposed to the free-form string offset value in the Checkpoint.

Each SystemConsumer will also implement the new StartpointVisitor interface to handle Startpoints.

A requested Startpoint will be stored in the metadata store, decoupled from the checkpointed offsets in the checkpoint stream.
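
As a rough sketch of the intended interaction, an external tool could request a starting position as follows. The StartpointManager API is detailed later in this document; the StartpointTimestamp constructor shown here is an assumption, not a finalized API.

Code Block
languagejava
titleRequesting a starting position (sketch)
// Hypothetical external tool: request that a partition start consuming from a point in time.
StartpointManager startpointManager = ...; // wired against the job's metadata store
SystemStreamPartition ssp = new SystemStreamPartition("tracking", "SomeEvent", new Partition(7));
startpointManager.writeStartpoint(ssp, new StartpointTimestamp(1541541540000L)); // assumed constructor; epoch millis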

General Workflow

Entities

  • SamzaProcessor - For the sake of brevity, this represents SamzaContainer, TaskInstance, CheckpointManager and everything else in the framework that is not listed here.
  • JobCoordinator
  • SystemConsumer (implements interface StartpointVisitor)
  • StartpointManager
  • StorageLayer - See Storing Requested Startpoint
  • ExternalSource - e.g. checkpoint tool, REST backend, etc.

Objects

  • Startpoint
  • Checkpoint
  • SystemStreamPartition(SSP)

...

Loading Startpoints Upon Job Start

Startpoints are written to the metadata store using two key types: SSP-only and SSP+TaskName. For broadcast input streams, an SSP may span multiple tasks and therefore Startpoints are applied at the task level. For Startpoints on SSP-only keys, the JC has a mechanism to fan out the SSP across all tasks that the SSP maps to. The diagram and steps below illustrate the flow.

[Diagram: fan-out of SSP-keyed Startpoints to SSP+TaskName keys upon job start]

Step 3: Upon startup, the JobCoordinator calls StartpointManager#fanOutStartpointsToTasks(JobModel) to re-key Startpoints keyed only by SSP to SSP+TaskName. Startpoints already keyed by SSP+TaskName are not overwritten. This remapping is required because once a Startpoint is consumed, it needs to be deleted at the next Checkpoint commit (see Note below), and Checkpoint commits happen at a per-task level.

Step 4: The SamzaProcessor starts up, and each task instance reads Checkpoints and also reads Startpoints via StartpointManager#readStartpoint(SSP, TaskName).
As each of the SamzaProcessor task instances iterates through its SSPs:
    a) if there is a Startpoint for the requested SSP, call the StartpointVisitor#visit(SystemStreamPartition, Startpoint) interface implemented by the SystemConsumer;
    b) else, call the existing SystemConsumer#register(SystemStreamPartition, checkpointOffset:String). Eventually, both interface methods will be unified, since the interface implementations should be agnostic of where the starting offsets come from and should only be concerned with setting the starting offset, whether it comes from a Checkpoint or a Startpoint. This will be explored during implementation.

Note: Startpoints are removed from the StorageLayer at the first checkpoint offset commit. This ensures precedence stays with the Startpoint until the next Checkpoint is committed, specifically in situations where the SamzaProcessor restarts before the commit.
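
A minimal sketch of the fan-out in Step 3, assuming a hypothetical taskToSsps mapping derived from the JobModel:

Code Block
languagejava
titleFan-out (sketch)
// For each task, promote any SSP-only keyed Startpoint to an SSP+TaskName key,
// without overwriting Startpoints already keyed by SSP+TaskName.
for (Map.Entry<TaskName, Set<SystemStreamPartition>> entry : taskToSsps.entrySet()) {
  for (SystemStreamPartition ssp : entry.getValue()) {
    Startpoint startpoint = startpointManager.readStartpoint(ssp); // SSP-only key
    if (startpoint != null && startpointManager.readStartpoint(ssp, entry.getKey()) == null) {
      startpointManager.writeStartpoint(ssp, entry.getKey(), startpoint);
    }
  }
}
// Once every task is covered, the SSP-only keys are deleted.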

Committing Startpoints

Once a particular Startpoint is applied to the starting offset of a system-stream-partition in a task instance, it is subsequently removed at the next offset commit. 

[Diagram: Startpoint removal at the next offset commit]



Compatibility, Deprecation, and Migration Plan

All changes are backwards compatible. This is an add-on feature and no changes to the existing checkpoint operations are required.

There may be opportunities where offset-related APIs that are strongly coupled to Checkpoints are modified to handle both Startpoints and Checkpoints. Any such APIs will be deprecated until the next major version.

No migration needed for this new feature.

Test Plan

Create a test job that logs the offset of the first event consumed per task and per SSP (a sketch of such a job follows the list below). Test using all Startpoint types.

Use the test job to test all combinations of:

  • Job Coordinators (e.g. ClusterBasedJobCoordinator, ZkJobCoordinator)
  • Provided connectors (e.g. Kafka, Eventhubs)
  • Broadcast and non-broadcast streams
  • Various input streams with various partition counts
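
A minimal sketch of such a test job, using the low-level task API; the class name and log format are illustrative:

Code Block
languagejava
titleFirstOffsetLoggingTask (sketch)
/**
 * Logs the offset of the first event consumed for each SSP processed by this task instance.
 */
public class FirstOffsetLoggingTask implements StreamTask {
  private final Set<SystemStreamPartition> seenSsps = new HashSet<>();

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    SystemStreamPartition ssp = envelope.getSystemStreamPartition();
    if (seenSsps.add(ssp)) {
      System.out.println(String.format("First offset for %s: %s", ssp, envelope.getOffset()));
    }
  }
}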


Startpoint Models

Code Block
languagejava
titleStartpoint
/**
 * Startpoint represents a position in a stream partition
 */
public abstract class Startpoint {
  private final long creationTimestamp;

  /**
   * Applies the visitor {@link StartpointVisitor}'s visit method for the instance of this {@link Startpoint}
   * class.
   * @param systemStreamPartition The {@link SystemStreamPartition} to apply the {@link StartpointVisitor} against.
   * @param startpointVisitor The visitor to apply.
   */
  public abstract void apply(SystemStreamPartition systemStreamPartition, StartpointVisitor startpointVisitor);
  
// Full implementation not shown for brevity
}


/**
 * A {@link Startpoint} that represents a specific offset in a stream partition.
 */
public final class StartpointSpecific extends Startpoint { ... }


/**
 * A {@link Startpoint} that represents a timestamp offset in a stream partition.
 */
public final class StartpointTimestamp extends Startpoint { ... }


/**
 * A {@link Startpoint} that represents the earliest offset in a stream partition.
 */
public final class StartpointOldest extends Startpoint { ... }


/**
 * A {@link Startpoint} that represents the latest offset in a stream partition.
 */
public final class StartpointUpcoming extends Startpoint { ... }


/**
 * A {@link Startpoint} that represents a custom startpoint. This is for systems that have a non-generic option
 * for setting offsets. Startpoints are serialized to JSON in the {@link org.apache.samza.metadatastore.MetadataStore}
 * and it is recommended to keep subclasses of {@link StartpointCustom} as simple POJOs.
 */
public abstract class StartpointCustom extends Startpoint { ... }
 ...
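
As an illustration of the POJO recommendation above, a custom Startpoint for a hypothetical system that positions consumers by sequence number might look like the following (the class and field names are assumptions, not part of this proposal):

Code Block
languagejava
titleStartpointCustom subclass (hypothetical)
/**
 * Hypothetical custom Startpoint for a system with a native sequence-number position type.
 * Kept as a simple POJO so it serializes cleanly to JSON in the metadata store.
 */
public final class StartpointSequenceNumber extends StartpointCustom {
  private long sequenceNumber;

  public long getSequenceNumber() {
    return sequenceNumber;
  }

  public void setSequenceNumber(long sequenceNumber) {
    this.sequenceNumber = sequenceNumber;
  }

  // apply(...) from Startpoint omitted for brevity.
}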

StartpointVisitor

Code Block
languagejava
titleSystemConsumer
/**
 * Visitor interface for system consumers to implement to support {@link Startpoint}s.
 */
public interface StartpointVisitor {

  /**
   * Seek to specific offset represented by {@link StartpointSpecific}
   * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
   * @param startpointSpecific The {@link Startpoint} that represents the specific offset.
   */
  void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific);

  /**
   * Seek to timestamp offset represented by {@link StartpointTimestamp}
   * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
   * @param startpointTimestamp The {@link Startpoint} that represents the timestamp offset.
   */
  default void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
    throw new UnsupportedOperationException("StartpointTimestamp is not supported.");
  }

  /**
   * Seek to earliest offset represented by {@link StartpointOldest}
   * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
   * @param startpointOldest The {@link Startpoint} that represents the earliest offset.
   */
  default void visit(SystemStreamPartition systemStreamPartition, StartpointOldest startpointOldest) {
    throw new UnsupportedOperationException("StartpointOldest is not supported.");
  }

  /**
   * Seek to latest offset represented by {@link StartpointUpcoming}
   * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
   * @param startpointUpcoming The {@link Startpoint} that represents the latest offset.
   */
  default void visit(SystemStreamPartition systemStreamPartition, StartpointUpcoming startpointUpcoming) {
    throw new UnsupportedOperationException("StartpointUpcoming is not supported.");
  }

  /**
   * Bootstrap signal represented by {@link StartpointCustom}
   * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
   * @param startpointCustom The {@link Startpoint} that represents the bootstrap signal.
   */
  default void visit(SystemStreamPartition systemStreamPartition, StartpointCustom startpointCustom) {
    throw new UnsupportedOperationException(String.format("%s is not supported.", startpointCustom.getClass().getSimpleName()));
  }
}
 
// Below is example pseudocode for system-specific implementations of the timestamp position type. The other visitor
// methods (StartpointSpecific, StartpointOldest, StartpointUpcoming) are left out for brevity.
 
public class KafkaSystemConsumer implements SystemConsumer, StartpointVisitor {
  @Override
  public void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpoint) {
    // toTopicPartition is an assumed helper; startpoint.position() is the epoch timestamp in milliseconds.
    TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
    // Resolve the timestamp to an offset with the underlying Kafka Consumer#offsetsForTimes()
    OffsetAndTimestamp resolved =
        kafkaConsumer.offsetsForTimes(Collections.singletonMap(topicPartition, startpoint.position())).get(topicPartition);
    // Seek to the resolved offset with the underlying Kafka Consumer#seek()
    kafkaConsumer.seek(topicPartition, resolved.offset());
  }
}
 
public class EventHubSystemConsumer implements SystemConsumer, StartpointVisitor {
  @Override
  public void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpoint) {
    // Create a receiver positioned at the enqueued time that corresponds to the Startpoint timestamp.
    EventPosition eventPosition = EventPosition.fromEnqueuedTime(Instant.ofEpochMilli(startpoint.position()));
    eventHubClientManager.getEventHubClient()
        .createReceiverSync(consumerGroup, String.valueOf(systemStreamPartition.getPartition().getPartitionId()), eventPosition);
  }
}

Storing Requested Startpoint

Out-of-band Metadata Store

The out-of-band metadata store used is described by the metadata store abstraction feature (SAMZA-1786) from SEP-11. Startpoints are stored within their own namespace in the metadata store configured by `metadata.store.factory`.

Abstractly, we think of the metadata store as a KV store and Startpoints are stored as:

SSP→Startpoint or SSP+TaskName→Startpoint

TaskName is optional and is primarily for broadcast inputs where the same SSP spans across multiple tasks. However, when a job starts, all Startpoints keyed only by SSP will be remapped to SSP+TaskName. See General Workflow above for details.
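
Purely as an illustration of the two key shapes, a hypothetical string encoding might be (the actual key encoding is an implementation detail of the Startpoint namespace):

Code Block
languagejava
titleStartpoint keys (illustrative)
// Hypothetical encodings; not a proposed wire format.
String sspKey = String.format("%s.%s.%d",
    ssp.getSystem(), ssp.getStream(), ssp.getPartition().getPartitionId());
String sspTaskKey = sspKey + "." + taskName.getTaskName();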

StartpointManager

StartpointManager is the main API to read and write Startpoints and is composed alongside the CheckpointManager in the OffsetManager. The StartpointManager is system implementation agnostic and handles the serialization and deserialization of Startpoints into the metadata store.

Code Block
languagejava
titleStartpointManager
/**
 * The StartpointManager reads and writes {@link Startpoint} to the metadata store defined by
 * the configuration metadata.store.factory.
 */
public class StartpointManager {
  /**
   * Writes a {@link Startpoint} that defines the start position for a {@link SystemStreamPartition}.
   *
   * @param ssp The {@link SystemStreamPartition} to map the {@link Startpoint} against.
   * @param startpoint Reference to a Startpoint object.
   */
  public void writeStartpoint(SystemStreamPartition ssp, Startpoint startpoint) {...}


  /**
   * Writes a {@link Startpoint} that defines the start position for a specified {@link SystemStreamPartition} for a specific {@link TaskName}.
   *
   * @param ssp The {@link SystemStreamPartition} to map the {@link Startpoint} against.
   * @param taskName The {@link TaskName} to map the {@link Startpoint} against.
   * @param startpoint Reference to a Startpoint object.
   */
  public void writeStartpoint(SystemStreamPartition ssp, TaskName taskName, Startpoint startpoint) {...}

  /**
   * Returns the last {@link Startpoint} that defines the start position for a {@link SystemStreamPartition}.
   *
   * @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for.
   * @return {@link Startpoint} for the {@link SystemStreamPartition}, or null if it does not exist.
   */
  public Startpoint readStartpoint(SystemStreamPartition ssp) {...}

  /**
   * Returns the last {@link Startpoint} for a specified {@link SystemStreamPartition} for a specific {@link TaskName}.
   *
   * @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for.
   * @param taskName The {@link TaskName} to map the {@link Startpoint} against.
   * @return {@link Startpoint} for the {@link SystemStreamPartition}, or null if it does not exist.
   */
  public Startpoint readStartpoint(SystemStreamPartition ssp, TaskName taskName) {...}

  /**
   * Deletes the {@link Startpoint} for the specified {@link SystemStreamPartition}.
   *
   * @param ssp The {@link SystemStreamPartition} to delete the {@link Startpoint} for.
   */
  public void deleteStartpoint(SystemStreamPartition ssp) {...}

  /**
   * Deletes the {@link Startpoint} for the specified {@link SystemStreamPartition} and {@link TaskName}.
   *
   * @param ssp The {@link SystemStreamPartition} to delete the {@link Startpoint} for.
   * @param taskName The {@link TaskName} to delete the {@link Startpoint} for.
   */
  public void deleteStartpoint(SystemStreamPartition ssp, TaskName taskName) {...}

  /**
   * For {@link Startpoint}s keyed only by {@link SystemStreamPartition}, this method re-maps the Startpoints from
   * SystemStreamPartition to SystemStreamPartition+{@link TaskName} for all tasks provided by the {@link JobModel}.
   * This method is not atomic or thread-safe. The intent is for the Samza Processor's coordinator to use this
   * method to assign the Startpoints to the appropriate tasks.
   * @param jobModel The {@link JobModel} used to determine which {@link TaskName} each {@link SystemStreamPartition} maps to.
   * @return The set of {@link SystemStreamPartition}s that were fanned out to SystemStreamPartition+TaskName.
   */
  public Set<SystemStreamPartition> fanOutStartpointsToTasks(JobModel jobModel) {...}

// Full implementation not shown for brevity
}
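
For example, a broadcast input may warrant a per-task Startpoint. A short usage sketch of the read/write/delete cycle follows; the StartpointSpecific constructor is an assumption:

Code Block
languagejava
titleStartpointManager usage (sketch)
// Per-task Startpoint for a broadcast input; assumes a StartpointSpecific(offset) constructor.
TaskName taskName = new TaskName("Partition 7");
startpointManager.writeStartpoint(ssp, taskName, new StartpointSpecific("1024"));
Startpoint startpoint = startpointManager.readStartpoint(ssp, taskName); // null if absent
startpointManager.deleteStartpoint(ssp, taskName); // normally done by the framework at offset commit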

Changes to OffsetManager

Code Block
languagejava
titleOffsetManager
// OffsetManager is written in Scala, but is translated to Java here for consistency in this design document.
public class OffsetManager {
  private final CheckpointManager checkpointManager;
  private final StartpointManager startpointManager;

  // Will add similar doc as StartpointManager#readStartpoint
  public Startpoint getStartpoint(StartpointKey startpointKey) {
    return startpointManager.readStartpoint(startpointKey);
  }

  // Will add similar doc as StartpointManager#writeStartpoint
  public void setStartpoint(StartpointKey startpointKey, Startpoint startpoint) {
    startpointManager.writeStartpoint(startpointKey, startpoint);
  }
  
  // Renamed from getStartingOffset to document where the starting offset comes from.
  public String getStartingOffsetFromCheckpoint(TaskName taskName, SystemStreamPartition systemStreamPartition) { 
    ... 
  }

  // Alias old name for backwards-compatibility.
  @Deprecated // set as legacy and remove in next major version.
  public String getStartingOffset(TaskName taskName, SystemStreamPartition systemStreamPartition) {
    return getStartingOffsetFromCheckpoint(taskName, systemStreamPartition);
  }

  public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint) {
    ...
    checkpointManager.writeCheckpoint(taskName, checkpoint);
    ...
    // Delete the Startpoints for all SSPs in this task now that their offsets are checkpointed.
    for (SystemStreamPartition ssp : checkpoint.getOffsets().keySet()) {
      startpointManager.deleteStartpoint(ssp, taskName);
    }
    ...
  }
// Excluding members and methods not pertinent to this design document for brevity.
}

...


Analysis

A key part of the core Startpoint feature is for individual task instances to fetch the appropriate Startpoint for an SSP when the Startpoint is stored in the metadata store without any association to a specific task, i.e. keyed by SSP only. Two approaches, fan-out and intent-ACK, have been explored, with the analysis detailed in the following subsections. The fan-out strategy is favored over the intent-ACK strategy; see the analysis and explanation below.

Fan-out


See General Workflow above for details.


Pros

  • Follows the natural progression of the JobCoordinator calculating the job model and then applying the info in the job model to fan out the SSP-keyed Startpoints to SSP+TaskName-keyed Startpoints.
  • Cleaner and simpler bookkeeping of Startpoints. SSP-only keyed Startpoints are deleted after fan-out, and SSP+TaskName keyed Startpoints are deleted upon offset commits.

Cons

  • Will not work with the PassthroughJobCoordinator because it does not have a leader election strategy, and the fan-out approach requires a JC leader. The default grouper for passthrough does not support broadcast streams, so the SSP fan-out to tasks across containers will be mutually exclusive; that may not apply to custom groupers. Workarounds will need to be explored during implementation for the small pool of use cases where the PassthroughJobCoordinator is needed.

Intent-ACK

This approach is rejected due to the weight of the cons and the complexity required to manage the intents and ACKs once they are applied to the starting offsets and no longer needed.


Pros

  • Does not rely on the Job Coordinator for consumption of Startpoints.

Cons

  • Following the consumption of Startpoints, the metadata store will be polluted with SSP-only keyed Startpoints and per-task ACKs. The current implementation of the metadata store will bootstrap all the Startpoints and ACKs until they are cleaned up.
  • The task count may change during runtime due to an increase in the input stream partition count. New tasks would therefore pick up a Startpoint unless all tasks prior to the increase in task count had written their ACKs and the cleanup process had run before the task-to-partition remapping. Startpoints should only apply to the state of the job upon startup and not to any changes during runtime.

Additional Notes

Following is the workflow for the Intent/ACK approach:

1. The startpoint is written to the startpoint store.
2. The SamzaContainers read the startpoint and acknowledge that they've acted on it.
3. After a sufficient number of ACKs are received, an external agent purges the startpoint. The external agent can be one of the job's running SamzaContainers, the job's JobCoordinator, or an external daemon process.

In the above workflow, an external agent waits for the expected number of ACKs to be published to the startpoint store and then purges the startpoint. The expected number of ACKs is not a static number.

The container can acknowledge the startpoint at one of the following two levels of granularity:

1. Container-level: Compared to YARN with a static container count (defined by job.container.count), the number of containers in standalone is dynamic; processors can join and leave the group at any point in time. For standalone, an external agent cannot determine with certainty whether it has received a sufficient number of ACKs, since the number of containers is a moving target.

Standalone:

Let's consider the following scenario for a standalone application.

   A. Four processors are part of the standalone application when the external agent watches the number of active processors.
   B. The external agent expects four ACKs to be written to the store before it can purge the startpoint.
   C. Before any processor ACKs the startpoint, the number of active processors is reduced to three due to a hardware failure, and a re-balance happens.
   D. After the re-balance, the three live processors ACK their corresponding startpoint to the store and are running fine. The external agent cannot clear the startpoint, since there are only three ACKs compared to the expected four.

   Unless we change the coordination protocol in standalone to accommodate startpoints, I'm not sure we can ACK at this level to purge startpoints.

YARN:

There have been interesting recent developments: we're planning to develop an auto-scaling solution for YARN. In the immediate future, the number of containers in YARN will not be fixed, and the scenario described above for standalone will apply to YARN as well.

2. Task-level: The number of tasks can change dynamically at run-time, either due to an increase in the partition count of any of the job's input topics or because a new topic matches the user-defined input topic-regex of the job. The scenario described above for standalone still applies here.


Rejected Alternatives

Writing Directly to Checkpoint

Previously explored solutions involved modifying the checkpoint offsets directly. Operationally, the proposed Startpoint solution provides more safety, because checkpoints are used for fault tolerance. To prevent human error, the framework should not allow an external source to manipulate the checkpointed offsets directly. The ability to set starting offsets out-of-band, as Startpoints are designed to do, provides that additional safety layer.

Startpoint Intent-ACK Model

See Analysis section for details on why the fan-out model is favored.