...

Current state: UNDER DISCUSSION and WIP

Discussion thread: http://mail-archives.apache.org/mod_mbox/samza-dev/201812.mbox/%3CCABpE9c1_N4hFBsj7Yr5v_7q89SVwnm6Zr-MGuLz1e%3De8MchLkg%40mail.gmail.com%3E

JIRA: SAMZA-1983

...

  1. Rewinding or fast-forwarding state store changelogs in relation to the input streams. The challenge is that the changelog stream is typically log compacted.
  2. Providing a service that externally exposes a Startpoint API. Such services require other core changes and will be explored in another SEP or design document.

Proposed

...

Changes

Different systems in Samza have different formats for checkpoint offsets and lack any contract that describes the offset format. To maintain backwards compatibility and to have better operability for setting starting offsets, this solution introduces the concept of Startpoints and utilizes the abstract metadata storage layer.

...

As with Checkpoints, Startpoints are applied to the starting offset of an SSP in a task instance during the start up time of the SamzaContainer.


Committing Startpoints

Once a Startpoint has been applied to the starting offset of a system-stream-partition (SSP) in a task instance, it is removed at the next offset commit.


Failure Scenarios

SamzaContainer is restarted before the first offset commit

  • The Startpoint will be applied to the starting offset again upon restart. This matches the behavior of Checkpoints in the same failure scenario.

JobCoordinator is restarted before all Startpoints are applied

  • Similar to the above failure scenario, except across multiple containers since restarting a JobCoordinator restarts all associated SamzaContainers.
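The commit and restart behavior described above can be illustrated with a small in-memory sketch. Everything in it is hypothetical: the class, the method names, the string-encoded offsets and the "OLDEST" default are stand-ins rather than Samza APIs, and the precedence order (Startpoint, then checkpoint, then default) is an assumption made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch; none of these names are Samza APIs. */
class StartpointLifecycleSketch {
  // Stand-ins for the Startpoint metadata store and the checkpoint store, keyed by SSP name.
  private final Map<String, String> startpoints = new HashMap<>();
  private final Map<String, String> checkpoints = new HashMap<>();

  void writeStartpoint(String ssp, String offset) {
    startpoints.put(ssp, offset);
  }

  /** Container start-up. Assumed precedence: Startpoint, then checkpoint, then a default. */
  String startingOffset(String ssp) {
    if (startpoints.containsKey(ssp)) {
      return startpoints.get(ssp);
    }
    return checkpoints.getOrDefault(ssp, "OLDEST");
  }

  /** Offset commit: the checkpoint is written and the applied Startpoint is removed. */
  void commit(String ssp, String offset) {
    checkpoints.put(ssp, offset);
    startpoints.remove(ssp);
  }

  public static void main(String[] args) {
    StartpointLifecycleSketch job = new StartpointLifecycleSketch();
    String ssp = "kafka.page-views.0";
    job.commit(ssp, "100");                      // an earlier run checkpointed offset 100
    job.writeStartpoint(ssp, "42");              // an operator requests a rewind to offset 42
    System.out.println(job.startingOffset(ssp)); // first start-up: 42
    System.out.println(job.startingOffset(ssp)); // restart before the first commit: 42 again
    job.commit(ssp, "57");                       // first commit removes the Startpoint
    System.out.println(job.startingOffset(ssp)); // later restarts resume from the checkpoint: 57
  }
}
```

Because the Startpoint is only removed at commit time, a restart before the first commit re-applies it, which is the same at-least-once behavior a checkpoint gives in that situation.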

Startpoint Models

OffsetManager Augmentation

WIP

Storing Requested Startpoint

Metadata Store

Referenced in the General Workflow above.

The out-of-band metadata store used is described by the metadata store abstraction feature (SAMZA-1786) from SEP-11. Startpoints are stored within their own namespaces in the metadata store.

From the perspective of Startpoint operations, the metadata store can be thought of as a key-value store with the following mappings:

{SSP→Startpoint} or {SSP+TaskName→Startpoint}

TaskName is optional and is primarily for broadcast inputs where the same SSP spans across multiple tasks.

Upon the fan out, the Startpoints are mapped by task with the same structure as checkpoints:

{TaskName→{SSP→Startpoint}}  
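The two key shapes and the regrouping on fan out can be sketched as follows. This is a minimal illustration rather than the proposed implementation: the string-encoded keys, the `key` and `fanOut` helpers, and the rule that a task-specific entry overrides the SSP-wide entry are all assumptions introduced here.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical sketch of the KV mapping; Startpoints and SSPs are plain strings here. */
class StartpointFanOutSketch {

  /** Key used before fan out: "ssp" or "ssp+taskName" (an assumed encoding). */
  static String key(String ssp, String taskName) {
    return taskName == null ? ssp : ssp + "+" + taskName;
  }

  /**
   * Regroups {SSP -> Startpoint} and {SSP+TaskName -> Startpoint} entries into
   * {TaskName -> {SSP -> Startpoint}}. Assumption: a task-specific entry wins
   * over the SSP-wide entry for the same SSP.
   */
  static Map<String, Map<String, String>> fanOut(
      Map<String, String> stored, Map<String, Set<String>> taskToSsps) {
    Map<String, Map<String, String>> result = new TreeMap<>();
    for (Map.Entry<String, Set<String>> entry : taskToSsps.entrySet()) {
      String task = entry.getKey();
      for (String ssp : entry.getValue()) {
        String startpoint = stored.getOrDefault(key(ssp, task), stored.get(key(ssp, null)));
        if (startpoint != null) {
          result.computeIfAbsent(task, t -> new TreeMap<>()).put(ssp, startpoint);
        }
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> stored = new HashMap<>();
    stored.put(key("kafka.config.0", null), "OLDEST");       // applies to every task on this SSP
    stored.put(key("kafka.config.0", "Task-1"), "UPCOMING"); // override for a single task

    // A broadcast input: the same SSP is assigned to both tasks.
    Map<String, Set<String>> taskToSsps = new HashMap<>();
    taskToSsps.put("Task-0", Collections.singleton("kafka.config.0"));
    taskToSsps.put("Task-1", Collections.singleton("kafka.config.0"));

    // prints {Task-0={kafka.config.0=OLDEST}, Task-1={kafka.config.0=UPCOMING}}
    System.out.println(fanOut(stored, taskToSsps));
  }
}
```

The optional TaskName in the key is what lets a broadcast SSP carry a different Startpoint per task, while the fanned out structure matches the per-task layout used for checkpoints.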

Public Interfaces

Startpoint Models

Code Block
languagejava
titleStartpoint
/**
 * Startpoint represents a position in a stream partition.
 */
public abstract class Startpoint {

  private final long creationTimestamp;

  /**
   * Applies the {@link StartpointVisitor}'s visit methods to the {@link Startpoint}
   * and returns the result of that operation.
   * @param input the metadata associated with the startpoint.
   * @param startpointVisitor the visitor of the startpoint.
   * @return the result of applying the visitor on startpoint.
   */
  public abstract <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> startpointVisitor);
}


/**
 * A {@link Startpoint} that represents the earliest offset in a stream partition.
 */
public final class StartpointOldest extends Startpoint {

  @Override
  public <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> startpointVisitor) {
    return startpointVisitor.visit(input, this);
  }
}

/**
 * A {@link Startpoint} that represents the latest offset in a stream partition.
 */
public final class StartpointUpcoming extends Startpoint {

  @Override
  public <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> startpointVisitor) {
    return startpointVisitor.visit(input, this);
  }
}

/**
 * A {@link Startpoint} that represents a timestamp offset in a stream partition.
 */
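public final class StartpointTimestamp extends Startpoint {

  private final Long timestampOffset;

  /**
   * Getter for the timestamp offset.
   * @return the timestamp offset.
   */
  public Long getTimestampOffset() {
    return timestampOffset;
  }

  @Override
  public <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> startpointVisitor) {
    return startpointVisitor.visit(input, this);
  }
}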

/**
 * A {@link Startpoint} that represents a specific offset in a stream partition.
 */
public final class StartpointSpecific extends Startpoint {

  private final String specificOffset;

  /**
   * Getter for the specific offset.
   * @return the specific offset.
   */
  public String getSpecificOffset() {
    return specificOffset;
  }

  @Override
  public <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> startpointVisitor) {
    return startpointVisitor.visit(input, this);
  }
}

...

Additional Interface Method for SystemAdmin

Referred to in Step 7 of the Loading Startpoints Upon Job Startup section above.

Code Block
languagejava
titleSystemAdmin
public interface SystemAdmin {
...
  /**
   * Resolves the startpoint to a system specific offset.
   * @param startpoint represents the startpoint.
   * @param systemStreamPartition represents the system stream partition.
   * @return the resolved offset.
   */
  String resolveStartpointToOffset(SystemStreamPartition systemStreamPartition, Startpoint startpoint);
...
}

StartpointVisitor

Code Block
languagejava
titleStartpointVisitor
/**
 * A {@link SystemAdmin} implementation should implement this abstraction to support {@link Startpoint}
 * and the instance should visit via {@link Startpoint#apply(Object, StartpointVisitor)} within the
 * {@link SystemAdmin#resolveStartpointToOffset(SystemStreamPartition, Startpoint)} implementation.
 */
public interface StartpointVisitor<IN, OUT> {

  /**
   * Performs a sequence of operations using the {@link IN} and {@link StartpointSpecific} and returns the result of the execution.
   * @param input the input metadata about the startpoint.
   * @param startpointSpecific the {@link Startpoint} that represents the specific offset.
   * @return the result of executing the operations defined by the visit method.
   */
  default OUT visit(IN input, StartpointSpecific startpointSpecific) {
    throw new UnsupportedOperationException("StartpointSpecific is not supported.");
  }

  /**
   * Performs a sequence of operations using the {@link IN} and {@link StartpointTimestamp} and returns the result of the execution.
   * @param input the input metadata about the startpoint.
   * @param startpointTimestamp the {@link Startpoint} that represents the timestamp.
   * @return the result of executing the operations defined by the visit method.
   */
  default OUT visit(IN input, StartpointTimestamp startpointTimestamp) {
    throw new UnsupportedOperationException("StartpointTimestamp is not supported.");
  }

  /**
   * Performs a sequence of operations using the {@link IN} and {@link StartpointOldest} and returns the result of the execution.
   * @param input the input metadata about the startpoint.
   * @param startpointOldest the {@link Startpoint} that represents the earliest offset.
   * @return the result of executing the operations defined by the visit method.
   */
  default OUT visit(IN input, StartpointOldest startpointOldest) {
    throw new UnsupportedOperationException("StartpointOldest is not supported.");
  }

  /**
   * Performs a sequence of operations using the {@link IN} and {@link StartpointUpcoming} and returns the result of the execution.
   * @param input the input metadata about the startpoint.
   * @param startpointUpcoming the {@link Startpoint} that represents the latest offset.
   * @return the result of executing the operations defined by the visit method.
   */
  default OUT visit(IN input, StartpointUpcoming startpointUpcoming) {
    throw new UnsupportedOperationException("StartpointUpcoming is not supported.");
  }
}
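To make the double dispatch between `Startpoint#apply` and the visitor concrete, here is a self-contained sketch. It does not use the Samza classes: the nested `Visitor`, `Specific`, `Timestamp`, `Oldest` and `Upcoming` types are simplified stand-ins for the models in this proposal, and `InMemoryResolver`, with its partition modelled as an offset-to-timestamp map, is purely hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, self-contained sketch; the nested types are stand-ins, not Samza classes. */
class StartpointVisitorSketch {

  /** Mirrors the visitor shape: every visit method is unsupported unless overridden. */
  interface Visitor<IN, OUT> {
    default OUT visit(IN input, Specific startpoint) {
      throw new UnsupportedOperationException("Specific is not supported.");
    }
    default OUT visit(IN input, Timestamp startpoint) {
      throw new UnsupportedOperationException("Timestamp is not supported.");
    }
    default OUT visit(IN input, Oldest startpoint) {
      throw new UnsupportedOperationException("Oldest is not supported.");
    }
    default OUT visit(IN input, Upcoming startpoint) {
      throw new UnsupportedOperationException("Upcoming is not supported.");
    }
  }

  abstract static class Startpoint {
    abstract <IN, OUT> OUT apply(IN input, Visitor<IN, OUT> visitor);
  }

  static final class Specific extends Startpoint {
    final String offset;
    Specific(String offset) { this.offset = offset; }
    @Override <IN, OUT> OUT apply(IN input, Visitor<IN, OUT> visitor) { return visitor.visit(input, this); }
  }

  static final class Timestamp extends Startpoint {
    final long millis;
    Timestamp(long millis) { this.millis = millis; }
    @Override <IN, OUT> OUT apply(IN input, Visitor<IN, OUT> visitor) { return visitor.visit(input, this); }
  }

  static final class Oldest extends Startpoint {
    @Override <IN, OUT> OUT apply(IN input, Visitor<IN, OUT> visitor) { return visitor.visit(input, this); }
  }

  static final class Upcoming extends Startpoint {
    @Override <IN, OUT> OUT apply(IN input, Visitor<IN, OUT> visitor) { return visitor.visit(input, this); }
  }

  /** Resolves Startpoints against a non-empty partition modelled as offset -> message timestamp. */
  static final class InMemoryResolver implements Visitor<TreeMap<Long, Long>, String> {
    @Override public String visit(TreeMap<Long, Long> log, Specific startpoint) {
      return startpoint.offset;
    }
    @Override public String visit(TreeMap<Long, Long> log, Timestamp startpoint) {
      // First offset whose message timestamp is at or after the requested time.
      for (Map.Entry<Long, Long> entry : log.entrySet()) {
        if (entry.getValue() >= startpoint.millis) {
          return String.valueOf(entry.getKey());
        }
      }
      return String.valueOf(log.lastKey() + 1); // assumed fallback: the next offset to be written
    }
    @Override public String visit(TreeMap<Long, Long> log, Oldest startpoint) {
      return String.valueOf(log.firstKey());
    }
    // visit(..., Upcoming) is deliberately not overridden, so the default throws.
  }

  public static void main(String[] args) {
    TreeMap<Long, Long> log = new TreeMap<>();
    log.put(5L, 1000L);
    log.put(6L, 2000L);
    log.put(7L, 3000L);
    InMemoryResolver resolver = new InMemoryResolver();

    String oldest = new Oldest().apply(log, resolver);
    String specific = new Specific("6").apply(log, resolver);
    String byTime = new Timestamp(2500L).apply(log, resolver);
    System.out.println(oldest + " " + specific + " " + byTime); // 5 6 7
    try {
      new Upcoming().apply(log, resolver);
    } catch (UnsupportedOperationException e) {
      System.out.println(e.getMessage()); // Upcoming is not supported.
    }
  }
}
```

The default methods let a system support only the Startpoint types that make sense for it, while callers get an explicit `UnsupportedOperationException` for the rest instead of a silent fallback.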

StartpointManager

StartpointManager is the main API to read and write Startpoints and is composed alongside the CheckpointManager in the OffsetManager. It also provides the methods to fan out the Startpoints. The StartpointManager is system-implementation agnostic and handles the serialization and deserialization of Startpoints into the metadata store.

Code Block
languagejava
titleStartpointManager
public class StartpointManager {

  /**
   * Writes a {@link Startpoint} that defines the start position for a {@link SystemStreamPartition}.
   * @param ssp The {@link SystemStreamPartition} to map the {@link Startpoint} against.
   * @param startpoint Reference to a Startpoint object.
   */
  public void writeStartpoint(SystemStreamPartition ssp, Startpoint startpoint) {...}

  /**
   * Writes a {@link Startpoint} that defines the start position for a {@link SystemStreamPartition} and {@link TaskName}.
   * @param ssp The {@link SystemStreamPartition} to map the {@link Startpoint} against.
   * @param taskName The {@link TaskName} to map the {@link Startpoint} against.
   * @param startpoint Reference to a Startpoint object.
   */
  public void writeStartpoint(SystemStreamPartition ssp, TaskName taskName, Startpoint startpoint) {...}

  /**
   * Returns the last {@link Startpoint} that defines the start position for a {@link SystemStreamPartition}.
   * @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for.
   * @return {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition}.
   *         It is empty if it does not exist or if it is too stale.
   */
  public Optional<Startpoint> readStartpoint(SystemStreamPartition ssp) {...}

  /**
   * Returns the {@link Startpoint} for a {@link SystemStreamPartition} and {@link TaskName}.
   * @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for.
   * @param taskName The {@link TaskName} to fetch the {@link Startpoint} for.
   * @return {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition} and {@link TaskName}.
   *         It is empty if it does not exist or if it is too stale.
   */
  public Optional<Startpoint> readStartpoint(SystemStreamPartition ssp, TaskName taskName) {...}

  /**
   * Deletes the {@link Startpoint} for a {@link SystemStreamPartition}.
   * @param ssp The {@link SystemStreamPartition} to delete the {@link Startpoint} for.
   */
  public void deleteStartpoint(SystemStreamPartition ssp) {...}

  /**
   * Deletes the {@link Startpoint} for a {@link SystemStreamPartition} and {@link TaskName}.
   * @param ssp The {@link SystemStreamPartition} to delete the {@link Startpoint} for.
   * @param taskName The {@link TaskName} to delete the {@link Startpoint} for.
   */
  public void deleteStartpoint(SystemStreamPartition ssp, TaskName taskName) {...}

  /**
   * Moves the Startpoints written with {@link #writeStartpoint(SystemStreamPartition, Startpoint)} and with
   * {@link #writeStartpoint(SystemStreamPartition, TaskName, Startpoint)} from the "read-write" namespace
   * to the "fan out" namespace.
   * This method is not atomic or thread-safe. The intent is for the Samza Processor's coordinator to use this
   * method to assign the Startpoints to the appropriate tasks.
   * @param taskToSSPs Determines which {@link TaskName} each {@link SystemStreamPartition} maps to.
   * @return The fanned out {@link Startpoint}s, keyed by the active {@link TaskName}s they were fanned out to.
   */
  public Map<TaskName, Map<SystemStreamPartition, Startpoint>> fanOut(Map<TaskName, Set<SystemStreamPartition>> taskToSSPs) throws IOException {...}

  /**
   * Reads the fanned out {@link Startpoint}s for the given {@link TaskName}.
   * @param taskName The {@link TaskName} to read the fanned out Startpoints for.
   * @return The fanned out Startpoints.
   */
  public Map<SystemStreamPartition, Startpoint> getFanOutForTask(TaskName taskName) throws IOException {...}

  /**
   * Deletes the fanned out {@link Startpoint}s for the given {@link TaskName}.
   * @param taskName The {@link TaskName} to delete the fanned out Startpoints for.
   */
  public void removeFanOutForTask(TaskName taskName) {...}
}

OffsetManager

Code Block
languagejava
titleOffsetManager
WIP


Compatibility, Deprecation, and Migration Plan

...