Status
Current state: ACCEPTED (earlier states: UNDER DISCUSSION, WIP)
Vote: http://mail-archives.apache.org/mod_mbox/samza-dev/201909.mbox/browser
Discussion thread: http://mail-archives.apache.org/mod_mbox/samza-dev/201812.mbox/%3CCABpE9c1_N4hFBsj7Yr5v_7q89SVwnm6Zr-MGuLz1e%3De8MchLkg%40mail.gmail.com%3E
JIRA:
Authors: Daniel Nishimura, Shanthoosh Venkataraman
Released: Samza-1.4
...
Once a particular Startpoint has been applied to the starting offset of an SSP in a task instance, it is removed at the next offset commit.
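The lifecycle above can be sketched as follows. This is a minimal, hypothetical model, not the Samza OffsetManager: the class and method names (`TaskOffsetsSketch`, `startingOffset`, `commit`) are illustrative, an SSP is represented by a string such as `kafka.PageViewEvent.0`, each Startpoint is assumed to be already resolved to an offset string, and the step from "last checkpointed offset" to "next offset to read" is ignored. A pending Startpoint overrides the checkpoint when the task starts and is discarded at the next commit, so a later restart resumes from the checkpoint instead of re-applying the Startpoint.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model of one task instance's offset bookkeeping.
// SSPs are plain strings and Startpoints are assumed to be already resolved to offsets.
class TaskOffsetsSketch {
  private final Map<String, String> checkpointedOffsets = new HashMap<>();
  private final Map<String, String> pendingStartpointOffsets = new HashMap<>();

  TaskOffsetsSketch(Map<String, String> lastCheckpoint, Map<String, String> resolvedStartpoints) {
    checkpointedOffsets.putAll(lastCheckpoint);
    pendingStartpointOffsets.putAll(resolvedStartpoints);
  }

  // A pending Startpoint takes precedence over the last checkpointed offset.
  Optional<String> startingOffset(String ssp) {
    String fromStartpoint = pendingStartpointOffsets.get(ssp);
    return fromStartpoint != null
        ? Optional.of(fromStartpoint)
        : Optional.ofNullable(checkpointedOffsets.get(ssp));
  }

  // Committing records the new checkpoint and drops the applied Startpoints.
  void commit(Map<String, String> processedOffsets) {
    checkpointedOffsets.putAll(processedOffsets);
    pendingStartpointOffsets.clear();
  }

  boolean hasPendingStartpoints() {
    return !pendingStartpointOffsets.isEmpty();
  }
}
```

Clearing the Startpoints only at commit time means that a task which crashes before its first commit re-applies the same Startpoint on restart, which keeps the requested start position from being silently lost.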
OffsetManager Augmentation
WIP
Storing Requested Startpoint
Metadata Store
Referenced in the General Workflow above.
The out-of-band metadata store used here is the one provided by the metadata store abstraction (SAMZA-1786) introduced in SEP-11. Startpoints are stored in their own namespaces within the metadata store.
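To make the storage layout concrete, the sketch below models the metadata store as an in-memory map and builds keys in the two formats listed in the StartpointManager documentation below (SSP only, and SSP plus TaskName), placed under separate read-write and fan-out namespaces. The class `NamespacedStoreSketch`, the namespace names, and the `/`-separated key encoding are assumptions for illustration only; the actual SEP-11 MetadataStore and the serialization Samza uses may differ.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical in-memory stand-in for a metadata store in which Startpoints live
// under their own namespaces. Namespace names and key encoding are illustrative only.
class NamespacedStoreSketch {
  static final String READ_WRITE_NAMESPACE = "startpoint-readwrite"; // hypothetical name
  static final String FAN_OUT_NAMESPACE = "startpoint-fanout";       // hypothetical name

  private final Map<String, String> store = new HashMap<>();

  // Key format 1: SSP only.
  static String key(String namespace, String ssp) {
    return namespace + "/" + ssp;
  }

  // Key format 2: SSP plus TaskName.
  static String key(String namespace, String ssp, String taskName) {
    return namespace + "/" + ssp + "/" + taskName;
  }

  void put(String key, String serializedStartpoint) {
    store.put(key, serializedStartpoint);
  }

  Optional<String> get(String key) {
    return Optional.ofNullable(store.get(key));
  }

  void delete(String key) {
    store.remove(key);
  }

  // Counts the entries of one namespace; entries in other namespaces are ignored.
  long countInNamespace(String namespace) {
    return store.keySet().stream().filter(k -> k.startsWith(namespace + "/")).count();
  }
}
```

Keeping Startpoints in dedicated namespaces means they can be listed or cleaned up without touching other metadata (for example, coordinator state) held in the same store. The naive string concatenation would need escaping in practice if stream or task names can contain the separator.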
...
Referred to in Step 7 of the Loading Startpoints Upon Job Startup section above.
```java
import org.apache.samza.startpoint.Startpoint;
import org.apache.samza.system.SystemStreamPartition;

public interface SystemAdmin {
  // ...

  /**
   * Resolves the startpoint to a system-specific offset.
   * @param systemStreamPartition represents the system stream partition.
   * @param startpoint represents the startpoint.
   * @return the resolved offset.
   */
  String resolveStartpointToOffset(SystemStreamPartition systemStreamPartition, Startpoint startpoint);

  // ...
}
```
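As an illustration of what `resolveStartpointToOffset` has to do, the sketch below resolves a few kinds of Startpoint against a toy, single-partition, in-memory log. The Startpoint variants (`ToySpecificOffset`, `ToyTimestamp`, `ToyOldest`, `ToyUpcoming`), the `ToyPartitionLog` class, and its offset semantics (message `i` has offset `i`, timestamps are non-decreasing) are hypothetical stand-ins, not the Samza Startpoint classes; a real SystemAdmin would query its underlying system, such as a Kafka broker, instead of a local list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical Startpoint variants used only for this sketch.
abstract class ToyStartpoint {}

final class ToySpecificOffset extends ToyStartpoint {
  final long offset;
  ToySpecificOffset(long offset) { this.offset = offset; }
}

final class ToyTimestamp extends ToyStartpoint {
  final long epochMillis;
  ToyTimestamp(long epochMillis) { this.epochMillis = epochMillis; }
}

final class ToyOldest extends ToyStartpoint {}

final class ToyUpcoming extends ToyStartpoint {}

// Toy partition log: message i has offset i and a non-decreasing timestamp.
class ToyPartitionLog {
  private final List<Long> timestamps = new ArrayList<>();

  void append(long epochMillis) {
    timestamps.add(epochMillis);
  }

  // Same shape as SystemAdmin#resolveStartpointToOffset, restricted to one partition.
  String resolveStartpointToOffset(ToyStartpoint startpoint) {
    if (startpoint instanceof ToySpecificOffset) {
      return String.valueOf(((ToySpecificOffset) startpoint).offset);
    }
    if (startpoint instanceof ToyOldest) {
      return "0"; // first offset ever written to this toy log
    }
    if (startpoint instanceof ToyUpcoming) {
      return String.valueOf(timestamps.size()); // next offset to be written
    }
    if (startpoint instanceof ToyTimestamp) {
      long requested = ((ToyTimestamp) startpoint).epochMillis;
      // First message at or after the requested time; otherwise the upcoming offset.
      for (int i = 0; i < timestamps.size(); i++) {
        if (timestamps.get(i) >= requested) {
          return String.valueOf(i);
        }
      }
      return String.valueOf(timestamps.size());
    }
    throw new IllegalArgumentException("Unsupported startpoint: " + startpoint);
  }
}
```

The `instanceof` chain keeps the example short; pushing the per-type logic into the system-specific admin, as the interface above does, is what lets each system decide how a timestamp or "oldest" request maps onto its own offsets.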
...
```java
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.samza.container.TaskName;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.startpoint.Startpoint;
import org.apache.samza.system.SystemStreamPartition;

/**
 * The StartpointManager reads and writes {@link Startpoint}s to the provided {@link MetadataStore}.
 *
 * The intention for the StartpointManager is to maintain a strong contract between the caller
 * and how Startpoints are stored in the underlying MetadataStore.
 *
 * Startpoints are written in the MetadataStore using keys of two different formats:
 * 1) {@link SystemStreamPartition} only
 * 2) A combination of {@link SystemStreamPartition} and {@link TaskName}
 *
 * Upon startup, the {@link org.apache.samza.clustermanager.ClusterBasedJobCoordinator} or the standalone
 * {@link org.apache.samza.coordinator.JobCoordinator} fans the Startpoints out to a fan-out namespace in the
 * MetadataStore, and the {@link org.apache.samza.checkpoint.OffsetManager} reads the fan-outs to set the
 * starting offsets per task and per {@link SystemStreamPartition}. The fan-outs are deleted once the offsets
 * are committed to the checkpoint.
 *
 * The read, write and delete methods are intended for external callers.
 * The fan-out methods are intended to be used within a job coordinator.
 */
public class StartpointManager {
  /**
   * Writes a {@link Startpoint} that defines the start position for a {@link SystemStreamPartition}.
   * @param ssp The {@link SystemStreamPartition} to map the {@link Startpoint} against.
   * @param startpoint Reference to a Startpoint object.
   */
  public void writeStartpoint(SystemStreamPartition ssp, Startpoint startpoint) {...}

  /**
   * Writes a {@link Startpoint} that defines the start position for a {@link SystemStreamPartition} and {@link TaskName}.
   * @param ssp The {@link SystemStreamPartition} to map the {@link Startpoint} against.
   * @param taskName The {@link TaskName} to map the {@link Startpoint} against.
   * @param startpoint Reference to a Startpoint object.
   */
  public void writeStartpoint(SystemStreamPartition ssp, TaskName taskName, Startpoint startpoint) {...}

  /**
   * Returns the last {@link Startpoint} that defines the start position for a {@link SystemStreamPartition}.
   * @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for.
   * @return {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition}.
   *         It is empty if it does not exist or if it is too stale.
   */
  public Optional<Startpoint> readStartpoint(SystemStreamPartition ssp) {...}

  /**
   * Returns the {@link Startpoint} for a {@link SystemStreamPartition} and {@link TaskName}.
   * @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for.
   * @param taskName The {@link TaskName} to fetch the {@link Startpoint} for.
   * @return {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition} and {@link TaskName}.
   *         It is empty if it does not exist or if it is too stale.
   */
  public Optional<Startpoint> readStartpoint(SystemStreamPartition ssp, TaskName taskName) {...}

  /**
   * Deletes the {@link Startpoint} for a {@link SystemStreamPartition}.
   * @param ssp The {@link SystemStreamPartition} to delete the {@link Startpoint} for.
   */
  public void deleteStartpoint(SystemStreamPartition ssp) {...}

  /**
   * Deletes the {@link Startpoint} for a {@link SystemStreamPartition} and {@link TaskName}.
   * @param ssp The {@link SystemStreamPartition} to delete the {@link Startpoint} for.
   * @param taskName The {@link TaskName} to delete the {@link Startpoint} for.
   */
  public void deleteStartpoint(SystemStreamPartition ssp, TaskName taskName) {...}

  /**
   * The Startpoints written with {@link #writeStartpoint(SystemStreamPartition, Startpoint)} and with
   * {@link #writeStartpoint(SystemStreamPartition, TaskName, Startpoint)} are moved from a "read-write"
   * namespace to a "fan out" namespace.
   * This method is not atomic or thread-safe. The intent is for the Samza Processor's coordinator to use
   * this method to assign the Startpoints to the appropriate tasks.
   * @param taskToSSPs Determines which {@link TaskName} each {@link SystemStreamPartition} maps to.
   * @return The fanned-out {@link Startpoint}s, keyed by {@link TaskName} and then by {@link SystemStreamPartition}.
   */
  public Map<TaskName, Map<SystemStreamPartition, Startpoint>> fanOut(Map<TaskName, Set<SystemStreamPartition>> taskToSSPs) throws IOException {...}

  /**
   * Reads the fanned-out {@link Startpoint}s for the given {@link TaskName}.
   * @param taskName The {@link TaskName} to read the fanned-out Startpoints for.
   * @return The fanned-out Startpoints, keyed by {@link SystemStreamPartition}.
   */
  public Map<SystemStreamPartition, Startpoint> getFanOutForTask(TaskName taskName) throws IOException {...}

  /**
   * Deletes the fanned-out {@link Startpoint}s for the given {@link TaskName}.
   * @param taskName The {@link TaskName} to delete the fanned-out Startpoints for.
   */
  public void removeFanOutForTask(TaskName taskName) {...}
}
```
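The fan-out step can be sketched with plain Java collections. The hypothetical `FanOutSketch` below models the read-write and fan-out namespaces as maps and represents SSPs, TaskNames and serialized Startpoints as strings. It assumes that when both an SSP-only and an SSP+TaskName Startpoint exist for the same SSP, the more specific SSP+TaskName one wins; that precedence rule is an assumption of this sketch, and the actual StartpointManager may resolve such conflicts differently (for example, by creation time). As in the documented `fanOut`, consumed read-write entries are removed and the result is keyed by TaskName and then by SSP.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, in-memory model of the fan-out step. SSPs, TaskNames and serialized
// Startpoints are all plain strings; this is not the Samza StartpointManager.
class FanOutSketch {
  // "Read-write" namespace, SSP-only keys: ssp -> startpoint.
  final Map<String, String> sspOnly = new HashMap<>();
  // "Read-write" namespace, SSP+TaskName keys: taskName -> (ssp -> startpoint).
  final Map<String, Map<String, String>> sspAndTask = new HashMap<>();
  // "Fan out" namespace: taskName -> (ssp -> startpoint).
  final Map<String, Map<String, String>> fanOuts = new HashMap<>();

  Map<String, Map<String, String>> fanOut(Map<String, Set<String>> taskToSSPs) {
    Set<String> consumedSspOnlyKeys = new HashSet<>();
    Map<String, Map<String, String>> result = new HashMap<>();
    for (Map.Entry<String, Set<String>> entry : taskToSSPs.entrySet()) {
      String taskName = entry.getKey();
      Map<String, String> taskSpecificStartpoints =
          sspAndTask.getOrDefault(taskName, new HashMap<>());
      Map<String, String> resolved = new HashMap<>();
      for (String ssp : entry.getValue()) {
        // Consume the SSP+TaskName entry immediately; it belongs to this task only.
        String taskSpecific = taskSpecificStartpoints.remove(ssp);
        String sspWide = sspOnly.get(ssp);
        if (sspWide != null) {
          // Defer deletion: a broadcast SSP may be assigned to several tasks.
          consumedSspOnlyKeys.add(ssp);
        }
        // Assumption for this sketch: the more specific SSP+TaskName entry wins.
        String chosen = taskSpecific != null ? taskSpecific : sspWide;
        if (chosen != null) {
          resolved.put(ssp, chosen);
        }
      }
      if (!resolved.isEmpty()) {
        result.put(taskName, resolved);
        fanOuts.put(taskName, new HashMap<>(resolved));
      }
    }
    // SSP-only entries are removed once every task that owns the SSP has been processed.
    consumedSspOnlyKeys.forEach(sspOnly::remove);
    return result;
  }

  // Counterpart of removeFanOutForTask: invoked after the task's offsets are committed.
  void removeFanOutForTask(String taskName) {
    fanOuts.remove(taskName);
  }
}
```

Deferring the deletion of SSP-only entries until the loop finishes matters for SSPs assigned to more than one task; deleting them on first use would leave later tasks without a Startpoint.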
Compatibility, Deprecation, and Migration Plan
...
A key part of the core Startpoint feature is that individual task instances must fetch the appropriate Startpoint even when it was written keyed by SSP only. Two approaches, fan-out and intent-ACK, were explored; the analysis is detailed in the following subsections. The fan-out strategy is favored over the intent-ACK strategy, for the reasons explained below.
Fan-out
See the General Workflow section above for details.
Pros
- Follows the natural progression of the JobCoordinator: it first calculates the job model and then uses the job model to fan out SSP-keyed Startpoints into SSP+TaskName-keyed Startpoints.
- Cleaner and simpler bookkeeping of Startpoints. SSP-only-keyed Startpoints are deleted after the fan-out, and SSP+TaskName-keyed Startpoints are deleted upon offset commit.
...