You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 29 Next »

Status

Current state: UNDER DISCUSSION and WIP

Discussion thread

JIRA Unable to render Jira issues macro, execution error.

Released: TBD


Purpose

The purpose of this feature is to provide a common interface for external tools and services to rewind or fast-forward starting offsets on any input stream. In addition to providing the common interface, this feature will provide the capabilities to manually manipulate starting offsets by various position types and not only by specific offsets. Many of the current underlying system consumers support different position types for seeking to an offset on an input stream, such as seeks by timestamp, and are not generically exposed by the current framework.

Motivation

In the current Samza framework, manually setting the starting offsets for an input stream requires stopping the Samza processor and using a system-specific checkpoint tool to modify the checkpoint offsets directly in the checkpoint stream. Using the current tooling is tedious and error prone, as it requires proper security access and potentially editing many offsets in a configuration file. In some cases, it can cause a Samza processor to lose its checkpoints. In addition to the dangerous nature of modifying checkpoints directly, the checkpoint offsets are arbitrary strings with system-specific formats. This requires a different checkpoint tool for each system type (i.e. Kafka, Eventhub, Kinesis, etc...).

Terminology

SSP - System stream partition. For example, on a Kafka stream called SomeEvent in the tracking cluster, the system is tracking, the stream is SomeEvent and the partition is a partition ID in the stream.

JC - Job coordinator.

Requirements

Goals

Provide a simple and generic interface to manipulate starting offsets per input stream partition instead of relying on system-specific checkpoint tools and services. This allows flexibility to build REST API layers and tools on top of the common interface.

Allow defining starting offsets on an input stream by SSP across all tasks or SSP per task. n

Framework level support for various offset types such as, specific offsets and timestamp-based offsets.

Maintain backwards compatibility.

Provide safety by setting starting offsets out-of-band and not directly in the checkpoints.

Simplicity. Easy for developers and users to create tools and services to set starting offsets of a given Samza job.

Non-goals

Rewinding or fast-fowarding state store changelogs in relation to the input streams. The challenge is that the changelog stream is typically log compacted.

Providing a service that externally exposes a Startpoint API. Such services require other core changes and will be explored in another SEP or design document.

Proposed Implementation

Different systems in Samza have different formats for checkpoint offsets and lack any contract that describes the offset format. To maintain backwards compatibility and to have better operability for setting starting offsets, this solution introduces the concept of Startpoints and utilizes the abstract metadata storage layer.

A Startpoint indicates what offset position a particular SSP should start consuming from. The Startpoint takes higher precedence than Checkpoints and defines the position type and the position value of the position type. For example, if the Startpoint position type is TIMESTAMP, then the position value is an epoch value in milliseconds. The Startpoint enforces a stricter contract for external tools and services to follow as opposed to the string offset value in the Checkpoint.

A requested Startpoint will be stored in the metadata store. This will be decoupled from the checkpointed offsets in the checkpoint stream.

General Workflow

Loading Startpoints Upon Job Start

Startpoints are written to the metadata store using two key types: SSP-only and SSP+TaskName. For broadcast input streams, an SSP may span across multiple tasks and therefore, Startpoints are applied at the task level. For Startpoints on SSP-only keys, the JC will have a mechanism to fan out the SSP across all tasks that the SSP maps to. The below diagram illustrates the flow.

Committing Startpoints

Once a particular Startpoint is applied to the starting offset of a system-stream-partition in a task instance, it is subsequently removed at the next offset commit. 

Startpoint Models

Startpoint
WIP

StartpointVisitor

SystemConsumer
WIP

Storing Requested Startpoint

Metadata Store

The out-of-band metadata store used is described by the metadata store abstraction feature (SAMZA-1786) from SEP-11. The Startpoints are stored within its own namespace in the metadata store.

Abstractly from the perspective of Startpoint operation, we think of the metadata store as a KV store and keyed by:

SSP→Startpoint or SSP+TaskName→Startpoint

TaskName is optional and is primarily for broadcast inputs where the same SSP spans across multiple tasks. However, when a job starts, all Startpoints keyed only by SSP will be remapped to SSP+TaskName. See General Workflow above for details.

StartpointManager

StartpointManager is the main API to read and write Startpoints and is composed alongside the CheckpointManager in the OffsetManager. The StartpointManager is system implementation agnostic and handles the serialization and deserialization of Startpoints into the metadata store.

StartpointManager
WIP

OffsetManager Augmentation

OffsetManager
WIP


Compatibility, Deprecation, and Migration Plan

All changes are backwards compatible. This is an add on feature and no changes to the existing checkpoint operations are required.

