...

A requested Startpoint will be stored in the metadata store. This will be decoupled from the checkpointed offsets in the checkpoint stream.

General Workflow

Loading Startpoints Upon Job

...

Startup

Startpoints are written to the metadata store under a readWrite namespace using two key types: SSP-only and SSP+TaskName. For broadcast input streams, an SSP may span multiple tasks, so Startpoints are applied at the task level. For Startpoints on SSP-only keys, the JC fans out the SSP across all tasks that the SSP maps to; the task-to-SSP mappings are retrieved from the JobModel. Upon the subsequent start of the Samza job, the Startpoints are fanned out to a fanOut namespace in the metadata store. The diagram below illustrates the flow.

As with Checkpoints, Startpoints are applied to the starting offset of an SSP in a task instance during the start up time of the SamzaContainer.

[Diagram]
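The fan-out step described above can be sketched as follows. This is a minimal illustration, not the Samza implementation: the class `StartpointFanOutSketch`, its `fanOut` method, and the use of plain strings for SSPs, task names, and Startpoints are all assumptions made to keep the example self-contained. It only covers Startpoints written under SSP-only keys.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified fan-out; strings stand in for SSP, TaskName, and Startpoint. */
final class StartpointFanOutSketch {

  /**
   * Fans out Startpoints written under SSP-only keys to every task that consumes the SSP.
   *
   * @param sspOnly    SSP -> Startpoint entries, as read from the readWrite namespace
   * @param taskToSsps TaskName -> SSPs, as would be retrieved from the JobModel
   * @return TaskName -> (SSP -> Startpoint), the shape written to the fanOut namespace
   */
  static Map<String, Map<String, String>> fanOut(
      Map<String, String> sspOnly, Map<String, List<String>> taskToSsps) {
    Map<String, Map<String, String>> fannedOut = new HashMap<>();
    for (Map.Entry<String, List<String>> task : taskToSsps.entrySet()) {
      for (String ssp : task.getValue()) {
        String startpoint = sspOnly.get(ssp);
        if (startpoint != null) {
          // Only tasks that actually consume an SSP with a Startpoint get an entry.
          fannedOut.computeIfAbsent(task.getKey(), t -> new HashMap<>()).put(ssp, startpoint);
        }
      }
    }
    return fannedOut;
  }

  public static void main(String[] args) {
    Map<String, String> sspOnly = new HashMap<>();
    sspOnly.put("kafka.broadcast-topic.0", "StartpointOldest");

    // A broadcast SSP is consumed by both tasks, so its Startpoint is copied to each.
    Map<String, List<String>> taskToSsps = new HashMap<>();
    taskToSsps.put("Partition 0", Arrays.asList("kafka.broadcast-topic.0", "kafka.input.0"));
    taskToSsps.put("Partition 1", Arrays.asList("kafka.broadcast-topic.0", "kafka.input.1"));

    System.out.println(fanOut(sspOnly, taskToSsps));
  }
}
```

Keying the result by task first mirrors the checkpoint layout, so a container can look up everything it needs for one task instance in a single step.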

Committing Startpoints

Once a particular Startpoint is applied to the starting offset of a system-stream-partition in a task instance, it is subsequently removed at the next offset commit. 


[Diagram]
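The apply-then-remove lifecycle can be sketched as below. This is an illustrative model under stated assumptions: `StartpointCommitSketch` and its methods (`write`, `applyAtStartup`, `onOffsetCommit`, `contains`) are hypothetical names, an in-memory map stands in for one task's entries in the metadata store, and strings stand in for SSPs and Startpoints.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of one task's fanned-out Startpoints and their removal on commit. */
final class StartpointCommitSketch {

  // SSP -> Startpoint for a single task (stand-in for the metadata store).
  private final Map<String, String> fannedOut = new HashMap<>();
  // SSPs whose Startpoint was applied at startup and is awaiting removal.
  private final Set<String> applied = new HashSet<>();

  void write(String ssp, String startpoint) {
    fannedOut.put(ssp, startpoint);
  }

  /** At container startup: returns the Startpoint for the SSP, or null if none exists. */
  String applyAtStartup(String ssp) {
    String startpoint = fannedOut.get(ssp);
    if (startpoint != null) {
      applied.add(ssp);
    }
    return startpoint;
  }

  /** At the next offset commit: removes every Startpoint that has been applied. */
  void onOffsetCommit() {
    for (String ssp : applied) {
      fannedOut.remove(ssp);
    }
    applied.clear();
  }

  boolean contains(String ssp) {
    return fannedOut.containsKey(ssp);
  }

  public static void main(String[] args) {
    StartpointCommitSketch store = new StartpointCommitSketch();
    store.write("kafka.input.0", "StartpointOldest");

    store.applyAtStartup("kafka.input.0");
    System.out.println(store.contains("kafka.input.0")); // true: kept until the commit

    store.onOffsetCommit();
    System.out.println(store.contains("kafka.input.0")); // false: removed at the commit
  }
}
```

Deferring the removal to the commit means a container that restarts before its first commit still finds the Startpoint and applies it again.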

Failure Scenarios

...

Startpoint (Java)
/**
 * Startpoint represents a position in a stream partition.
 */
public abstract class Startpoint {

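  private final long creationTimestamp;

  // Assumed initialization (not shown in the original listing): the final field
  // must be assigned, so the creation time is recorded at construction.
  protected Startpoint() {
    this.creationTimestamp = System.currentTimeMillis();
  }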

  /**
   * Applies the {@link StartpointVisitor}'s visit methods to the {@link Startpoint}
   * and returns the result of that operation.
   * @param input the metadata associated with the startpoint.
   * @param startpointVisitor the visitor of the startpoint.
   * @return the result of applying the visitor on startpoint.
   */
  public abstract <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> startpointVisitor);
}


/**
 * A {@link Startpoint} that represents the earliest offset in a stream partition.
 */
public final class StartpointOldest extends Startpoint {

  @Override
  public <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> startpointVisitor) {
    return startpointVisitor.visit(input, this);
  }
}

/**
 * A {@link Startpoint} that represents the latest offset in a stream partition.
 */
public final class StartpointUpcoming extends Startpoint {

  @Override
  public <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> startpointVisitor) {
    return startpointVisitor.visit(input, this);
  }
}

/**
 * A {@link Startpoint} that represents a timestamp offset in a stream partition.
 */
public final class StartpointTimestamp extends Startpoint {

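  private final Long timestampOffset;

  // Constructor, getter, and apply are assumed here by analogy with StartpointSpecific.
  public StartpointTimestamp(Long timestampOffset) {
    this.timestampOffset = timestampOffset;
  }

  /**
   * Getter for the timestamp offset.
   * @return the timestamp offset.
   */
  public Long getTimestampOffset() {
    return timestampOffset;
  }

  @Override
  public <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> startpointVisitor) {
    return startpointVisitor.visit(input, this);
  }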
}

/**
 * A {@link Startpoint} that represents a specific offset in a stream partition.
 */
public final class StartpointSpecific extends Startpoint {

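  private final String specificOffset;

  // Assumed constructor (not shown in the original listing) so the final field is assigned.
  public StartpointSpecific(String specificOffset) {
    this.specificOffset = specificOffset;
  }
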
  /**
   * Getter for the specific offset.
   * @return the specific offset.
   */
  public String getSpecificOffset() {
    return specificOffset;
  }

  @Override
  public <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> startpointVisitor) {
    return startpointVisitor.visit(input, this);
  }
}
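The `apply` methods above rely on double dispatch: each concrete Startpoint passes itself to the visitor, so the overload matching its type is selected. The sketch below shows how a visitor might resolve a Startpoint to a concrete offset. It is self-contained and therefore redeclares trimmed-down versions of the classes as nested types; the `StartpointVisitor` interface shape, the `OffsetResolver` class, and the use of a `long[]` holding the oldest and upcoming offsets as the visitor input are assumptions for illustration. `StartpointTimestamp` is omitted for brevity.

```java
/** Hypothetical, trimmed-down model of the Startpoint visitor pattern. */
final class StartpointVisitorSketch {

  /** Assumed visitor shape: one visit overload per concrete Startpoint type. */
  interface StartpointVisitor<IN, OUT> {
    OUT visit(IN input, StartpointSpecific startpoint);
    OUT visit(IN input, StartpointOldest startpoint);
    OUT visit(IN input, StartpointUpcoming startpoint);
  }

  abstract static class Startpoint {
    abstract <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> visitor);
  }

  static final class StartpointSpecific extends Startpoint {
    private final String specificOffset;

    StartpointSpecific(String specificOffset) {
      this.specificOffset = specificOffset;
    }

    String getSpecificOffset() {
      return specificOffset;
    }

    @Override
    <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> visitor) {
      return visitor.visit(input, this);
    }
  }

  static final class StartpointOldest extends Startpoint {
    @Override
    <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> visitor) {
      return visitor.visit(input, this);
    }
  }

  static final class StartpointUpcoming extends Startpoint {
    @Override
    <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> visitor) {
      return visitor.visit(input, this);
    }
  }

  /** Resolves a Startpoint to an offset; input is {oldestOffset, upcomingOffset}. */
  static final class OffsetResolver implements StartpointVisitor<long[], String> {
    @Override
    public String visit(long[] bounds, StartpointSpecific startpoint) {
      return startpoint.getSpecificOffset();
    }

    @Override
    public String visit(long[] bounds, StartpointOldest startpoint) {
      return String.valueOf(bounds[0]);
    }

    @Override
    public String visit(long[] bounds, StartpointUpcoming startpoint) {
      return String.valueOf(bounds[1]);
    }
  }

  public static void main(String[] args) {
    long[] bounds = {5L, 42L}; // hypothetical oldest and upcoming offsets of a partition
    Startpoint requested = new StartpointSpecific("17");
    // The caller never inspects the concrete type; apply() picks the right overload.
    System.out.println(requested.apply(bounds, new OffsetResolver())); // prints 17
  }
}
```

With this arrangement, each system can supply its own visitor for offset resolution without the Startpoint classes knowing anything about that system.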

...

The out-of-band metadata store used is described by the metadata store abstraction feature (SAMZA-1786) from SEP-11. The Startpoints are stored within their own namespaces in the metadata store.

Abstractly, from the perspective of Startpoint operations, we think of the metadata store as a KV store mapped as such:

{SSP→Startpoint} or {SSP+TaskName→Startpoint}

TaskName is optional and is primarily for broadcast inputs where the same SSP spans across multiple tasks. However, when a job starts, all Startpoints keyed only by SSP will be fanned out to SSP+TaskName.

Upon the fan out, the Startpoints are mapped by task with the same structure as checkpoints:

{TaskName→{SSP→Startpoint}}  

See General Workflow above for details.

...

This approach is rejected due to the weight of the cons and the complexity required to manage the intents and ACKs once they are applied to the starting offsets and no longer needed.

Loading Startpoints Upon Job Start

Committing Startpoints

Pros

...