Status

Current state: [ ACCEPTED ]

Discussion thread: <link to mailing list DISCUSS thread>

JIRA: SAMZA-2741

Released: 

...

Problem

Samza users with long-running pipelines need a way to drain them gracefully in preparation for an upgrade. The upgrade could be due to changes to transforms or to backward-incompatible changes in the data schema. The only option currently available is to abruptly stop the pipeline and reset intermediate data, which could lead to loss of buffered data. User logic is expected to evolve constantly, and schema changes are often backward incompatible. It is therefore imperative to give users the ability to seamlessly stop and upgrade their pipelines without losing any buffered data.

Background

Samza currently offers various job management commands, such as submit, kill and status, through the ApplicationRunner and StreamJob APIs. Kill is currently used to cancel a running pipeline. It immediately triggers shutdown in SamzaContainer, which in turn halts processing of the pipeline in the RunLoop. Such abrupt cancellation can leave in-flight data unprocessed. In-flight data refers to data that is pending processing in the intermediate topics (shuffled data) or in outstanding state. Samza provides fault-tolerant processing of streams through its at-least-once guarantee: the job reprocesses messages from the previous checkpoint when it resumes. This guarantees that any in-flight data left unprocessed upon termination is reprocessed.

Abruptly stopping a pipeline can, however, be a problem if there are backward-incompatible changes to the intermediate data schema after restart. Any in-flight data in intermediate streams will cause failures upon reprocessing because it cannot be deserialized with the incompatible schema. The usual fix to get past the serde issue for shuffled data is to switch to a fresh intermediate stream. This is achieved by changing the opId of the PartitionBy operator in the Samza high-level API, or the intermediate transform names in the Beam API, since the intermediate topic names are derived from them. The Beam Samza runner persists window and timer data in local RocksDB state, which could run into the same serde issues. If the Samza pipeline uses timers and windows, any outstanding Beam Samza-managed state would need to be reset along with its backing changelog stream. Users can manage their own state through the Beam State API and Samza's LocalTable, both of which are backed by RocksDB stores. The state id for Beam State and the tableId for LocalTable can be changed to switch to a fresh RocksDB store when the user wants to use Beam state with a new schema or to read from and write to a new LocalTable.

These fixes only get the pipeline running again; the in-flight intermediate stream data and any Samza-managed state that was reset are lost. Aside from the data loss, this upgrade process is cumbersome. Therefore, it is important to support draining and stopping a pipeline smoothly before making an upgrade that involves an incompatible intermediate schema change.

The work here is scoped to drain only intermediate stream (shuffle) data and system-managed state. System-managed state comprises window aggregation state and timers. The proposed approach will prematurely emit window panes and fire all timers, thereby draining all state. Continuation of state post drain is beyond the scope of this work. The onus of clearing user state (LocalTable and Beam State API), if required, is on the user and can be done by switching to a fresh state.

Proposed Changes

We propose a drain operation for Samza pipelines which halts the pipeline from consuming new data, completes processing of buffered data and then stops the pipeline.

Below is the high level approach with finer details covered in the sections below: 

...

We introduce DrainNotification, which is written to the coordinator stream store to notify the pipeline to trigger drain. This work will also provide utility classes to aid with writing DrainNotification to the metadata store.

...

Samza containers periodically check for drain notifications. Once a container encounters one, it first advances the watermark to infinity for all source streams, which is effectively equivalent to converting an unbounded source into a bounded one. This ensures that any buffered window data is processed and timers are fired. The watermark is propagated to the downstream stages of the pipeline, thereby ensuring that all local state is processed.

Next, a custom drain control message is inserted as an in-memory marker in the pending queues of source SSPs to indicate that the pipeline is draining. All data from source SSPs is processed until the drain marker is encountered. The Samza container shuts down once all tasks have received a drain control message for all the SSPs they consume, including any intermediate stream SSPs. Tasks perform a commit before shutting down.



Notifying Samza Pipeline about Drain 

This section covers the details of notifying Samza pipelines to drain.

We introduce two classes, DrainNotification and DrainUtils. DrainNotification is used to signal the Samza engine to start draining. DrainUtils provides the ability to signal drain for a Samza job. It uses a namespace-aware, coordinator-stream backed metadata store to read and write drain notifications on the coordinator stream. The Kafka coordinator-stream backed metadata store acts as the control channel in Samza, which makes the drain notification available to all containers of the job. The deployment id (runId) is different for every deployment of the job and is used by the containers to validate whether a drain notification was intended for the current deployment. The config app.run.id uniquely identifies the current deployment in a container. The job coordinator cleans up the DrainNotification from the coordinator stream on successful shutdown.


DrainNotification

```java
import com.google.common.base.Preconditions;
import java.util.UUID;

/**
 * DrainNotification is a custom message used by an external controller to trigger drain.
 * DrainMode is the companion enum introduced by this proposal (not shown here).
 */
public class DrainNotification {
  /**
   * Unique identifier of a drain notification.
   */
  private final UUID uuid;
  /**
   * Unique identifier for a deployment so drain notifications can be invalidated across job restarts.
   */
  private final String runId;

  /**
   * Drain mode.
   */
  private final DrainMode drainMode;

  public DrainNotification(UUID uuid, String runId, DrainMode drainMode) {
    Preconditions.checkNotNull(uuid);
    Preconditions.checkNotNull(runId);
    Preconditions.checkNotNull(drainMode);
    this.uuid = uuid;
    this.runId = runId;
    this.drainMode = drainMode;
  }

  /**
   * Creates a DrainNotification in {@link DrainMode#DEFAULT} mode.
   */
  public static DrainNotification create(UUID uuid, String runId) {
    return new DrainNotification(uuid, runId, DrainMode.DEFAULT);
  }

  public UUID getUuid() {
    return this.uuid;
  }

  public String getRunId() {
    return runId;
  }

  public DrainMode getDrainMode() {
    return drainMode;
  }
}
```
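The DrainUtils flow described above (write a notification, validate it against the current run id, clean it up after shutdown) can be sketched as follows. This is a minimal, hypothetical illustration: SimpleMetadataStore, InMemoryMetadataStore, DrainUtilsSketch and the single DRAIN_KEY are stand-ins invented for this sketch, not Samza's metadata store API, and the "uuid|runId" string encoding replaces whatever serde the real DrainNotification uses.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the namespace-aware, coordinator-stream backed metadata store.
interface SimpleMetadataStore {
  void put(String key, String value);
  Optional<String> get(String key);
  void delete(String key);
}

// In-memory implementation used only to illustrate the contract.
final class InMemoryMetadataStore implements SimpleMetadataStore {
  private final Map<String, String> entries = new ConcurrentHashMap<>();

  public void put(String key, String value) { entries.put(key, value); }
  public Optional<String> get(String key) { return Optional.ofNullable(entries.get(key)); }
  public void delete(String key) { entries.remove(key); }
}

// Hypothetical helper mirroring the responsibilities described for DrainUtils.
final class DrainUtilsSketch {
  // Assumption: a single well-known key inside the drain namespace.
  static final String DRAIN_KEY = "drain-notification";

  // Writes a notification tagged with the run id of the deployment that should drain.
  static UUID writeDrainNotification(SimpleMetadataStore store, String runId) {
    UUID uuid = UUID.randomUUID();
    // Simplified "<uuid>|<runId>" encoding; a real implementation would use a proper serde.
    store.put(DRAIN_KEY, uuid + "|" + runId);
    return uuid;
  }

  // A container drains only if the notification targets its own deployment (app.run.id).
  static boolean shouldDrain(SimpleMetadataStore store, String currentRunId) {
    return store.get(DRAIN_KEY)
        .map(value -> value.substring(value.indexOf('|') + 1))
        .map(currentRunId::equals)
        .orElse(false);
  }

  // Called by the job coordinator after a successful shutdown.
  static void cleanup(SimpleMetadataStore store) {
    store.delete(DRAIN_KEY);
  }
}
```

Keying the check on the run id is what lets a notification written for one deployment be ignored by the next one, even if cleanup did not happen.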


Handling Drain during event processing

Once the user triggers a drain by writing a DrainNotification, it is available on the coordinator stream for all containers to process.

Polling for Drain Notifications

We will add a class, DrainMonitor, to SamzaContainer that periodically checks the coordinator-stream backed metadata store for a DrainNotification. SystemConsumers periodically polls the consumers for new messages, updates the chooser with them, and polls the chooser to pick the next message to process in the RunLoop. The following changes will be made:

  • If DrainMonitor encounters a DrainNotification message, it puts the SamzaContainer in drain mode.
  • SystemConsumers will stop polling source systems for new messages on every refresh. The refresh will still pick up messages from intermediate stream systems. This marks the end of consumption of new messages from sources, while messages propagated downstream from sources to intermediate streams are still consumed.
  • SystemConsumers maintains an in-memory queue of unprocessed messages per SSP.
    • Upon drain, no new messages will be inserted into the source SSP queues, as refresh of source consumers has stopped. The queues may still hold pending messages from previous refreshes.
    • It will write Watermark control messages to all active source SSP queues (registered SSPs minus intermediate SSPs and end-of-stream SSPs) to advance the watermark to infinity (Long.MaxValue).
    • Next, it will append a drain control message to all active source SSP queues (the same set of SSPs) to denote that the SSPs are draining.
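The polling behavior of DrainMonitor can be sketched with a ScheduledExecutorService. This is a hypothetical sketch, not the actual DrainMonitor: the Predicate stands in for "is there a DrainNotification for this run id in the metadata store", and DrainCallback stands in for putting the container and SystemConsumers into drain mode.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

// Hypothetical sketch of a DrainMonitor that polls for a drain notification.
final class DrainMonitorSketch {
  // Stand-in for switching SamzaContainer / SystemConsumers into drain mode.
  interface DrainCallback {
    void onDrain();
  }

  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final AtomicBoolean draining = new AtomicBoolean(false);
  private final Predicate<String> hasDrainNotificationFor;
  private final String runId;
  private final DrainCallback callback;

  DrainMonitorSketch(Predicate<String> hasDrainNotificationFor, String runId, DrainCallback callback) {
    this.hasDrainNotificationFor = hasDrainNotificationFor;
    this.runId = runId;
    this.callback = callback;
  }

  void start(long pollIntervalMs) {
    // Initial delay of 0 so a notification published before container start is seen on the first poll.
    scheduler.scheduleAtFixedRate(this::checkOnce, 0, pollIntervalMs, TimeUnit.MILLISECONDS);
  }

  void checkOnce() {
    if (draining.get()) {
      return; // already in drain mode; nothing more to do
    }
    // compareAndSet guarantees the callback runs at most once even if checks race.
    if (hasDrainNotificationFor.test(runId) && draining.compareAndSet(false, true)) {
      callback.onDrain();
    }
  }

  boolean isDraining() {
    return draining.get();
  }

  void stop() {
    scheduler.shutdownNow();
  }
}
```

Making the transition idempotent matters because drain mode is a one-way switch: repeated polls after the notification appears must not re-append markers.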

Drain control messages (DrainMessage) are special markers appended to the per-SSP in-memory queues of unprocessed messages in SystemConsumers. They indicate that the SSP is draining and that the chooser will not return any more messages for that SSP. They are akin to the other control messages, namely WatermarkMessage and EndOfStreamMessage. The current deployment id (runId) is also added to the drain control message to differentiate drain markers across redeploys. A drain message is processed by the run loop only if its runId matches the current deployment's runId, to prevent accidental propagation of drain markers from a previous deployment.

...

DrainMessage

```java
import org.apache.samza.system.ControlMessage;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;

public class DrainMessage extends ControlMessage {
  /**
   * Id used to invalidate DrainMessages between runs. Ties to app.run.id from config.
   */
  private final String runId;

  public DrainMessage(String runId) {
    this(null, runId);
  }

  // @JsonCreator lets Jackson deserialize the message through this constructor.
  @JsonCreator
  public DrainMessage(@JsonProperty("task-name") String taskName, @JsonProperty("run-id") String runId) {
    super(taskName);
    this.runId = runId;
  }

  public String getRunId() {
    return runId;
  }
}
```
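The queue handling described above (compute the active source SSPs, append a watermark at Long.MAX_VALUE and then a drain marker, and honor a marker only for the current run) can be modeled as follows. This is a simplified, hypothetical model: SSPs are plain strings and Envelope replaces Samza's IncomingMessageEnvelope; none of these names are Samza APIs.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical model of SystemConsumers' per-SSP pending queues during drain.
final class PendingQueuesSketch {
  enum Kind { DATA, WATERMARK, DRAIN }

  static final class Envelope {
    final Kind kind;
    final String payload;  // data payload (DATA only)
    final long watermark;  // watermark timestamp (WATERMARK only)
    final String runId;    // deployment id (DRAIN only)

    private Envelope(Kind kind, String payload, long watermark, String runId) {
      this.kind = kind;
      this.payload = payload;
      this.watermark = watermark;
      this.runId = runId;
    }

    static Envelope data(String payload) { return new Envelope(Kind.DATA, payload, 0L, null); }
    static Envelope watermarkAt(long ts) { return new Envelope(Kind.WATERMARK, null, ts, null); }
    static Envelope drain(String runId) { return new Envelope(Kind.DRAIN, null, 0L, runId); }
  }

  final Map<String, Queue<Envelope>> queues = new HashMap<>();

  // Active source SSPs = registered SSPs minus intermediate SSPs minus end-of-stream SSPs.
  static Set<String> activeSourceSsps(Set<String> registered, Set<String> intermediate, Set<String> endOfStream) {
    Set<String> active = new HashSet<>(registered);
    active.removeAll(intermediate);
    active.removeAll(endOfStream);
    return active;
  }

  // Markers go behind any pending data, so buffered messages are still processed first.
  void onDrain(Set<String> activeSources, String runId) {
    for (String ssp : activeSources) {
      Queue<Envelope> queue = queues.computeIfAbsent(ssp, k -> new ArrayDeque<>());
      queue.add(Envelope.watermarkAt(Long.MAX_VALUE)); // advance watermark to infinity
      queue.add(Envelope.drain(runId));                // then mark the SSP as draining
    }
  }

  // Run-loop side: a drain marker counts only if it belongs to the current deployment.
  static boolean isDrainForCurrentRun(Envelope envelope, String currentRunId) {
    return envelope.kind == Kind.DRAIN && currentRunId.equals(envelope.runId);
  }
}
```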


Processing Drain control messages

The run loop orchestrates reading and processing messages, checkpointing and windowing among tasks. RunLoop will perform drain in the following steps:

...

A task is ready to commit when task.commit is requested, either by the user or by the commit thread, and no process, window, commit or scheduler operation is in progress. When task.async.commit is true and task.max.concurrency > 1, the task can commit while message processing is in progress. In drain mode, commits must not happen asynchronously with message processing.
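The commit-readiness rule above can be expressed as a single predicate. This is a hypothetical model: the field names are illustrative and do not mirror RunLoop's internal state; only task.async.commit and task.max.concurrency come from the text.

```java
// Hypothetical model of the commit-readiness rule, including the drain-mode restriction.
final class CommitGateSketch {
  boolean commitRequested;      // task.commit requested by the user or the commit thread
  boolean asyncCommitEnabled;   // task.async.commit
  int maxConcurrency = 1;       // task.max.concurrency
  boolean drainMode;
  int messagesInFlight;
  boolean windowInProgress;
  boolean commitInProgress;
  boolean schedulerInProgress;

  boolean canCommit() {
    if (!commitRequested || windowInProgress || commitInProgress || schedulerInProgress) {
      return false;
    }
    // Async commit may overlap with message processing, but never while draining.
    boolean mayOverlapProcessing = asyncCommitEnabled && maxConcurrency > 1 && !drainMode;
    return mayOverlapProcessing || messagesInFlight == 0;
  }
}
```

Disallowing the overlap in drain mode means the final commit before shutdown only happens after in-flight messages complete.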

Drain in Samza High Level API

Samza high-level API provides a rich set of operators which can be chained to form a DAG of operations on the MessageStream. StreamOperatorTask is a StreamTask implementation which takes the operator graph and processes input messages by traversing the DAG. 

The task starts processing at the input operators of the operator DAG. We introduce a class, DrainStates, which handles all the bookkeeping for drain messages. When an operator encounters a drain message, it updates the internal bookkeeping that tracks which SSPs of each system stream have received a drain message. Once all input streams for the operator have received a drain control message, it propagates the message to the downstream operators in the operator DAG.

In the case of a shuffle stage, DrainMessage is always sent by all source partitions to a single partition of the downstream stage (intermediate stream). This is done for aggregation: DrainStates keeps track of which source partitions have sent a DrainMessage to the aggregate partition. Once the downstream aggregate partition has received drain messages from all the parent partitions, the drain message is broadcast to all the peer partitions. The same process is repeated for all downstream stages.
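The two kinds of bookkeeping described above, per-stream partition tracking and aggregation at the single downstream partition, can be sketched as follows. DrainStatesSketch and AggregateDrainTracker are hypothetical simplifications, not the actual DrainStates class; keying the aggregate on the sending task name is an assumption that relies on the task name carried by the control message.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified DrainStates: tracks which partitions of each input stream have drained.
final class DrainStatesSketch {
  private final Map<String, Integer> partitionCounts;                // stream -> number of partitions
  private final Map<String, Set<Integer>> drained = new HashMap<>(); // stream -> partitions that sent drain

  DrainStatesSketch(Map<String, Integer> partitionCounts) {
    this.partitionCounts = new HashMap<>(partitionCounts);
  }

  void update(String stream, int partition) {
    drained.computeIfAbsent(stream, s -> new HashSet<>()).add(partition);
  }

  boolean isDrained(String stream) {
    Integer expected = partitionCounts.get(stream);
    Set<Integer> seen = drained.get(stream);
    return expected != null && seen != null && seen.size() == expected;
  }

  // The operator propagates drain downstream only when every input stream has drained.
  boolean allDrained() {
    for (String stream : partitionCounts.keySet()) {
      if (!isDrained(stream)) {
        return false;
      }
    }
    return true;
  }
}

// Hypothetical tracker for the aggregate partition of an intermediate stream.
final class AggregateDrainTracker {
  private final int expectedUpstreamTasks;
  private final Set<String> senders = new HashSet<>();

  AggregateDrainTracker(int expectedUpstreamTasks) {
    this.expectedUpstreamTasks = expectedUpstreamTasks;
  }

  // Returns true exactly once: when the last expected upstream task's drain message arrives,
  // which is the point at which drain would be broadcast to the peer partitions.
  boolean onDrainFrom(String upstreamTaskName) {
    return senders.add(upstreamTaskName) && senders.size() == expectedUpstreamTasks;
  }
}
```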

handleDrain will be added to the base OperatorImpl implementation; it executes once all input streams to the operator have received a drain control message. In the case of a broadcast or partition operator, handleDrain writes drain control messages to the intermediate stream for propagation.

...

OperatorImpl

```java
import java.util.Collection;
import java.util.Collections;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;

/**
 * Abstract base class for all stream operator implementations.
 *
 * @param <M> type of the input to this operator
 * @param <RM> type of the results of applying this operator
 */
public abstract class OperatorImpl<M, RM> {
  // Other members of OperatorImpl are omitted here.

  /**
   * Invoked when all input streams to this operator have received a drain control message.
   * Subclasses handle drain by overriding this method. The default no-op implementation suits
   * in-memory operators; output operators need to override it to propagate drain over the wire.
   *
   * @param collector message collector
   * @param coordinator task coordinator
   * @return results to be emitted when this operator reaches drain
   */
  protected Collection<RM> handleDrain(MessageCollector collector, TaskCoordinator coordinator) {
    return Collections.emptyList();
  }
}
```
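The split between the no-op default and an output operator that propagates drain over the wire can be illustrated with a miniature model. Everything here is hypothetical: SimpleCollector replaces Samza's MessageCollector, MiniOperator and MiniPartitionByOperator are not Samza classes, and using partition 0 as the aggregate partition is an assumption, since the proposal only says that a single partition is used.

```java
import java.util.Collection;
import java.util.Collections;

// Hypothetical stand-in for Samza's MessageCollector (simplified signature).
interface SimpleCollector {
  void send(String stream, int partition, Object message);
}

// Simplified analogue of OperatorImpl; only the drain hook is modelled.
abstract class MiniOperator<RM> {
  // Default: in-memory operators have nothing extra to emit or send on drain.
  protected Collection<RM> handleDrain(SimpleCollector collector) {
    return Collections.emptyList();
  }
}

// Simplified analogue of a partitionBy output operator.
final class MiniPartitionByOperator extends MiniOperator<Void> {
  // Assumption for this sketch: partition 0 of the intermediate stream aggregates drain markers.
  static final int AGGREGATE_PARTITION = 0;

  private final String intermediateStream;
  private final String taskName;

  MiniPartitionByOperator(String intermediateStream, String taskName) {
    this.intermediateStream = intermediateStream;
    this.taskName = taskName;
  }

  @Override
  protected Collection<Void> handleDrain(SimpleCollector collector) {
    // Tag the marker with the sending task so the aggregate partition can tell
    // which upstream tasks have drained.
    collector.send(intermediateStream, AGGREGATE_PARTITION, "DRAIN:" + taskName);
    return Collections.emptyList();
  }
}
```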

Watermark & State Handling with Drain

Samza supports two notions of time: processing time and event time. By default, all built-in Samza operators use processing time; event-time operations are supported through the Beam API. When the user signals a drain, the watermark is advanced to infinity through watermark control messages. Beam's Samza runner keeps timer and window state in local RocksDB stores. Advancing the watermark to infinity causes all timers to fire and all window data to be processed.
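Why advancing the watermark to infinity drains event-time state can be seen with a toy timer queue: a timer fires once the watermark reaches its timestamp, so a watermark of Long.MAX_VALUE fires every pending timer. EventTimeTimersSketch is a hypothetical illustration, not the Beam or Samza runner timer API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical event-time timer queue illustrating drain via watermark = Long.MAX_VALUE.
final class EventTimeTimersSketch {
  private final PriorityQueue<Long> timers = new PriorityQueue<>();
  private long watermark = Long.MIN_VALUE;

  void setTimer(long eventTime) {
    timers.add(eventTime);
  }

  // Fires (returns, in time order) every timer whose timestamp is at or before the new watermark.
  List<Long> advanceWatermark(long newWatermark) {
    watermark = Math.max(watermark, newWatermark); // watermarks never move backwards
    List<Long> fired = new ArrayList<>();
    while (!timers.isEmpty() && timers.peek() <= watermark) {
      fired.add(timers.poll());
    }
    return fired;
  }

  int pending() {
    return timers.size();
  }
}
```

For example, with timers at 10, 30 and 50, a watermark of 20 fires only 10, while a watermark of Long.MAX_VALUE fires 30 and 50 and leaves no timers behind.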

Advancing the watermark has no effect on the Samza high-level API, since its windowing is processing-time based. The high-level API supports window operations on MessageStream; it keeps window data in local RocksDB state and tracks triggers in memory. When the window operator receives a drain message, all triggers fire and results are emitted from the window operation. This is implemented by overriding handleDrain in WindowOperatorImpl.
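The effect of the handleDrain override on a processing-time window can be sketched with a toy keyed counter: on drain, every open window emits its partial result and the state is cleared. CountWindowSketch is hypothetical and is not WindowOperatorImpl; the normal trigger path is intentionally omitted.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical keyed count window illustrating a drain-time flush of open windows.
final class CountWindowSketch {
  // Insertion-ordered so emitted results follow first-seen key order.
  private final Map<String, Integer> openWindows = new LinkedHashMap<>();

  void process(String key) {
    openWindows.merge(key, 1, Integer::sum);
  }

  // Mirrors overriding handleDrain: fire every pending window prematurely.
  Collection<String> handleDrain() {
    List<String> results = new ArrayList<>();
    for (Map.Entry<String, Integer> entry : openWindows.entrySet()) {
      results.add(entry.getKey() + "=" + entry.getValue());
    }
    openWindows.clear(); // state is fully drained; nothing carries over
    return results;
  }
}
```

The emitted panes are partial by design, which is why drain is reserved for upgrades rather than performed on every stop.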

Implementation and Test Plan

  1. Implement DrainNotification, DrainMode, DrainUtils
  2. Implement DrainMonitor and RunLoop processing changes
  3. High-Level API processing changes

...

  1. Testing DrainNotification, DrainUtils, serde of DrainNotification
  2. Testing DrainMonitor
  3. Testing RunLoop changes
    1. Drain for task with single SSP
    2. Drain for task with multiple SSPs
    3. Drain waits for inflight messages
    4. Drain honors drain specific async task timeout
    5. Drain commit behavior
  4. Integration test for High-Level API
    1. Drain with shuffle stages
    2. Drain without shuffle stages
    3. Drain with shuffle stages and with drain notification published before container start
    4. Drain without shuffle stages and with drain notification published before container start
  5. Integration test for Low-Level API
    1. Test drain
    2. Test drain with drain notification published before container start

Compatibility, Deprecation, and Migration Plan

Rejected Alternatives

Approach 1: Drain on by default every stop

One approach is to drain by default on every pipeline stop, which would obviate the need to write a DrainNotification using DrainUtils. The rest of the operations would be the same. When the Samza engine drains in-flight data, it also has to flush all buffered window state and timer data. Given the nature of streaming pipelines, there will always be incomplete windows at any point in time, so always draining would emit incomplete windows and fire all timers on every stop. Hence, draining should not be performed on every stop but reserved for exceptional cases where the user has to make a backward-incompatible change to intermediate schemas.

Rollout

The plan is to release this feature with Samza 1.8.