...

JIRA

SAMZA-1983 (ASF JIRA)

Released: 

Problem

Motivation

...

Purpose

The purpose of this feature is to provide a common interface that external tools and services can use to rewind or fast-forward the starting offsets on any input stream. Beyond the common interface, this feature makes it possible to manually set starting offsets by various position types, not only by specific offsets. Many of the underlying system consumers already support other position types for seeking on an input stream, such as seeking by timestamp, but the current framework does not expose these capabilities generically.

Motivation

In the current Samza framework, manually setting the starting offsets for an input stream requires stopping the Samza processor and using a checkpoint tool to modify the checkpoint offsets directly in the checkpoint stream. The current tooling is tedious and error prone: it requires proper security access and potentially editing many offsets in a config file. In some cases, it can cause a Samza processor to lose its checkpoints. Besides the danger of modifying checkpoints directly, the checkpoint offsets are arbitrary strings with system-specific formats, so a different checkpoint tool is needed for each system type (e.g. Kafka, Event Hubs, Kinesis).

Requirements

Goals

Provide a simple and generic interface to manipulate starting offsets per input stream partition instead of relying on system-specific checkpoint tools and services. This allows flexibility to build REST API layers and tools on top of the common interface.

Allow manipulating starting offsets on an input stream not only by specific offsets, but with different position types, such as by timestamp.

Maintain backwards compatibility.

Provide safety by setting starting offsets out-of-band and not directly in the checkpoints.

Simplicity. Easy for developers and users to use.

Non-goals

Full snapshot of a Samza job's state at a particular point in time.

Rewinding or fast-forwarding state store changelogs in relation to the input streams. The challenge is that the changelog stream is typically log compacted.

Scope


Proposed Changes

Different systems in Samza use different formats for checkpoint offsets, and there is no contract that describes the offset format. To maintain backwards compatibility and to make setting starting offsets easier to operate, this solution defines the concept of Startpoints and stores them in a separate storage layer, rather than manipulating the checkpoint offsets directly in the checkpoint stream.

The Startpoint indicates the offset position from which a particular SystemStreamPartition should start consuming. A Startpoint takes precedence over Checkpoints and consists of a position type and a position value that is interpreted according to that type. For example, if the position type is TIMESTAMP, then the position value is an epoch value. The Startpoint enforces a stricter contract for external tools and services to follow than the free-form string offset value in the Checkpoint.
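As an illustration only, a Startpoint could be modeled as a typed pair of position type and position value. The sketch below is not the proposed API: the class and method names, the SPECIFIC_OFFSET type name, and the choice of epoch milliseconds for the timestamp value are all assumptions made for the example.

```java
import java.util.Objects;

public class StartpointSketch {
    /** Hypothetical position types; the proposal mentions specific offsets and TIMESTAMP. */
    enum PositionType { SPECIFIC_OFFSET, TIMESTAMP }

    /** Immutable (type, value) pair; the value is interpreted according to the type. */
    static final class Startpoint {
        private final PositionType positionType;
        private final String position;

        private Startpoint(PositionType positionType, String position) {
            this.positionType = positionType;
            this.position = position;
        }

        /** The offset string stays system-specific, exactly as in a checkpoint. */
        static Startpoint withSpecificOffset(String offset) {
            return new Startpoint(PositionType.SPECIFIC_OFFSET, Objects.requireNonNull(offset));
        }

        /** Assumption: the epoch value is expressed in milliseconds. */
        static Startpoint withTimestamp(long epochMillis) {
            if (epochMillis < 0) {
                throw new IllegalArgumentException("Timestamp must be non-negative: " + epochMillis);
            }
            return new Startpoint(PositionType.TIMESTAMP, Long.toString(epochMillis));
        }

        PositionType getPositionType() { return positionType; }

        String getPosition() { return position; }

        @Override
        public String toString() {
            return "Startpoint{type=" + positionType + ", position=" + position + "}";
        }
    }

    public static void main(String[] args) {
        // An external tool builds a typed request instead of a raw offset string.
        System.out.println(Startpoint.withTimestamp(1546300800000L));
    }
}
```

Restricting construction to factory methods is one way to enforce the stricter contract: a caller cannot pair a TIMESTAMP type with a value that is not a valid epoch number.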

Each SystemConsumer will also implement a new register method to handle a Startpoint.
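One way such a register method might keep existing consumers backwards compatible is a default interface method that delegates specific offsets to the existing offset-based register call and rejects position types the consumer does not support. The sketch below uses a hypothetical Consumer interface with string partition names, not Samza's actual SystemConsumer, and a hypothetical in-memory timestamp index standing in for a system's own seek-by-timestamp support.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class RegisterSketch {
    enum PositionType { SPECIFIC_OFFSET, TIMESTAMP }

    /** Minimal hypothetical Startpoint: a position type plus its value. */
    static final class Startpoint {
        final PositionType type;
        final String position;

        Startpoint(PositionType type, String position) {
            this.type = type;
            this.position = position;
        }
    }

    /** Hypothetical stand-in for a system consumer; partitions are plain strings here. */
    interface Consumer {
        /** Existing style of registration: a system-specific offset string. */
        void register(String partition, String offset);

        /** New overload: consumers that do not override it only accept specific offsets. */
        default void register(String partition, Startpoint startpoint) {
            if (startpoint.type == PositionType.SPECIFIC_OFFSET) {
                register(partition, startpoint.position);
            } else {
                throw new UnsupportedOperationException("Unsupported position type: " + startpoint.type);
            }
        }
    }

    /** A consumer that can also translate a timestamp into an offset. */
    static final class TimeIndexedConsumer implements Consumer {
        private final TreeMap<Long, String> offsetsByTimestamp = new TreeMap<>();
        final Map<String, String> startingOffsets = new HashMap<>();

        TimeIndexedConsumer(Map<Long, String> index) {
            offsetsByTimestamp.putAll(index);
        }

        @Override
        public void register(String partition, String offset) {
            startingOffsets.put(partition, offset);
        }

        @Override
        public void register(String partition, Startpoint startpoint) {
            if (startpoint.type == PositionType.TIMESTAMP) {
                // Start from the first indexed message at or after the requested timestamp.
                Map.Entry<Long, String> entry =
                    offsetsByTimestamp.ceilingEntry(Long.parseLong(startpoint.position));
                if (entry == null) {
                    throw new IllegalArgumentException(
                        "No message at or after timestamp " + startpoint.position);
                }
                register(partition, entry.getValue());
            } else {
                Consumer.super.register(partition, startpoint);
            }
        }
    }
}
```

With this shape, a consumer that never learns about Startpoints still honors specific-offset requests, while a consumer backed by a system with timestamp seeks can opt in by overriding the overload.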

A requested Startpoint will be stored in a metadata store, decoupled from the actual checkpoint offsets in the checkpoint stream.
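The separation between the Startpoint store and the checkpoints, together with the precedence rule, can be sketched as follows. This is a minimal in-memory illustration: StartpointStore, resolve, the string keys, and the "startpoint:" and "checkpoint:" result prefixes are hypothetical names chosen for the example, not part of the proposal.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class StartpointStoreSketch {
    /** Hypothetical metadata store keyed by a stream-partition name. */
    static final class StartpointStore {
        private final Map<String, String> startpoints = new HashMap<>();

        void write(String partition, String startpoint) {
            startpoints.put(partition, startpoint);
        }

        Optional<String> read(String partition) {
            return Optional.ofNullable(startpoints.get(partition));
        }

        void delete(String partition) {
            startpoints.remove(partition);
        }
    }

    /**
     * Decide where a partition starts: a stored Startpoint wins over the
     * checkpoint, and the checkpoint map itself is never modified.
     */
    static String resolve(String partition, StartpointStore store, Map<String, String> checkpoints) {
        Optional<String> startpoint = store.read(partition);
        if (startpoint.isPresent()) {
            return "startpoint:" + startpoint.get();
        }
        String checkpoint = checkpoints.get(partition);
        return checkpoint != null ? "checkpoint:" + checkpoint : "default";
    }
}
```

Because a request is only ever written to the separate store, removing a mistaken Startpoint leaves the job with its original checkpoint intact.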

Public Interfaces


Implementation and Test Plan

...