There may be opportunities where offset related APIs that are strongly coupled to Checkpoints may be modified to handle both Startpoints and Checkpoints. Any such APIs will be deprecated until the next major version. 

No migration needed for this new feature.

Test Plan

Create a test job that logs the offset of the first event consumed per task and per SSP. Test using all Startpoint types.

Use the test job to test all combinations of:

  • Job Coordinators (ex: ClusterbasedJobCoordinator, ZkJobCoordinator, etc..)
  • Provided connectors (ex: Kafka, Eventhubs, etc..)
  • Broadcast and non-broadcast streams
  • Various input streams with various partition counts

Analysis

A key part of the core Startpoint feature is for individual task instances to fetch the appropriate Startpoint keyed by SSP-only. The two approaches, fan-out and intent-ACK, have been explored with the analysis detailed in the following subsections. The fan-out strategy is favored over the intent-ACK strategy. See analysis and explanation below.

Fan-out

See General Workflow above for details


Pros

  • Follows the natural progression of the JobCoordinator calculating the job model and then applying the info in the job model to fan out the SSP to SSP-Task Startpoints.
  • Cleaner and simpler book keeping of Startpoints. SSP-only keyed Startpoints are deleted after fan out and SSP+TaskName keyed Startpoints are deleted upon offset commits.

Cons

  • Will not work with the PassthroughJobCoordinator because it does not have a leader election strategy. The fan-out approach requires a JC leader. Workarounds will need to be explored during implementation for the small pool of use cases where PassthroughJobCoordinator is needed.

Intent-ACK

This approach is rejected due to the weight of the cons and the complexity required to manage the intents and ACKs once they are applied to the starting offsets and no longer needed.

Loading Startpoints Upon Job Start

Committing Startpoints

Pros

  • Does not rely on Job Coordinator for consumption of Startpoints.

Cons

  • Following the consumption of startpoints, metadata store will be polluted with SSP-only keyed startpoints and per-task-ACKs. Current implementation of the metadata store will bootstrap all the startpoints and ACKs until they are cleaned up.
  • Task count may change during runtime due to the increase in the input stream partition count. Therefore, new tasks will pick up the Startpoint unless all tasks prior to the increase in task count have written their ACKs and the cleanup process ran prior to the task-to-partition mapping. Startpoints should only apply to the state of the job upon startup and not to any changes during runtime. 

Additional Notes

Following is the workflow for the Intent/ACK approach:

1. The startpoint is written to the startpoint store. 
2. The SamzaContainer's read the startpoint and acknowledge that they've acted on it. 
3. After sufficient number of acks are received, an external agent purge the startpoint. External agent can be either one of the running SamzaContainer of the job or JobCoordinator of the job or a external daemon process.

In above workflow, an external agent will wait for the expected number of ACKs to be published to startpoint-store, and then purge the startpoint. The expected number of ACK's is not a static number. 

The container can acknowledge the startpoint at either one of the following two levels of granularity:

1. Container-level: Currently, compared to YARN with static container count(defined by job.container.count), the number of containers in standalone is dynamic. Processors can join and leave the group at any point in time. For standalone, an external agent cannot determine if it had received sufficient number of ACK's with certainty since number of containers is a moving number. 

 Standalone :

Let's consider the following scenario for standalone application. 

   A. Four processors are part of standalone application when the external agent watches for number of active processors in standalone application.
   B. External agent expects four ACK's to be written to the store before it can purge the startpoint. 
   C. Before any processor acks the startpoint, due to hardware failure, number of active processors gets reduced to three and a re-balance happens. 
   D. After re-balance, three live processors acks their corresponding startpoint to the store and are running fine. External agent cannot clear the start-point, since there're only three ACK's when compared to expected four. 

   Unless we change the coordination protocol in standalone to accommodate start-point, I'm not sure if we can ACK at this level to purge startpoints.

 YARN :

There has been interesting recent developments, where we're planning to develop auto-scaling solution for YARN. In the immediate future, the number of containers in YARN will be not fixed and the above scenario described for standalone will apply for YARN as well. 

2. Task-level: The number of tasks can change dynamically at run-time either due to increase in partition count of any of the input topics of the job or a new topic has matched the user-defined input topic-regex of the job. The above scenario described for standalone is still applicable here.


Rejected Alternatives

Writing Directly to Checkpoint

Previous explored solutions involved modifying the checkpoint offsets directly. Operationally, the Startpoint solution provides more safety because checkpoints are used for fault-tolerance. To prevent human error, the framework should not allow an external source to manipulate the checkpointed offsets. The ability to set starting offsets out-of-band as Startpoints is designed to do, provides the additional safety layer.

Startpoint Intent-ACK Model

See Analysis section for details on why the fan-out model is favored.




  • No labels