...

JIRA: SAMZA-2741

Released: 

...

Problem

Samza users running long-running pipelines need a way to gracefully drain them in preparation for an upgrade. The upgrade could be due to changes to transforms or backward-incompatible changes in the data schema. The only option currently available is to abruptly stop the pipeline and reset intermediate data, which could lead to loss of buffered data. User logic is expected to constantly evolve, and schema changes are often backward incompatible. It is therefore imperative to give users the ability to seamlessly stop and upgrade their pipelines without losing any buffered data.

...

Abruptly stopping a pipeline can, however, be a problem if there are backward-incompatible changes in the intermediate data schema after restart. Any inflight data in intermediate streams will cause failures upon reprocessing, as it cannot be deserialized due to the incompatible schema. The usual fix to get past the serde issue for shuffled data is to switch to a fresh intermediate stream. This is achieved by changing the opId for the PartitionBy operator in the Samza high-level API, or the intermediate transform names in the Beam API, as the intermediate topic names are derived from them. The Beam Samza runner persists window and timer data in local RocksDB state, which could run into the same serde issues. If the pipeline uses timers and windows, any outstanding Beam-managed state would need to be reset along with its backing changelog stream. Users can also manage their own state through the Beam State API and Samza's LocalTable, which are backed by RocksDB stores. The stateId for Beam State and the tableId for LocalTable can be changed to switch to a fresh RocksDB store when the user wants to use a new schema for Beam state or perform read/write operations on a new LocalTable.
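For illustration, here is a minimal sketch of the opId switch in the Samza high-level API. The topic and class names are hypothetical; the point is that the intermediate topic name is derived from the opId passed to partitionBy, so bumping it points the restarted job at a freshly generated intermediate stream and old inflight data is never deserialized.

Code Block (java): Switching to a fresh intermediate stream (sketch)
import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;

public class RepartitionApp implements StreamApplication {
  @Override
  public void describe(StreamApplicationDescriptor appDescriptor) {
    KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("kafka");
    KafkaInputDescriptor<String> pageViews =
        kafka.getInputDescriptor("page-views", new StringSerde());

    MessageStream<String> input = appDescriptor.getInputStream(pageViews);
    input
        // The intermediate topic name is derived from this opId. Changing
        // "shuffle-v1" to "shuffle-v2" makes the upgraded job consume from a
        // brand-new intermediate stream instead of the old, incompatible one.
        .partitionBy(msg -> msg, msg -> msg,
            KVSerde.of(new StringSerde(), new StringSerde()), "shuffle-v2")
        .map(KV::getValue)
        .sink((msg, collector, coordinator) -> { /* downstream logic */ });
  }
}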

These fixes only get the pipeline running again; the inflight intermediate stream data and any Beam-managed state that was reset is consequently lost. Aside from the data loss, this process of a pipeline upgrade is evidently cumbersome. Therefore, it is important to support the ability to drain and stop a pipeline smoothly prior to making an upgrade which involves an incompatible intermediate schema change. The work here is scoped to draining only intermediate stream (shuffle) data and Beam-managed window and timer state. The onus of clearing user state (LocalTable and Beam State API), if required, is on the user and can be done by switching to a fresh state.

...

We introduce DrainNotification, which is written to the coordinator stream store to notify the pipeline to trigger drain. This work will also provide utility classes to aid with writing DrainNotification to the metadata store.

...

Next, a custom drain control message is inserted as an in-memory marker in an SSP's pending queue to indicate that the pipeline is draining. All data from source SSPs is processed until the drain event is encountered. The Samza container shuts down once all tasks have received a drain control message for all the SSPs they are consuming, including any intermediate stream SSPs. Tasks will perform a commit before shutdown.
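A minimal sketch of the per-task bookkeeping this implies (class and method names are hypothetical, and SSPs are simplified to strings): the run loop can commit the task and request shutdown once every SSP the task consumes has seen its drain marker.

Code Block (java): Per-task drain bookkeeping (sketch)
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of per-task drain bookkeeping in the run loop. */
class TaskDrainTracker {
  private final String currentRunId;       // app.run.id of this deployment
  private final Set<String> consumedSsps;  // all SSPs this task consumes
  private final Set<String> drainedSsps = new HashSet<>();

  TaskDrainTracker(String currentRunId, Set<String> consumedSsps) {
    this.currentRunId = currentRunId;
    this.consumedSsps = consumedSsps;
  }

  /**
   * Called when a drain marker is dequeued for an SSP. Returns true when the
   * task has seen markers for every SSP it consumes (source and intermediate),
   * i.e. when the run loop should commit the task and shut the container down.
   */
  boolean onDrainMarker(String ssp, String markerRunId) {
    if (!currentRunId.equals(markerRunId)) {
      return false;  // stale notification from a previous deployment; ignore
    }
    drainedSsps.add(ssp);
    return drainedSsps.containsAll(consumedSsps);
  }
}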



Notifying Samza Pipeline about Drain 

This section delves into the details of notifying Samza pipelines to drain.

We introduce two classes: DrainNotification and DrainUtils. DrainNotification is used to signal to the Samza engine to start drain. DrainUtils provides the ability to signal drain for a Samza job. It uses a namespace-aware, coordinator-stream backed metadata store to read and write drain notifications on the coordinator stream. The Kafka coordinator-stream backed metadata store acts as the control channel in Samza, which makes the drain notification available to all containers of the job. The deployment id will be different for every deployment of the job and will be used by the containers to validate whether the drain action was intended for the current deployment. The config app.run.id is used to uniquely identify the current deployment in a container. DrainNotification will be cleaned up from the coordinator stream by the job coordinator on successful shutdown.
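As an illustration, here is a minimal sketch of how an external controller might request a drain, assuming a DrainUtils.writeDrainNotification(MetadataStore) helper along the lines described above; the exact signature, and how the coordinator-stream backed store is constructed, are assumptions here.

Code Block (java): Requesting drain (sketch)
import java.util.UUID;

import org.apache.samza.metadatastore.MetadataStore;

/** Hedged sketch: requesting a drain from outside the job. */
class DrainRequester {
  /**
   * The caller is expected to pass a coordinator-stream backed MetadataStore
   * for the job (how it is constructed is elided here).
   */
  UUID requestDrain(MetadataStore coordinatorStreamStore) {
    coordinatorStreamStore.init();
    try {
      // Writes a DrainNotification (uuid + app.run.id) to the store; containers
      // validate the runId so notifications from earlier deployments are ignored.
      return DrainUtils.writeDrainNotification(coordinatorStreamStore);
    } finally {
      coordinatorStreamStore.close();
    }
  }
}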


Code Block (java): DrainNotification
import java.util.UUID;

import com.google.common.base.Preconditions;

/**
 * DrainNotification is a custom message used by an external controller to trigger Drain.
 * */
public class DrainNotification {
  /**
   * Unique identifier of a drain notification.
   */
  private final UUID uuid;
  /**
   * Unique identifier for a deployment so drain notification messages can be invalidated across job restarts.
   */
  private final String runId;

  /**
   * Drain Mode.
   * */
  private final DrainMode drainMode;

  public DrainNotification(UUID uuid, String runId, DrainMode drainMode) {
    Preconditions.checkNotNull(uuid);
    Preconditions.checkNotNull(runId);
    Preconditions.checkNotNull(drainMode);
    this.uuid = uuid;
    this.runId = runId;
    this.drainMode = drainMode;
  }

  /**
   * Creates a DrainNotification in {@link DrainMode#DEFAULT} mode.
   * */
  public static DrainNotification create(UUID uuid, String runId) {
    return new DrainNotification(uuid, runId, DrainMode.DEFAULT);
  }

  public UUID getUuid() {
    return this.uuid;
  }

  public String getRunId() {
    return runId;
  }

  public DrainMode getDrainMode() {
    return drainMode;
  }
}

...

Code Block (java): DrainMessage
import org.codehaus.jackson.annotate.JsonProperty;

public class DrainMessage extends ControlMessage {
  /**
   * Id used to invalidate DrainMessages between runs. Ties to app.run.id from config.
   */
  private final String runId;

  public DrainMessage(String runId) {
    this(null, runId);
  }

  public DrainMessage(@JsonProperty("task-name") String taskName, @JsonProperty("run-id") String runId) {
    super(taskName);
    this.runId = runId;
  }

  public String getRunId() {
    return runId;
  }
}


Processing

...

Drain control messages

The run loop orchestrates reading and processing messages, checkpointing and windowing among tasks. RunLoop will perform drain in the following steps:

...

The task starts processing at the input operators of the operator DAG. We introduce a class DrainStates which handles all the bookkeeping for drain messages. When an operator encounters a drain message, it updates the internal bookkeeping tracking all the SSPs for a system stream which have received a drain message. Once all input streams of the operator have received a drain control message, it propagates the message to the downstream operators in the operator DAG.

In case of a shuffle stage, DrainMessage is always sent to a single partition of the downstream stage (intermediate stream) by all source partitions. This is done for aggregation: DrainStates keeps track of which source partitions have sent a DrainMessage to the aggregate partition. Once the downstream aggregate partition has received drain messages from all the parent partitions, the drain message is broadcast to all the peer partitions. The same process is repeated for all downstream stages; a sketch of this bookkeeping is shown below.
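A hedged sketch of the DrainStates bookkeeping (field and method names are assumed): for each intermediate stream, the aggregate partition tracks the distinct upstream tasks that have sent a DrainMessage; once all expected upstream tasks are accounted for, the marker can be broadcast to the peer partitions.

Code Block (java): DrainStates bookkeeping (sketch)
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hedged sketch of per-stream drain aggregation; names are assumed. */
class DrainStatesSketch {
  // stream -> upstream tasks that have sent a DrainMessage so far
  private final Map<String, Set<String>> drainedUpstreamTasks = new HashMap<>();
  // stream -> total number of upstream tasks expected to send a DrainMessage
  private final Map<String, Integer> expectedUpstreamTasks;

  DrainStatesSketch(Map<String, Integer> expectedUpstreamTasks) {
    this.expectedUpstreamTasks = expectedUpstreamTasks;
  }

  /** Record a DrainMessage from an upstream task for the given stream. */
  void update(String systemStream, String upstreamTaskName) {
    drainedUpstreamTasks
        .computeIfAbsent(systemStream, k -> new HashSet<>())
        .add(upstreamTaskName);
  }

  /** True once every upstream task producing to this stream has sent drain. */
  boolean isDrained(String systemStream) {
    Set<String> drained = drainedUpstreamTasks.get(systemStream);
    // Unknown streams are never considered drained.
    return drained != null
        && drained.size() >= expectedUpstreamTasks.getOrDefault(systemStream, Integer.MAX_VALUE);
  }
}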

handleDrain will be added to the base OperatorImpl implementation and executes once drain messages have been received on all input streams of the operator. In the case of a broadcast or partition operator, handleDrain will write drain control messages to the intermediate stream for propagation, as sketched below.
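A hedged sketch of the handleDrain hook (class names and wiring are assumed; only DrainMessage and the collector/envelope types are from Samza):

Code Block (java): handleDrain propagation (sketch)
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;

/** Hedged sketch of the base operator hook; names are assumed. */
abstract class OperatorImplSketch {
  /** Invoked once drain messages have been received on all input streams. */
  protected void handleDrain(MessageCollector collector) {
    // Default: no-op; drain propagates in-memory to downstream operators.
  }
}

class PartitionOperatorImplSketch extends OperatorImplSketch {
  private final SystemStream intermediateStream; // shuffle topic of this partitionBy
  private final String taskName;
  private final String runId; // app.run.id of the current deployment

  PartitionOperatorImplSketch(SystemStream intermediateStream, String taskName, String runId) {
    this.intermediateStream = intermediateStream;
    this.taskName = taskName;
    this.runId = runId;
  }

  @Override
  protected void handleDrain(MessageCollector collector) {
    // Propagate drain to the downstream (shuffle) stage by writing a
    // DrainMessage to the intermediate stream.
    collector.send(new OutgoingMessageEnvelope(intermediateStream, new DrainMessage(taskName, runId)));
  }
}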

...

  1. Testing DrainNotification, DrainUtils, and serde of DrainNotification
  2. Testing DrainMonitor
  3. Testing RunLoop changes
    1. Drain for task with single SSP
    2. Drain for task with multiple SSPs
    3. Drain waits for inflight messages
    4. Drain honors drain-specific async task timeout
    5. Drain commit behavior
  4. Integration test for High-Level API
    1. Drain with shuffle stages
    2. Drain without shuffle stages
    3. Drain with shuffle stages and with drain notification published before container start
    4. Drain without shuffle stages and with drain notification published before container start
  5. Integration test for Low-Level API
    1. Test drain
    2. Test drain with drain notification published before container start

...

Rejected Alternatives

Approach 1: Drain by default on every stop

...