
Status

Current state: [ UNDER DISCUSSION ]

Discussion thread: <link to mailing list DISCUSS thread>

JIRA: SAMZA-2741

Released: 

Problem

Samza users running long-running pipelines need a way to gracefully drain them in preparation for an upgrade. The upgrade could be due to changes in transforms or backward-incompatible changes in the data schema. The only option currently available to the user is to abruptly stop the pipeline and reset intermediate data, which can lead to loss of buffered data. User logic is expected to evolve constantly, and schema changes are often backward incompatible. It is therefore imperative to give users the ability to seamlessly stop and upgrade their pipelines without losing any buffered data. This document proposes a feature called drain, which allows a user to perform a graceful stop-and-drain operation on a Samza job.

Motivation

Samza currently offers various job management commands, such as submit, kill, and status, through the ApplicationRunner and StreamJob APIs. Kill is currently used to cancel a running pipeline. It immediately triggers shutdown in SamzaContainer, which in turn halts processing of the pipeline in the RunLoop. Such abrupt cancellation can leave in-flight data unprocessed. In-flight data refers to data that is pending processing in the intermediate topics (shuffled data) or outstanding state. Samza provides fault-tolerant processing of streams through its at-least-once guarantee: when a job resumes, it reprocesses messages from the previous checkpoint, which ensures that any in-flight data left unprocessed upon termination is re-processed.
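For reference, cancelling a job today looks roughly like the sketch below. It assumes the Samza 1.x ApplicationRunner API (ApplicationRunners.getApplicationRunner, run, kill); MyStreamApplication and the config values are placeholders, and exact method signatures vary across Samza versions.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.samza.config.Config;
    import org.apache.samza.config.MapConfig;
    import org.apache.samza.runtime.ApplicationRunner;
    import org.apache.samza.runtime.ApplicationRunners;

    public class KillExample {
      public static void main(String[] args) {
        Map<String, String> configMap = new HashMap<>();
        configMap.put("job.name", "my-samza-job");  // placeholder config
        Config config = new MapConfig(configMap);

        // MyStreamApplication is a placeholder StreamApplication.
        ApplicationRunner runner =
            ApplicationRunners.getApplicationRunner(new MyStreamApplication(), config);
        runner.run();  // newer releases accept an optional ExternalContext

        // kill() cancels immediately: SamzaContainer shuts down, the RunLoop
        // halts, and any in-flight intermediate data is left unprocessed.
        runner.kill();
      }
    }

A drain operation would sit alongside kill as a graceful alternative that first finishes processing in-flight data before stopping the job.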

Abruptly stopping a pipeline can, however, be a problem if there are backward-incompatible changes to the intermediate data schema after restart. Any in-flight data in intermediate streams will cause failures upon reprocessing, since it cannot be deserialized under the incompatible schema. The usual fix to get past the serde issue for shuffled data is to switch to a fresh intermediate stream. In the Beam API this is achieved by changing the intermediate transform names, as the intermediate topic names are derived from them. Beam persists window and timer data in local RocksDB state, which can run into the same serde issues. If the Samza pipeline uses timers and windows, any outstanding Beam-managed state would need to be reset along with its backing changelog stream. Users can also manage their own state through the Beam State API and Samza's LocalTable, both of which are backed by RocksDB stores. The stateId for Beam State and the tableId for LocalTable can be changed to switch to a fresh RocksDB store when the user wants to adopt a new schema for Beam state or perform read/write operations on a new LocalTable.
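To illustrate, the rename-based workaround amounts to bumping identifiers in the pipeline definition. A minimal Beam-on-Samza sketch, where the transform name, state id, and CountFn are all hypothetical:

    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;

    // Hypothetical stateful DoFn. Bumping the state id ("count-v1" -> "count-v2")
    // points the fn at a fresh RocksDB store; data under the old id is abandoned.
    class CountFn extends DoFn<KV<String, Long>, KV<String, Long>> {
      @StateId("count-v2")
      private final StateSpec<ValueState<Long>> countSpec = StateSpecs.value();

      @ProcessElement
      public void process(ProcessContext ctx,
                          @StateId("count-v2") ValueState<Long> count) {
        long next = (count.read() == null ? 0L : count.read()) + 1;
        count.write(next);
        ctx.output(KV.of(ctx.element().getKey(), next));
      }
    }

    // Renaming the transform ("Aggregate-v1" -> "Aggregate-v2") changes the
    // derived intermediate topic name, so the restarted job reads from a fresh
    // stream instead of failing on undeserializable in-flight data:
    // input.apply("Aggregate-v2", ParDo.of(new CountFn()));

Either rename gets the job running again, but anything buffered under the old names is lost, which is exactly the gap drain is meant to close.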

These fixes only get the pipeline running again; the in-flight intermediate stream data and any Beam-managed state that was reset are consequently lost. These are not one-off issues but are encountered on a regular basis by our users. Aside from the data loss, this process of upgrading a pipeline is evidently cumbersome. It is therefore important to support the ability to drain and stop a pipeline smoothly prior to an upgrade that involves an incompatible intermediate schema change. The work here is scoped to draining only intermediate stream (shuffle) data and Beam-managed window and timer state. The onus of clearing user state (LocalTable and the Beam State API), if required, is on the user and can be met by switching to fresh state.

Proposed Changes


Public Interfaces


Implementation and Test Plan


Compatibility, Deprecation, and Migration Plan


Rejected Alternatives

