Status

Current state: ACCEPTED

Vote: http://mail-archives.apache.org/mod_mbox/samza-dev/201909.mbox/browser

Discussion thread: http://mail-archives.apache.org/mod_mbox/samza-dev/201812.mbox/%3CCABpE9c1_N4hFBsj7Yr5v_7q89SVwnm6Zr-MGuLz1e%3De8MchLkg%40mail.gmail.com%3E

JIRA: SAMZA-1983

Authors: Daniel Nimishura, Shanthoosh Venkataraman

Released: Samza-1.4


...

Once a particular Startpoint is applied to the starting offset of an SSP in a task instance, it is removed at the next offset commit.
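The apply-then-remove lifecycle described above can be sketched as follows. This is a minimal illustration and not Samza's OffsetManager: the class name `TaskStartingOffsets`, its methods, and the use of plain strings for SSPs and offsets are all hypothetical stand-ins.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a task instance's view of its starting offsets.
// SSPs and offsets are plain strings here; this is not Samza's OffsetManager.
class TaskStartingOffsets {
  private final Map<String, String> checkpointedOffsets = new HashMap<>();
  private final Map<String, String> pendingStartpointOffsets = new HashMap<>();

  TaskStartingOffsets(Map<String, String> checkpointed, Map<String, String> startpointOffsets) {
    checkpointedOffsets.putAll(checkpointed);
    pendingStartpointOffsets.putAll(startpointOffsets);
  }

  // A pending Startpoint overrides the checkpointed offset for its SSP.
  String startingOffset(String ssp) {
    String fromStartpoint = pendingStartpointOffsets.get(ssp);
    return fromStartpoint != null ? fromStartpoint : checkpointedOffsets.get(ssp);
  }

  // On commit, the processed offsets are checkpointed and the pending Startpoints are dropped,
  // so a later restart resumes from the checkpoint instead of reapplying them.
  void commit(Map<String, String> processedOffsets) {
    checkpointedOffsets.putAll(processedOffsets);
    pendingStartpointOffsets.clear();
  }
}
```

Dropping the Startpoint at commit time is what keeps it a one-shot override: after the first commit, the checkpoint alone determines where the task resumes.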

OffsetManager Augmentation

WIP

Storing Requested Startpoint

Metadata Store

Referenced in the General Workflow above.

The out-of-band metadata store used is described by the metadata store abstraction feature (SAMZA-1786) from SEP-11. Startpoints are stored in their own namespaces within the metadata store.

...

Referred to in Step 7 of the Loading Startpoints Upon Job Startup section above.

SystemAdmin (Java)
public interface SystemAdmin {
...
  /**
   * Resolves the startpoint to a system specific offset.
   * @param systemStreamPartition represents the system stream partition.
   * @param startpoint represents the startpoint.
   * @return the resolved offset.
   */
  String resolveStartpointToOffset(SystemStreamPartition systemStreamPartition, Startpoint startpoint);
...
}
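To illustrate what resolving a Startpoint to a system-specific offset might involve, here is a toy sketch over a single in-memory partition. Everything in it is an assumption made for illustration: the `SketchStartpoint` type, its four kinds, and the `InMemoryPartitionAdmin` class are hypothetical stand-ins, and the SSP argument from the interface above is omitted because the toy log has only one partition.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in. This is not Samza's Startpoint class hierarchy.
class SketchStartpoint {
  enum Kind { SPECIFIC_OFFSET, TIMESTAMP, OLDEST, UPCOMING }

  final Kind kind;
  final long value; // offset for SPECIFIC_OFFSET, epoch millis for TIMESTAMP, unused otherwise

  SketchStartpoint(Kind kind, long value) {
    this.kind = kind;
    this.value = value;
  }
}

// Toy single-partition log: the list index is the offset, the element is the message timestamp.
class InMemoryPartitionAdmin {
  private final List<Long> timestamps;

  InMemoryPartitionAdmin(List<Long> timestampsByOffset) {
    this.timestamps = new ArrayList<>(timestampsByOffset);
  }

  // Returns the offset as a String, mirroring the String return type in the interface above.
  String resolveStartpointToOffset(SketchStartpoint startpoint) {
    switch (startpoint.kind) {
      case SPECIFIC_OFFSET:
        return Long.toString(startpoint.value);
      case TIMESTAMP:
        // First message whose timestamp is at or after the requested time.
        for (int offset = 0; offset < timestamps.size(); offset++) {
          if (timestamps.get(offset) >= startpoint.value) {
            return Integer.toString(offset);
          }
        }
        // ASSUMPTION: if no message is that recent, start from the next message to arrive.
        return Integer.toString(timestamps.size());
      case OLDEST:
        return "0";
      case UPCOMING:
        return Integer.toString(timestamps.size());
      default:
        throw new IllegalArgumentException("Unknown startpoint kind: " + startpoint.kind);
    }
  }
}
```

A real system would delegate each case to its own client (for example, a timestamp lookup against the broker) rather than scanning a list.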

...

StartpointManager (Java)
/**
 * The StartpointManager reads and writes {@link Startpoint} to the provided {@link MetadataStore}
 *
 * The intention for the StartpointManager is to maintain a strong contract between the caller
 * and how Startpoints are stored in the underlying MetadataStore.
 *
 * Startpoints are written in the MetadataStore using keys of two different formats:
 * 1) {@link SystemStreamPartition} only
 * 2) A combination of {@link SystemStreamPartition} and {@link TaskName}
 *
 * Startpoints are then fanned out to a fan out namespace in the MetadataStore by the
 * {@link org.apache.samza.clustermanager.ClusterBasedJobCoordinator} or the standalone
 * {@link org.apache.samza.coordinator.JobCoordinator} upon startup and the
 * {@link org.apache.samza.checkpoint.OffsetManager} gets the fan outs to set the starting offsets per task and per
 * {@link SystemStreamPartition}. The fan outs are deleted once the offsets are committed to the checkpoint.
 *
 * The read, write and delete methods are intended for external callers.
 * The fan out methods are intended to be used within a job coordinator.
 */
public class StartpointManager {
  /**
   * Writes a {@link Startpoint} that defines the start position for a {@link SystemStreamPartition}.
   * @param ssp The {@link SystemStreamPartition} to map the {@link Startpoint} against.
   * @param startpoint Reference to a Startpoint object.
   */
  public void writeStartpoint(SystemStreamPartition ssp, Startpoint startpoint) {...}

  /**
   * Writes a {@link Startpoint} that defines the start position for a {@link SystemStreamPartition} and {@link TaskName}.
   * @param ssp The {@link SystemStreamPartition} to map the {@link Startpoint} against.
   * @param taskName The {@link TaskName} to map the {@link Startpoint} against.
   * @param startpoint Reference to a Startpoint object.
   */
  public void writeStartpoint(SystemStreamPartition ssp, TaskName taskName, Startpoint startpoint) {...}

  /**
   * Returns the last {@link Startpoint} that defines the start position for a {@link SystemStreamPartition}.
   * @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for.
   * @return {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition}.
   * It is empty if it does not exist or if it is too stale.
   */
  public Optional<Startpoint> readStartpoint(SystemStreamPartition ssp) {...}

  /**
   * Returns the {@link Startpoint} for a {@link SystemStreamPartition} and {@link TaskName}.
   * @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for.
   * @param taskName The {@link TaskName} to fetch the {@link Startpoint} for.
   * @return {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition} and {@link TaskName}.
   * It is empty if it does not exist or if it is too stale.
   */
  public Optional<Startpoint> readStartpoint(SystemStreamPartition ssp, TaskName taskName) {...}

  /**
   * Deletes the {@link Startpoint} for a {@link SystemStreamPartition}.
   * @param ssp The {@link SystemStreamPartition} to delete the {@link Startpoint} for.
   */
  public void deleteStartpoint(SystemStreamPartition ssp) {...}

  /**
   * Deletes the {@link Startpoint} for a {@link SystemStreamPartition} and {@link TaskName}.
   * @param ssp The {@link SystemStreamPartition} to delete the {@link Startpoint} for.
   * @param taskName The {@link TaskName} to delete the {@link Startpoint} for.
   */
  public void deleteStartpoint(SystemStreamPartition ssp, TaskName taskName) {...}

  /**
   * The Startpoints that are written with {@link #writeStartpoint(SystemStreamPartition, Startpoint)} and with
   * {@link #writeStartpoint(SystemStreamPartition, TaskName, Startpoint)} are moved from a "read-write" namespace
   * to a "fan out" namespace.
   * This method is not atomic or thread-safe. The intent is for the Samza Processor's coordinator to use this
   * method to assign the Startpoints to the appropriate tasks.
   * @param taskToSSPs Determines which {@link TaskName} each {@link SystemStreamPartition} maps to.
   * @return The fanned out {@link Startpoint}s, keyed by {@link TaskName} and then by {@link SystemStreamPartition}.
   */
  public Map<TaskName, Map<SystemStreamPartition, Startpoint>> fanOut(Map<TaskName, Set<SystemStreamPartition>> taskToSSPs) throws IOException {...}

  /**
   * Reads the fanned out {@link Startpoint}s for the given {@link TaskName}.
   * @param taskName to read the fan out Startpoints for
   * @return fanned out Startpoints
   */
  public Map<SystemStreamPartition, Startpoint> getFanOutForTask(TaskName taskName) throws IOException {...}

  /**
   * Deletes the fanned out {@link Startpoint}s for the given {@link TaskName}.
   * @param taskName to delete the fan out Startpoints for
   */
  public void removeFanOutForTask(TaskName taskName) {...}
}
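The contract above (write to a read-write namespace, fan out per task, read the fan out, delete it after commit) can be sketched with an in-memory stand-in. This is only an illustration under stated assumptions: SSPs, task names, and Startpoints are plain strings, the `ssp|task` key format is hypothetical, and the rule that an SSP+TaskName Startpoint takes precedence over an SSP-only one is an assumption of this sketch, not something this section specifies.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// In-memory sketch of the "read-write" and "fan out" namespaces. Not the real StartpointManager.
class InMemoryStartpointManager {
  private final Map<String, String> readWrite = new HashMap<>();            // "ssp" or "ssp|task" -> startpoint
  private final Map<String, Map<String, String>> fanOuts = new HashMap<>(); // task -> (ssp -> startpoint)

  // Hypothetical key format for the SSP+TaskName case.
  private static String key(String ssp, String taskName) {
    return ssp + "|" + taskName;
  }

  void writeStartpoint(String ssp, String startpoint) {
    readWrite.put(ssp, startpoint);
  }

  void writeStartpoint(String ssp, String taskName, String startpoint) {
    readWrite.put(key(ssp, taskName), startpoint);
  }

  Optional<String> readStartpoint(String ssp) {
    return Optional.ofNullable(readWrite.get(ssp));
  }

  Optional<String> readStartpoint(String ssp, String taskName) {
    return Optional.ofNullable(readWrite.get(key(ssp, taskName)));
  }

  Map<String, Map<String, String>> fanOut(Map<String, Set<String>> taskToSSPs) {
    Map<String, Map<String, String>> result = new HashMap<>();
    for (Map.Entry<String, Set<String>> entry : taskToSSPs.entrySet()) {
      String task = entry.getKey();
      for (String ssp : entry.getValue()) {
        // ASSUMPTION: the task-specific Startpoint wins over the SSP-only one.
        String taskSpecific = readWrite.get(key(ssp, task));
        String chosen = taskSpecific != null ? taskSpecific : readWrite.get(ssp);
        if (chosen != null) {
          result.computeIfAbsent(task, t -> new HashMap<>()).put(ssp, chosen);
        }
      }
    }
    // Delete from the read-write namespace only after every task has been served,
    // so an SSP consumed by several tasks is fanned out to all of them.
    for (Map.Entry<String, Set<String>> entry : taskToSSPs.entrySet()) {
      for (String ssp : entry.getValue()) {
        readWrite.remove(ssp);
        readWrite.remove(key(ssp, entry.getKey()));
      }
    }
    for (Map.Entry<String, Map<String, String>> e : result.entrySet()) {
      fanOuts.computeIfAbsent(e.getKey(), t -> new HashMap<>()).putAll(e.getValue());
    }
    return result;
  }

  Map<String, String> getFanOutForTask(String taskName) {
    return fanOuts.getOrDefault(taskName, Collections.emptyMap());
  }

  // Intended to be called once the task's offsets are committed to the checkpoint.
  void removeFanOutForTask(String taskName) {
    fanOuts.remove(taskName);
  }
}
```

In this sketch an external caller would use only the write, read, and delete style methods, while a coordinator would call `fanOut` with the task-to-SSP mapping from the job model and each task would call `getFanOutForTask` at startup.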


Compatibility, Deprecation, and Migration Plan

...

A key part of the core Startpoint feature is for individual task instances to fetch the appropriate Startpoint keyed by SSP-only. Two approaches, fan-out and intent-ACK, were explored, and the analysis is detailed in the following subsections. The fan-out strategy is favored over the intent-ACK strategy.

Fan-out

See General Workflow above for details.


Pros

  • Follows the natural progression of the JobCoordinator calculating the job model and then applying the info in the job model to fan out the SSP-only keyed Startpoints to SSP+TaskName keyed Startpoints.
  • Cleaner and simpler bookkeeping of Startpoints. SSP-only keyed Startpoints are deleted after fan out and SSP+TaskName keyed Startpoints are deleted upon offset commits.

...