


Status

Current state: UNDER DISCUSSION

Discussion thread

JIRA

Released: 

Purpose

The purpose of this feature is to provide a common interface that external tools and services can use to rewind or fast-forward the starting offsets of any input stream. Beyond the common interface, the feature will also allow starting offsets to be set manually by various position types, not only by specific offsets. Many of the underlying system consumers already support seeking on an input stream by other position types, such as by timestamp, but the current framework does not expose these capabilities generically.

Motivation

In the current Samza framework, manually setting the starting offsets for an input stream requires stopping the Samza processor and using a checkpoint tool to modify the checkpoint offsets directly in the checkpoint stream. The current tooling is tedious and error-prone: it requires proper security access and potentially editing many offsets in a config file, and in some cases it can cause a Samza processor to lose its checkpoints. Beyond the risk of modifying checkpoints directly, checkpoint offsets are arbitrary strings with system-specific formats, so each system type (e.g. Kafka, Eventhub, Kinesis) requires its own checkpoint tool.

Requirements

Goals

Provide a simple and generic interface to manipulate starting offsets per input stream partition instead of relying on system-specific checkpoint tools and services. This allows flexibility to build REST API layers and tools on top of the common interface.

Allow manipulating starting offsets on an input stream not only by specific offsets, but with different position types, such as by timestamp.

Maintain backwards compatibility.

Provide safety by setting starting offsets out-of-band and not directly in the checkpoints.

Simplicity: the interface should be easy for developers and users to use.

Non-goals

Full snapshot of a Samza job's state at a particular point in time.

Rewinding or fast-forwarding state store changelogs in relation to the input streams. The challenge is that changelog streams are typically log-compacted.

Scope


Proposed Changes

Different systems in Samza use different formats for checkpoint offsets, and there is no contract that describes the offset format. To maintain backwards compatibility and improve the operability of setting starting offsets, this solution introduces the concept of Startpoints and stores them in a storage layer separate from the checkpoint stream, rather than manipulating checkpoint offsets directly.

A Startpoint indicates the position from which a particular SystemStreamPartition should start consuming. A Startpoint takes precedence over Checkpoints and defines both a position type and a position value whose meaning depends on that type. For example, if the Startpoint position type is TIMESTAMP, then the position value is an epoch timestamp. The Startpoint therefore enforces a stricter contract for external tools and services to follow than the free-form string offset in a Checkpoint.
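A minimal Java sketch of the Startpoint contract described above. The `Startpoint` name and the `TIMESTAMP` position type come from this proposal; the `PositionType` enum, its `SPECIFIC_OFFSET` constant, the factory methods, the validation rules, and the choice of epoch milliseconds are hypothetical illustrations, not Samza's actual API. The point is that pairing an explicit position type with its value lets a tool reject malformed input up front, which a free-form checkpoint string cannot do.

```java
import java.util.Objects;

// Hypothetical position types; only TIMESTAMP is named in the proposal.
enum PositionType {
  SPECIFIC_OFFSET, // value is a system-specific offset string
  TIMESTAMP        // value is an epoch timestamp (assumed milliseconds)
}

// Sketch of a Startpoint: an explicit position type plus its position value.
final class Startpoint {
  private final PositionType positionType;
  private final String positionValue;

  private Startpoint(PositionType positionType, String positionValue) {
    this.positionType = Objects.requireNonNull(positionType, "positionType");
    this.positionValue = Objects.requireNonNull(positionValue, "positionValue");
  }

  // Hypothetical factory: start from an exact, system-specific offset.
  static Startpoint ofSpecificOffset(String offset) {
    if (offset.isEmpty()) {
      throw new IllegalArgumentException("offset must not be empty");
    }
    return new Startpoint(PositionType.SPECIFIC_OFFSET, offset);
  }

  // Hypothetical factory: start from the first message at or after a time.
  static Startpoint ofTimestamp(long epochMillis) {
    if (epochMillis < 0) {
      throw new IllegalArgumentException("timestamp must be non-negative");
    }
    return new Startpoint(PositionType.TIMESTAMP, Long.toString(epochMillis));
  }

  PositionType getPositionType() { return positionType; }

  String getPositionValue() { return positionValue; }

  // Typed accessor that is only valid for TIMESTAMP startpoints.
  long getTimestamp() {
    if (positionType != PositionType.TIMESTAMP) {
      throw new IllegalStateException("not a TIMESTAMP startpoint: " + positionType);
    }
    return Long.parseLong(positionValue);
  }

  @Override
  public String toString() {
    return positionType + ":" + positionValue;
  }
}
```

For example, `Startpoint.ofTimestamp(1546300800000L)` yields a TIMESTAMP startpoint, while calling `getTimestamp()` on a SPECIFIC_OFFSET startpoint fails fast instead of silently misreading the value.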

Each SystemConsumer will also implement a new register method to handle a Startpoint.
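The sketch below illustrates how a consumer might translate a generic Startpoint into its own system-specific starting offset. The `StartpointAwareConsumer` interface, the `register(SystemStreamPartition, Startpoint)` signature, and the toy in-memory consumer are hypothetical; `SystemStreamPartition` and `Startpoint` are simplified stand-ins so the example compiles on its own, not Samza's real classes. Seeking by timestamp here means "the first message whose timestamp is at or after the requested time", which is an assumed semantic.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Simplified stand-in for Samza's SystemStreamPartition.
final class SystemStreamPartition {
  final String system;
  final String stream;
  final int partition;

  SystemStreamPartition(String system, String stream, int partition) {
    this.system = system;
    this.stream = stream;
    this.partition = partition;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof SystemStreamPartition)) return false;
    SystemStreamPartition other = (SystemStreamPartition) o;
    return system.equals(other.system) && stream.equals(other.stream)
        && partition == other.partition;
  }

  @Override
  public int hashCode() { return Objects.hash(system, stream, partition); }
}

enum PositionType { SPECIFIC_OFFSET, TIMESTAMP }

// Minimal Startpoint stand-in: position type plus position value.
final class Startpoint {
  final PositionType type;
  final String value;

  Startpoint(PositionType type, String value) {
    this.type = type;
    this.value = value;
  }
}

// Hypothetical shape of the new register method on a SystemConsumer.
interface StartpointAwareConsumer {
  void register(SystemStreamPartition ssp, Startpoint startpoint);
}

// Toy consumer over one in-memory log; a message's index is its offset.
final class InMemoryConsumer implements StartpointAwareConsumer {
  private final List<Long> messageTimestamps;
  private final Map<SystemStreamPartition, Long> startingOffsets = new HashMap<>();

  InMemoryConsumer(List<Long> messageTimestamps) {
    this.messageTimestamps = messageTimestamps;
  }

  @Override
  public void register(SystemStreamPartition ssp, Startpoint startpoint) {
    long offset;
    switch (startpoint.type) {
      case SPECIFIC_OFFSET:
        offset = Long.parseLong(startpoint.value);
        break;
      case TIMESTAMP:
        offset = seekByTimestamp(Long.parseLong(startpoint.value));
        break;
      default:
        throw new IllegalArgumentException("unsupported type: " + startpoint.type);
    }
    startingOffsets.put(ssp, offset);
  }

  // First offset whose timestamp is >= target; end of log if none is.
  private long seekByTimestamp(long target) {
    for (int i = 0; i < messageTimestamps.size(); i++) {
      if (messageTimestamps.get(i) >= target) return i;
    }
    return messageTimestamps.size();
  }

  Long startingOffset(SystemStreamPartition ssp) {
    return startingOffsets.get(ssp);
  }
}
```

The design choice being illustrated is that only the consumer knows how to map a position type onto its own offset format, so external tools never need to construct system-specific offset strings themselves.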

A requested Startpoint will be stored in a metadata store, decoupled from the actual checkpoint offsets in the checkpoint stream.
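A minimal sketch of keeping Startpoints out-of-band and applying the precedence rule stated earlier, where a stored Startpoint wins over the checkpointed offset. The `StartpointStore` and `StartingPositionResolver` classes, the string SSP keys such as `"kafka.PageViewEvent.0"`, and the in-memory map standing in for the metadata store are all hypothetical; a real deployment would back the store with whatever metadata store Samza provides.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Minimal Startpoint stand-in: a position type name plus its value.
final class Startpoint {
  final String positionType;
  final String positionValue;

  Startpoint(String positionType, String positionValue) {
    this.positionType = positionType;
    this.positionValue = positionValue;
  }

  @Override
  public String toString() { return positionType + ":" + positionValue; }
}

// Hypothetical Startpoint store, keyed by SSP and kept separate from the
// checkpoint stream; an in-memory map stands in for the metadata store.
final class StartpointStore {
  private final Map<String, Startpoint> store = new ConcurrentHashMap<>();

  void write(String ssp, Startpoint startpoint) { store.put(ssp, startpoint); }

  Optional<Startpoint> read(String ssp) { return Optional.ofNullable(store.get(ssp)); }

  void delete(String ssp) { store.remove(ssp); }
}

// Resolves where an SSP should start: a Startpoint takes precedence over the
// checkpointed offset. Checkpoints are only read here, never modified.
final class StartingPositionResolver {
  private final StartpointStore startpoints;
  private final Map<String, String> checkpointedOffsets;

  StartingPositionResolver(StartpointStore startpoints, Map<String, String> checkpointedOffsets) {
    this.startpoints = startpoints;
    this.checkpointedOffsets = checkpointedOffsets;
  }

  String resolve(String ssp) {
    Optional<Startpoint> startpoint = startpoints.read(ssp);
    if (startpoint.isPresent()) {
      return "startpoint " + startpoint.get();
    }
    String offset = checkpointedOffsets.get(ssp);
    return offset != null ? "checkpoint " + offset : "none";
  }
}
```

Because a tool only ever writes to the Startpoint store, a mistaken request can be deleted without having touched the checkpoint offsets, which is the safety property the proposal's goals call for.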

Public Interfaces


Implementation and Test Plan


Compatibility, Deprecation, and Migration Plan


Rejected Alternatives

