
Status

Current state: ACCEPTED

Vote: http://mail-archives.apache.org/mod_mbox/samza-dev/201909.mbox/browser

Discussion thread: http://mail-archives.apache.org/mod_mbox/samza-dev/201812.mbox/%3CCABpE9c1_N4hFBsj7Yr5v_7q89SVwnm6Zr-MGuLz1e%3De8MchLkg%40mail.gmail.com%3E

JIRA

SAMZA-1983

Authors: Daniel Nimishura, Shanthoosh Venkataraman

Released: Samza-1.4


...

To provide a common interface for external tools and services to rewind and fast-forward the starting offsets of any input stream of a Samza application. This feature provides the ability to manually manipulate starting offsets by various position types, not only by specific offsets. Many of the underlying system consumers support seeking to an offset on an input stream by other position types, such as seeking by timestamp, but these are not generically exposed by the current framework.

...

If a container of a Samza application fails, upon restart it should resume processing events where the failed container left off. To enable this, a Samza container periodically checkpoints the current offset for each partition of an input stream. After an application failure, Samza users may want their application to consume from a particular position in an input stream (for instance, due to a bug in their application or in an upstream producer of the pipeline). The workflow currently supported in Samza to update checkpoints:

  1.  Users have to manually stop their running samza application.

...

  2. Create a configuration file in XML format and specify the starting offset for each input topic partition.

...

  3. Run the samza-checkpoint tool which updates the checkpoint topic of the samza application with the new user-defined offsets.

...

  4. Users have to manually start their samza application again.

Using the samza-checkpoint tool is tedious and error-prone, as it requires proper security access and potentially editing many offsets in a configuration file. In some cases, it can cause a Samza container to lose its checkpoints. In addition to the dangerous nature of modifying checkpoints directly, checkpoint offsets are arbitrary strings with system-specific formats, which requires a different checkpoint tool for each system type (e.g. Kafka, Eventhub, Kinesis).

...

JC - Job coordinator.

Requirements

Goals

  1. Provide a simple and generic interface to manipulate starting offsets per input stream partition instead of relying on system-specific checkpoint tools and services. This allows flexibility to build REST API layers and tools on top of the common interface.
  2. Allow defining starting offsets on an input stream by SSP across all tasks or SSP per task.

...

  1. Framework-level support for various offset types, such as

...

  1. specific offsets and timestamp-based offsets.

Maintain backwards compatibility.

...

  1. Should support the different deployment models of Samza, viz. standalone and YARN. The different API offerings of Samza (Beam, SQL, high-level and low-level APIs) should be supported by the solution.
  2. Simplicity. Easy for developers and users to create tools and services to set starting offsets of a given Samza job.

Non-goals

  1. Rewinding or fast-forwarding state store changelogs in relation to the input streams. The challenge is that the changelog stream is typically log compacted.
  2. Providing a service that externally exposes a Startpoint API. Such services require other core changes and will be explored in another SEP or design document.

Proposed Changes

Different systems in Samza have different formats for checkpoint offsets and lack any contract that describes the offset format. To maintain backwards compatibility and to have better operability for setting starting offsets, this solution introduces the concept of Startpoints and utilizes the abstract metadata storage layer.

...

A requested Startpoint will be stored in the metadata store. This will be decoupled from the checkpointed offsets in the checkpoint stream.

General Workflow

Loading Startpoints Upon Job Startup

Startpoints are written to the metadata store under a readWrite namespace using two key types: SSP-only and SSP+TaskName. For broadcast input streams, an SSP may span multiple tasks, and therefore Startpoints are applied at the task level. For Startpoints on SSP-only keys, the task-to-SSP mappings are retrieved from the JobModel. Upon the subsequent start of the Samza job, the Startpoints are fanned out to a fanOut namespace in the metadata store. The diagram below illustrates the flow.

As with Checkpoints, Startpoints are applied to the starting offset of an SSP in a task instance during startup of the SamzaContainer.

(Diagram: Startpoint fan-out flow)
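The fan-out described above can be sketched as a pure function over the JobModel's task-to-SSP mapping. This is an illustrative sketch, not the real implementation: plain Strings stand in for SystemStreamPartition, TaskName and Startpoint, and the class name is hypothetical.

```java
import java.util.*;

// Simplified sketch of the fan-out step: SSP-only Startpoints from the readWrite
// namespace are expanded to per-task entries using the task-to-SSP mapping that
// the JC reads from the JobModel.
class FanOutSketch {
  static Map<String, Map<String, String>> fanOut(
      Map<String, Set<String>> taskToSsps,   // task -> SSPs, from the JobModel
      Map<String, String> sspStartpoints) {  // readWrite namespace, keyed by SSP only
    Map<String, Map<String, String>> fanOuts = new HashMap<>();
    for (Map.Entry<String, Set<String>> task : taskToSsps.entrySet()) {
      for (String ssp : task.getValue()) {
        String startpoint = sspStartpoints.get(ssp);
        if (startpoint != null) { // only SSPs with a requested Startpoint fan out
          fanOuts.computeIfAbsent(task.getKey(), t -> new HashMap<>()).put(ssp, startpoint);
        }
      }
    }
    return fanOuts; // stored under the fanOut namespace: {TaskName -> {SSP -> Startpoint}}
  }
}
```

In this sketch, only SSPs with a requested Startpoint appear in the result, so tasks without any requested rewind get no fan-out entries at all.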

Committing Startpoints

Once a particular Startpoint is applied to the starting offset of an SSP in a task instance, it is subsequently removed at the next offset commit.


Failure Scenarios

SamzaContainer is restarted before the first offset commit

  • The Startpoint will be applied to the starting offset again upon restart. This matches the behavior of the equivalent failure scenario for Checkpoints.

JobCoordinator is restarted before all Startpoints are applied

  • Similar to the above failure scenario, except across multiple containers since restarting a JobCoordinator restarts all associated SamzaContainers.


Storing Requested Startpoint

Metadata Store

Referenced in the General Workflow above.

The out-of-band metadata store used is described by the metadata store abstraction feature (SAMZA-1786) from SEP-11. The Startpoints are stored within their own namespaces in the metadata store.

Abstractly, from the perspective of Startpoint operation, we treat the metadata store as a KV store mapped as:

{SSP→Startpoint} or {SSP+TaskName→Startpoint}

TaskName is optional and is primarily for broadcast inputs where the same SSP spans across multiple tasks.

Upon the fan out, the Startpoints are mapped by task with the same structure as checkpoints:

{TaskName→{SSP→Startpoint}}  


Public Interfaces

Startpoint Models

Code Block
languagejava
titleStartpoint
/**
 * Startpoint represents a position in a stream partition.
 */
public abstract class Startpoint {

  private final long creationTimestamp;

  /**
   * Applies the {@link StartpointVisitor}'s visit methods to the {@link Startpoint}
   * and returns the result of that operation.
   * @param input the metadata associated with the startpoint.
   * @param startpointVisitor the visitor of the startpoint.
   * @return the result of applying the visitor on startpoint.
   */
  public abstract <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> startpointVisitor);
}


/**
 * A {@link Startpoint} that represents the earliest offset in a stream partition.
 */
public final class StartpointOldest extends Startpoint {

  @Override
  public <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> startpointVisitor) {
    return startpointVisitor.visit(input, this);
  }
}

/**
 * A {@link Startpoint} that represents the latest offset in a stream partition.
 */
public final class StartpointUpcoming extends Startpoint {

  @Override
  public <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> startpointVisitor) {
    return startpointVisitor.visit(input, this);
  }
}

/**
 * A {@link Startpoint} that represents a timestamp offset in a stream partition.
 */
public final class StartpointTimestamp extends Startpoint {

  private final Long timestampOffset;

  @Override
  public <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> startpointVisitor) {
    return startpointVisitor.visit(input, this);
  }
}

/**
 * A {@link Startpoint} that represents a specific offset in a stream partition.
 */
public final class StartpointSpecific extends Startpoint {

  private final String specificOffset;


  /**
   * Getter for the specific offset.
   * @return the specific offset.
   */
  public String getSpecificOffset() {
    return specificOffset;
  }

  @Override
  public <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> startpointVisitor) {
    return startpointVisitor.visit(input, this);
  }
}
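The double dispatch between a Startpoint and a StartpointVisitor can be illustrated with a self-contained sketch. The Sp, SpSpecific, SpVisitor and SpDemo names below are local stand-ins for illustration only, not the real org.apache.samza.startpoint classes.

```java
// Minimal double-dispatch demo mirroring the Startpoint/StartpointVisitor contract.
abstract class Sp {
  abstract <IN, OUT> OUT apply(IN input, SpVisitor<IN, OUT> visitor);
}

class SpSpecific extends Sp {
  final String offset;
  SpSpecific(String offset) { this.offset = offset; }
  @Override
  <IN, OUT> OUT apply(IN input, SpVisitor<IN, OUT> visitor) {
    return visitor.visit(input, this); // dispatches to the SpSpecific overload
  }
}

interface SpVisitor<IN, OUT> {
  // Unsupported by default, as in the proposed StartpointVisitor.
  default OUT visit(IN input, SpSpecific sp) {
    throw new UnsupportedOperationException("SpSpecific is not supported.");
  }
}

class SpDemo {
  // Resolves a specific-offset startpoint straight to its offset string.
  static String resolve(String ssp, Sp startpoint) {
    return startpoint.apply(ssp, new SpVisitor<String, String>() {
      @Override
      public String visit(String input, SpSpecific sp) {
        return sp.offset;
      }
    });
  }
}
```

The generic IN/OUT parameters let each system choose its own input metadata and result types, while the abstract apply method forces every concrete Startpoint to route through the visitor.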

...

Additional Interface Method for SystemAdmin

Referred to in Step 7 of the Loading Startpoints Upon Job Startup section above.

Code Block
languagejava
titleSystemAdmin
public interface SystemAdmin {
...
  /**
   * Resolves the startpoint to a system specific offset.
   * @param systemStreamPartition represents the system stream partition.
   * @param startpoint represents the startpoint.
   * @return the resolved offset.
   */
  String resolveStartpointToOffset(SystemStreamPartition systemStreamPartition, Startpoint startpoint);
...
}

StartpointVisitor

Code Block
languagejava
titleStartpointVisitor
/**
 * A {@link SystemAdmin} implementation should implement this abstraction to support {@link Startpoint} and the instance
 * should visit via {@link Startpoint#apply(Object, StartpointVisitor)} within the
 * {@link SystemAdmin#resolveStartpointToOffset(SystemStreamPartition, Startpoint)} implementation.
 */
public interface StartpointVisitor<IN, OUT> {

  /**
   * Performs a sequence of operations using the {@link IN} and {@link StartpointSpecific} and returns the result of the execution.
   * @param input the input metadata about the startpoint.
   * @param startpointSpecific the {@link Startpoint} that represents the specific offset.
   * @return the result of executing the operations defined by the visit method.
   */
  default OUT visit(IN input, StartpointSpecific startpointSpecific) {
    throw new UnsupportedOperationException("StartpointSpecific is not supported.");
  }

  /**
   * Performs a sequence of operations using the {@link IN} and {@link StartpointTimestamp} and returns the result of the execution.
   * @param input the input metadata about the startpoint.
   * @param startpointTimestamp the {@link Startpoint} that represents the timestamp.
   * @return the result of executing the operations defined by the visit method.
   */
  default OUT visit(IN input, StartpointTimestamp startpointTimestamp) {
    throw new UnsupportedOperationException("StartpointTimestamp is not supported.");
  }

  /**
   * Performs a sequence of operations using the {@link IN} and {@link StartpointOldest} and returns the result of the execution.
   * @param input the input metadata about the startpoint.
   * @param startpointOldest the {@link Startpoint} that represents the earliest offset.
   * @return the result of executing the operations defined by the visit method.
   */
  default OUT visit(IN input, StartpointOldest startpointOldest) {
    throw new UnsupportedOperationException("StartpointOldest is not supported.");
  }

  /**
   * Performs a sequence of operations using the {@link IN} and {@link StartpointUpcoming} and returns the result of the execution.
   * @param input the input metadata about the startpoint.
   * @param startpointUpcoming the {@link Startpoint} that represents the latest offset.
   * @return the result of executing the operations defined by the visit method.
   */
  default OUT visit(IN input, StartpointUpcoming startpointUpcoming) {
    throw new UnsupportedOperationException("StartpointUpcoming is not supported.");
  }
}
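As a concrete illustration of what a system-specific visit method might do, the sketch below resolves a timestamp-based request against an in-memory log by finding the first record at or after the requested timestamp. The class and method names are hypothetical; real systems typically consult a time index (e.g. Kafka's offsetsForTimes) rather than scanning.

```java
// Hypothetical resolution of a timestamp Startpoint against an in-memory log.
class TimestampResolveSketch {
  // recordTimestamps[i] holds the timestamp of the record at offset i, ascending.
  static String resolve(long[] recordTimestamps, long requestedTimestamp) {
    for (int offset = 0; offset < recordTimestamps.length; offset++) {
      if (recordTimestamps[offset] >= requestedTimestamp) {
        return String.valueOf(offset);
      }
    }
    // Nothing at or after the timestamp yet: behave like StartpointUpcoming.
    return String.valueOf(recordTimestamps.length);
  }
}
```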



StartpointManager

StartpointManager is the main API to read and write Startpoints and is composed alongside the CheckpointManager in the OffsetManager. It also provides the methods to fan out the Startpoints. The StartpointManager is system-implementation agnostic and handles the serialization and deserialization of Startpoints into the metadata store.

Code Block
languagejava
titleStartpointManager
/**
 * The StartpointManager reads and writes {@link Startpoint} to the provided {@link MetadataStore}
 *
 * The intention for the StartpointManager is to maintain a strong contract between the caller
 * and how Startpoints are stored in the underlying MetadataStore.
 *
 * Startpoints are written in the MetadataStore using keys of two different formats:
 * 1) {@link SystemStreamPartition} only
 * 2) A combination of {@link SystemStreamPartition} and {@link TaskName}
 *
 * Startpoints are then fanned out to a fan out namespace in the MetadataStore by the
 * {@link org.apache.samza.clustermanager.ClusterBasedJobCoordinator} or the standalone
 * {@link org.apache.samza.coordinator.JobCoordinator} upon startup and the
 * {@link org.apache.samza.checkpoint.OffsetManager} gets the fan outs to set the starting offsets per task and per
 * {@link SystemStreamPartition}. The fan outs are deleted once the offsets are committed to the checkpoint.
 *
 * The read, write and delete methods are intended for external callers.
 * The fan out methods are intended to be used within a job coordinator.
 */
public class StartpointManager {
  /**
   * Writes a {@link Startpoint} that defines the start position for a {@link SystemStreamPartition}.
   * @param ssp The {@link SystemStreamPartition} to map the {@link Startpoint} against.
   * @param startpoint Reference to a Startpoint object.
   */
  public void writeStartpoint(SystemStreamPartition ssp, Startpoint startpoint) {...}

  /**
   * Writes a {@link Startpoint} that defines the start position for a {@link SystemStreamPartition} and {@link TaskName}.
   * @param ssp The {@link SystemStreamPartition} to map the {@link Startpoint} against.
   * @param taskName The {@link TaskName} to map the {@link Startpoint} against.
   * @param startpoint Reference to a Startpoint object.
   */
  public void writeStartpoint(SystemStreamPartition ssp, TaskName taskName, Startpoint startpoint) {...}

  /**
   * Returns the last {@link Startpoint} that defines the start position for a {@link SystemStreamPartition}.
   * @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for.
   * @return {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition}.
   * It is empty if it does not exist or if it is too stale.
   */
  public Optional<Startpoint> readStartpoint(SystemStreamPartition ssp) {...}

  /**
   * Returns the {@link Startpoint} for a {@link SystemStreamPartition} and {@link TaskName}.
   * @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for.
   * @param taskName The {@link TaskName} to fetch the {@link Startpoint} for.
   * @return {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition} and {@link TaskName}.
   * It is empty if it does not exist or if it is too stale.
   */
  public Optional<Startpoint> readStartpoint(SystemStreamPartition ssp, TaskName taskName) {...}

  /**
   * Deletes the {@link Startpoint} for a {@link SystemStreamPartition}
   * @param ssp The {@link SystemStreamPartition} to delete the {@link Startpoint} for.
   */
  public void deleteStartpoint(SystemStreamPartition ssp) {...}

  /**
   * Deletes the {@link Startpoint} for a {@link SystemStreamPartition} and {@link TaskName}.
   * @param ssp The {@link SystemStreamPartition} to delete the {@link Startpoint} for.
   * @param taskName The {@link TaskName} to delete the {@link Startpoint} for.
   */
  public void deleteStartpoint(SystemStreamPartition ssp, TaskName taskName) {...}

  /**
   * The Startpoints that are written to with {@link #writeStartpoint(SystemStreamPartition, Startpoint)} and with
   * {@link #writeStartpoint(SystemStreamPartition, TaskName, Startpoint)} are moved from a "read-write" namespace
   * to a "fan out" namespace.
   * This method is not atomic or thread-safe. The intent is for the Samza Processor's coordinator to use this
   * method to assign the Startpoints to the appropriate tasks.
   * @param taskToSSPs Determines which {@link TaskName} each {@link SystemStreamPartition} maps to.
   * @return The fanned out Startpoints for each active {@link TaskName}.
   */
  public Map<TaskName, Map<SystemStreamPartition, Startpoint>> fanOut(Map<TaskName, Set<SystemStreamPartition>> taskToSSPs) throws IOException {...}

  /**
   * Read the fanned out {@link Startpoint}s for the given {@link TaskName}
   * @param taskName to read the fan out Startpoints for
   * @return fanned out Startpoints
   */
  public Map<SystemStreamPartition, Startpoint> getFanOutForTask(TaskName taskName) throws IOException {...}

  /**
   * Deletes the fanned out {@link Startpoint}s for the given {@link TaskName}
   * @param taskName to delete the fan out Startpoints for
   */
  public void removeFanOutForTask(TaskName taskName) {...}
}
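The "too stale" behavior mentioned in the readStartpoint Javadoc can be sketched as follows. The TTL value, the String stand-ins for the model types, and the class name are assumptions for illustration; the SEP does not fix an expiration period.

```java
import java.util.*;

// Sketch of staleness-aware reads: each write records a creation timestamp
// (as Startpoint's creationTimestamp field does), and reads return empty once
// an assumed TTL has elapsed.
class StaleReadSketch {
  private static final long TTL_MS = 12L * 60 * 60 * 1000; // assumed, not from the SEP
  private final Map<String, String> startpoints = new HashMap<>();
  private final Map<String, Long> createdAtMs = new HashMap<>();

  void writeStartpoint(String ssp, String startpoint, long nowMs) {
    startpoints.put(ssp, startpoint);
    createdAtMs.put(ssp, nowMs);
  }

  // Mirrors readStartpoint: empty if the entry is absent or too stale.
  Optional<String> readStartpoint(String ssp, long nowMs) {
    Long created = createdAtMs.get(ssp);
    if (created == null || nowMs - created > TTL_MS) {
      return Optional.empty();
    }
    return Optional.of(startpoints.get(ssp));
  }
}
```

Expiring stale Startpoints guards against a long-forgotten rewind request suddenly taking effect on a much later restart of the job.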
OffsetManager Augmentation

Code Block
languagejava
titleOffsetManager
WIP


Compatibility, Deprecation, and Migration Plan

...

A key part of the core Startpoint feature is for individual task instances to fetch the appropriate Startpoint keyed by SSP-only. Two approaches, fan-out and intent-ACK, were explored, with the analysis detailed in the following subsections. The fan-out strategy is favored over the intent-ACK strategy.

Fan-out

See General Workflow above for details


Pros

  • Follows the natural progression of the JobCoordinator calculating the job model and then applying the info in the job model to fan out the SSP to SSP-Task Startpoints.
  • Cleaner and simpler bookkeeping of Startpoints. SSP-only keyed Startpoints are deleted after fan-out, and SSP+TaskName keyed Startpoints are deleted upon offset commits.

...

This approach is rejected due to the weight of the cons and the complexity required to manage the intents and ACKs once they are applied to the starting offsets and no longer needed.

Loading Startpoints Upon Job Start

Committing Startpoints

Pros

...