Status

Current state: [ UNDER DISCUSSION ]

...

JIRA: SAMZA-2741

Released: 

Problem

Samza users running long-running pipelines need a way to gracefully drain them in preparation for an upgrade. The upgrade could be due to changes to transforms or backward-incompatible changes in the data schema. Currently, the only option available to the user is to abruptly stop the pipeline and reset intermediate data, which can lead to the loss of buffered data. User logic is expected to evolve constantly, and schema changes are often backward incompatible. It is therefore imperative to give users the ability to stop and upgrade their pipelines seamlessly without losing any buffered data.

Background

Samza currently offers various job management commands, such as submit, kill, and status, through the ApplicationRunner and StreamJob APIs. Kill is currently used to cancel a running pipeline. It immediately triggers shutdown in SamzaContainer, which in turn halts processing of the pipeline in the RunLoop. Such abrupt cancellation can leave in-flight data unprocessed. In-flight data refers to data that is pending processing in intermediate topics (shuffled data) or in outstanding state. Samza provides fault-tolerant stream processing through its at-least-once guarantee: when a job resumes, it reprocesses messages from the previous checkpoint. This guarantees that any in-flight data left unprocessed upon termination is reprocessed.

...

These fixes only get the pipeline running again; the in-flight intermediate stream data and any Samza-managed state that was reset are lost. Aside from the data loss, this upgrade process is cumbersome. Therefore, it is important to support draining and stopping a pipeline smoothly before making an upgrade that involves an incompatible intermediate schema change. The scope of this work is limited to draining intermediate stream (shuffle) data. The onus of clearing user state, if required, is on the user.

Proposed Changes

We propose a drain operation for Samza pipelines that stops the pipeline from consuming new data, completes processing of buffered data, and then stops the pipeline. The high-level approach is described below, with finer details covered in the sections that follow:

...

Next, a custom drain control message is inserted as an in-memory marker into the pending queue of each source SSP to indicate that the pipeline is draining. All data from the source SSPs is processed until the drain message is encountered. The Samza container shuts down once all tasks have received a drain control message for every SSP they consume, including any intermediate stream SSPs. Tasks will perform a commit before shutting down.
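The shutdown condition above can be sketched as per-task bookkeeping: a task has finished draining only once every SSP it consumes, including intermediate-stream SSPs, has delivered a drain control message, and the container shuts down only once every task has finished. This is a minimal sketch under assumptions: SSPs are modeled as plain strings, and `TaskDrainTracker` and `ContainerDrainTracker` are hypothetical names, not Samza classes.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical per-task drain bookkeeping; not a Samza class.
// SSPs are modeled as plain strings for illustration.
class TaskDrainTracker {
  private final Set<String> consumedSsps;
  private final Set<String> drainedSsps = new HashSet<>();

  TaskDrainTracker(Set<String> consumedSsps) {
    this.consumedSsps = Collections.unmodifiableSet(new HashSet<>(consumedSsps));
  }

  /** Records a drain control message for one SSP; returns true once every consumed SSP has drained. */
  boolean onDrainMessage(String ssp) {
    if (!consumedSsps.contains(ssp)) {
      throw new IllegalArgumentException("Task does not consume " + ssp);
    }
    drainedSsps.add(ssp);
    return isDrained();
  }

  boolean isDrained() {
    return drainedSsps.containsAll(consumedSsps);
  }
}

// Hypothetical container-level check: shut down only after all tasks are drained.
class ContainerDrainTracker {
  static boolean readyToShutdown(Iterable<TaskDrainTracker> tasks) {
    for (TaskDrainTracker task : tasks) {
      if (!task.isDrained()) {
        return false;
      }
    }
    return true;
  }
}
```

Because intermediate-stream SSPs are part of a downstream task's consumed set, that task cannot finish until the upstream stage has propagated drain through the shuffle.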


Notifying Samza Pipeline about Drain 

This section describes how Samza pipelines are notified to drain.

...

DrainNotification (Java):

```java
import java.util.UUID;

import com.google.common.base.Preconditions;

/**
 * DrainNotification is a custom message used by an external controller to trigger drain.
 * DrainMode (implemented in step 1 of the implementation plan) is not shown here.
 */
public class DrainNotification {
  /**
   * Unique identifier of a drain notification.
   */
  private final UUID uuid;
  /**
   * Unique identifier for a deployment, so that drain notifications can be invalidated across job restarts.
   */
  private final String runId;

  /**
   * Drain mode.
   */
  private final DrainMode drainMode;

  public DrainNotification(UUID uuid, String runId, DrainMode drainMode) {
    Preconditions.checkNotNull(uuid);
    Preconditions.checkNotNull(runId);
    Preconditions.checkNotNull(drainMode);
    this.uuid = uuid;
    this.runId = runId;
    this.drainMode = drainMode;
  }

  /**
   * Creates a DrainNotification in {@link DrainMode#DEFAULT} mode.
   */
  public static DrainNotification create(UUID uuid, String runId) {
    return new DrainNotification(uuid, runId, DrainMode.DEFAULT);
  }

  public UUID getUuid() {
    return this.uuid;
  }

  public String getRunId() {
    return runId;
  }

  public DrainMode getDrainMode() {
    return drainMode;
  }
}
```


Handling Drain during event processing

Once the user triggers a drain by writing a DrainNotification, it is available on the coordinator stream for all containers to process.

Polling for Drain Notifications

We will add a class DrainMonitor to SamzaContainer that periodically checks the coordinator-stream backed metadata store for DrainNotifications. SystemConsumers periodically polls the consumers for new messages, updates the chooser with the new messages, and polls the chooser to pick the next message to process in the RunLoop. The following changes will be made:

...
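The DrainMonitor polling described above can be sketched with a `ScheduledExecutorService`. In this minimal sketch the coordinator-stream backed metadata store is abstracted as a `Supplier` of stored notifications, each reduced to its run id, and notifications whose run id differs from the current deployment's are ignored, which is how the run id invalidates drain requests across restarts. `StoredNotification`, `PollingDrainMonitor`, and their methods are hypothetical and do not reflect the actual DrainMonitor API.

```java
import java.util.Collection;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical: a stored drain notification reduced to the field the monitor filters on.
class StoredNotification {
  final String runId;

  StoredNotification(String runId) {
    this.runId = runId;
  }
}

// Hypothetical sketch of periodic drain polling; not Samza's DrainMonitor API.
class PollingDrainMonitor {
  private final Supplier<Collection<StoredNotification>> store; // stands in for the metadata store
  private final String currentRunId;                            // run id of this deployment
  private final Runnable onDrain;                               // e.g., tell SystemConsumers to drain
  private final AtomicBoolean drainRequested = new AtomicBoolean(false);
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor(runnable -> {
        Thread thread = new Thread(runnable, "drain-monitor");
        thread.setDaemon(true);
        return thread;
      });

  PollingDrainMonitor(Supplier<Collection<StoredNotification>> store, String currentRunId, Runnable onDrain) {
    this.store = store;
    this.currentRunId = currentRunId;
    this.onDrain = onDrain;
  }

  /** Checks the store once, ignoring notifications from other runs. Returns true once drain is requested. */
  boolean checkOnce() {
    if (drainRequested.get()) {
      return true;
    }
    for (StoredNotification notification : store.get()) {
      if (currentRunId.equals(notification.runId)) {
        if (drainRequested.compareAndSet(false, true)) {
          onDrain.run();        // fire the callback exactly once
          scheduler.shutdown(); // no further polling is needed
        }
        return true;
      }
    }
    return false;
  }

  /** Checks immediately (covers notifications written before the container started), then polls periodically. */
  void start(long pollIntervalMs) {
    if (!checkOnce()) {
      scheduler.scheduleAtFixedRate(this::pollSafely, pollIntervalMs, pollIntervalMs, TimeUnit.MILLISECONDS);
    }
  }

  void stop() {
    scheduler.shutdownNow();
  }

  private void pollSafely() {
    try {
      checkOnce();
    } catch (RuntimeException e) {
      // An uncaught exception would cancel the periodic task; a real monitor would log and keep polling.
    }
  }
}
```

Checking once before scheduling means a notification published before the container started is honored on startup, one of the scenarios listed in the test plan.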

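DrainMessage (Java):

```java
import org.apache.samza.system.ControlMessage;
// Assumption: Jackson 1.x annotations, matching Samza's other ControlMessage subclasses.
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;

/**
 * Control message used to signal drain to tasks.
 */
public class DrainMessage extends ControlMessage {
  /**
   * Id used to invalidate DrainMessages between runs. Ties to app.run.id from config.
   */
  private final String runId;

  public DrainMessage(String runId) {
    this(null, runId);
  }

  // @JsonCreator lets Jackson use this constructor when deserializing the message.
  @JsonCreator
  public DrainMessage(@JsonProperty("task-name") String taskName, @JsonProperty("run-id") String runId) {
    super(taskName);
    this.runId = runId;
  }

  public String getRunId() {
    return runId;
  }
}
```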
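Injecting the drain marker into the source SSPs' pending queues can be sketched as follows. Assumptions: each source SSP's buffered messages sit in an in-memory FIFO queue, the marker is appended behind them so that everything already fetched is still processed first, and once the marker is enqueued no new data is buffered for that SSP. `PendingQueues` and `DrainMarker` are hypothetical; in Samza the marker would be an envelope carrying a DrainMessage tied to the current run id, and intermediate streams receive drain messages from upstream tasks rather than through this path.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of per-SSP pending queues for source SSPs; not Samza's SystemConsumers.
class PendingQueues {
  // Stands in for an envelope wrapping a DrainMessage for the current run.
  static final class DrainMarker {
    final String runId;

    DrainMarker(String runId) {
      this.runId = runId;
    }
  }

  private final Map<String, Deque<Object>> queues = new LinkedHashMap<>();
  private final Set<String> draining = new HashSet<>();

  void register(String ssp) {
    queues.put(ssp, new ArrayDeque<>());
  }

  /** Buffers newly fetched messages; returns false (and drops nothing already queued) once the SSP is draining. */
  boolean offer(String ssp, List<?> fetched) {
    if (draining.contains(ssp)) {
      return false; // stop consuming new data after drain
    }
    queues.get(ssp).addAll(fetched);
    return true;
  }

  /** Appends one in-memory drain marker behind the buffered data of every source SSP. */
  void injectDrain(String runId) {
    for (Map.Entry<String, Deque<Object>> entry : queues.entrySet()) {
      if (draining.add(entry.getKey())) {
        entry.getValue().addLast(new DrainMarker(runId));
      }
    }
  }

  /** Returns the next buffered item for the SSP, or null if its queue is empty. */
  Object poll(String ssp) {
    return queues.get(ssp).pollFirst();
  }
}
```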


Processing Drain control messages

The RunLoop orchestrates reading and processing messages, checkpointing, and windowing across tasks. RunLoop will perform drain in the following steps:

...

A task is ready to commit when task.commit is requested, either by the user or by the commit thread, and no process, window, commit, or scheduler operation is in progress. When task.async.commit is true and task.max.concurrency > 1, the task can commit while message processing is in progress. In drain mode, however, commits should not happen asynchronously with message processing.
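The commit-readiness rule above can be written as a single predicate. This is a minimal sketch: the configuration names task.async.commit and task.max.concurrency come from the text, while `TaskCommitState`, `CommitPolicy`, and their fields are hypothetical names for the run loop's per-task bookkeeping.

```java
// Hypothetical snapshot of a task's state in the run loop; names are illustrative.
final class TaskCommitState {
  final boolean commitRequested;   // task.commit requested by the user or the commit thread
  final boolean processInFlight;   // message processing in progress
  final boolean windowInFlight;
  final boolean commitInFlight;
  final boolean schedulerInFlight;

  TaskCommitState(boolean commitRequested, boolean processInFlight, boolean windowInFlight,
      boolean commitInFlight, boolean schedulerInFlight) {
    this.commitRequested = commitRequested;
    this.processInFlight = processInFlight;
    this.windowInFlight = windowInFlight;
    this.commitInFlight = commitInFlight;
    this.schedulerInFlight = schedulerInFlight;
  }
}

final class CommitPolicy {
  private CommitPolicy() {
  }

  static boolean isReadyToCommit(TaskCommitState s, boolean asyncCommit, int maxConcurrency, boolean draining) {
    if (!s.commitRequested || s.windowInFlight || s.commitInFlight || s.schedulerInFlight) {
      return false;
    }
    if (!s.processInFlight) {
      return true;
    }
    // Committing alongside in-flight processing requires async commit with concurrency > 1,
    // and is disabled while the task is draining.
    return asyncCommit && maxConcurrency > 1 && !draining;
  }
}
```

Only the in-flight processing condition is relaxed by async commit; window, commit, and scheduler work must still be idle, and draining removes the relaxation entirely.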

Drain in Samza High Level API

The Samza high-level API provides a rich set of operators that can be chained to form a DAG of operations on a MessageStream. StreamOperatorTask is a StreamTask implementation that takes the operator graph and processes input messages by traversing the DAG.

...

OperatorImpl (Java):

```java
import java.util.Collection;
import java.util.Collections;

import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;

/**
 * Abstract base class for all stream operator implementations.
 *
 * @param <M> type of the input to this operator
 * @param <RM> type of the results of applying this operator
 */
public abstract class OperatorImpl<M, RM> {
  // Other members of OperatorImpl are omitted from this excerpt.

  /**
   * Invoked once all input streams to this operator have received a drain control message.
   * Subclasses handle drain by overriding this method. The default no-op implementation
   * suffices for in-memory operators; output operators need to override it to propagate
   * drain over the wire.
   *
   * @param collector message collector
   * @param coordinator task coordinator
   * @return results to be emitted when this operator reaches drain
   */
  protected Collection<RM> handleDrain(MessageCollector collector, TaskCoordinator coordinator) {
    return Collections.emptyList();
  }
}
```
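The handleDrain contract can be illustrated with a toy operator graph: an operator calls its drain hook only after every one of its inputs has reached drain, emits the hook's results downstream, and then propagates drain to its downstream operators. This is a simplified sketch; `ToyOperator` and its methods are hypothetical, and the real OperatorImpl also deals with message collectors, task coordinators, and per-stream tracking that are omitted here.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified operator node; not Samza's OperatorImpl.
class ToyOperator {
  private final int numInputs;
  private final List<ToyOperator> downstream = new ArrayList<>();
  private int drainedInputs = 0;
  final List<String> received = new ArrayList<>();
  boolean drained = false;

  ToyOperator(int numInputs) {
    this.numInputs = numInputs;
  }

  /** Connects this operator to a downstream operator and returns the downstream operator. */
  ToyOperator then(ToyOperator next) {
    downstream.add(next);
    return next;
  }

  void onMessage(String message) {
    received.add(message);
  }

  /** Drain hook: the default no-op mirrors in-memory operators; subclasses may flush buffered results. */
  protected Collection<String> handleDrain() {
    return Collections.emptyList();
  }

  /** Called each time one of this operator's inputs reaches drain. */
  void onInputDrained() {
    if (drained) {
      return;
    }
    drainedInputs++;
    if (drainedInputs < numInputs) {
      return; // wait until every input has drained
    }
    drained = true;
    Collection<String> results = handleDrain();
    for (ToyOperator op : downstream) {
      for (String result : results) {
        op.onMessage(result); // emit drain-time results before propagating drain
      }
      op.onInputDrained();
    }
  }
}
```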

Watermark & State Handling with Drain

Samza supports two notions of time: processing time and event time. By default, all built-in Samza operators use processing time; event-time operations are supported through the Beam API. Whenever the user signals a drain, the watermark is advanced to infinity through watermark control messages. Beam's Samza runner keeps timers and window state in local RocksDB state. As a consequence of the watermark advancing to infinity, all timers fire and buffered window data is processed.
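Why advancing the watermark to infinity flushes everything can be seen with a toy event-time timer queue: a timer fires once the watermark passes its timestamp, so a watermark of `Long.MAX_VALUE` (standing in for infinity) fires every remaining timer in timestamp order. `TimerQueue` is hypothetical; it is neither Beam's timer API nor the Samza runner's RocksDB-backed timer store.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical in-memory event-time timer queue.
class TimerQueue {
  static final long INFINITE_WATERMARK = Long.MAX_VALUE; // stand-in for "infinity"

  private final PriorityQueue<Long> timers = new PriorityQueue<>();
  private long watermark = Long.MIN_VALUE;

  void setTimer(long eventTime) {
    timers.add(eventTime);
  }

  /** Advances the watermark (never backwards) and returns the fired timers in timestamp order. */
  List<Long> advanceWatermark(long newWatermark) {
    watermark = Math.max(watermark, newWatermark);
    List<Long> fired = new ArrayList<>();
    // A timer fires once the watermark has passed its timestamp.
    while (!timers.isEmpty() && timers.peek() < watermark) {
      fired.add(timers.poll());
    }
    return fired;
  }

  int pendingTimers() {
    return timers.size();
  }
}
```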

For the Samza high-level API, advancing the watermark has no effect, since its windowing is processing-time based. The high-level API supports window operations on MessageStream; it keeps track of window data in local RocksDB state and tracks the triggers in memory. When the window operator receives a drain, all of its triggers fire and the results are emitted from the window operation. This is implemented by overriding handleDrain in WindowOperatorImpl.
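For processing-time windows, the handleDrain override amounts to firing the triggers of every open window at once. A minimal sketch, assuming a keyed window that sums values per key and keeps its buffered state in memory in place of RocksDB; `DrainableKeyedWindow` is hypothetical and is not WindowOperatorImpl.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical keyed window that sums values per key; buffered state is kept in memory here.
class DrainableKeyedWindow {
  private final Map<String, Long> openWindows = new LinkedHashMap<>();

  void process(String key, long value) {
    openWindows.merge(key, value, Long::sum);
  }

  /** Drain: fire every pending trigger, emitting the (possibly partial) result of each open window. */
  Collection<String> handleDrain() {
    List<String> results = new ArrayList<>();
    for (Map.Entry<String, Long> window : openWindows.entrySet()) {
      results.add(window.getKey() + "=" + window.getValue());
    }
    openWindows.clear(); // nothing remains buffered after drain
    return results;
  }

  int openWindowCount() {
    return openWindows.size();
  }
}
```

The emitted results are partial whenever a window had not yet closed, which is the same reason the rejected alternative below avoids draining on every stop.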

Implementation and Test Plan

  1. Implement DrainNotification, DrainMode, DrainUtils
  2. Implement DrainMonitor and RunLoop processing changes
  3. High-Level API processing changes

...

  1. Testing DrainNotification, DrainUtils, and serde of DrainNotification
  2. Testing DrainMonitor
  3. Testing RunLoop and high-level API changes
    1. Drain for task with single SSP
    2. Drain for task with multiple SSPs
    3. Drain waits for inflight messages
    4. Drain honors the drain-specific async task timeout
    5. Drain commit behavior
  4. Integration test for High-Level API
    1. Drain with shuffle stages
    2. Drain without shuffle stages
    3. Drain with shuffle stages and with drain notification published before container start
    4. Drain without shuffle stages and with drain notification published before container start
  5. Integration test for Low-Level API
    1. Test drain
    2. Test drain with drain notification published before container start

Rejected Alternatives

Approach 1: Drain by default on every stop

One approach is to drain by default on every pipeline stop, which would obviate the need to write a DrainNotification using the DrainManager. The rest of the operations would be the same. When the Samza engine drains in-flight data, it also has to deal with all buffered window state and timer data. Given the nature of streaming pipelines, there will always be incomplete windows at any point in time. This means that always draining pipelines would emit incorrect (incomplete) windows and fire all timers on every stop. Hence, draining should not be performed on every stop, but should instead be reserved for exceptional cases where the user has to make a backward-incompatible change to intermediate schemas.

Rollout

The plan is to release this feature with Samza 1.8.

...