...

Next, a custom drain control message is inserted into an SSP’s pending queue as an in-memory marker indicating that the pipeline is draining. All data from the source SSPs is processed until the drain message is encountered. The Samza container shuts down once every task has received a drain control message for all the SSPs it consumes, including any intermediate stream SSPs. Each task performs a commit before shutting down.
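
The shutdown condition described above can be sketched as simple bookkeeping. This is only an illustration under stated assumptions: `DrainTracker`, `onDrainMarker`, and `allDrained` are hypothetical names, SSPs are represented as plain strings, and none of this is the actual Samza implementation.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch: tracks which SSPs of one task have reached the drain marker. */
final class DrainTracker {
  // SSPs (as plain strings, an assumption) that have not yet delivered a drain marker.
  private final Set<String> pending;

  DrainTracker(Set<String> consumedSsps) {
    this.pending = new HashSet<>(consumedSsps);
  }

  /** Records that the drain marker was dequeued for the given SSP. */
  void onDrainMarker(String ssp) {
    pending.remove(ssp);
  }

  /** True once every SSP consumed by this task has delivered its drain marker. */
  boolean isDrained() {
    return pending.isEmpty();
  }

  /** Container-level condition: shutdown may proceed only when all tasks are drained. */
  static boolean allDrained(Collection<DrainTracker> tasks) {
    return tasks.stream().allMatch(DrainTracker::isDrained);
  }
}
```

In this sketch a task that consumes both a source SSP and an intermediate stream SSP is not considered drained until the marker has arrived on both, which mirrors the condition stated above.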


Code Block: DrainNotification (Java)

import com.google.common.base.Preconditions;
import java.util.UUID;

/**
 * DrainNotification is a custom message used by an external controller to trigger a drain.
 */
public class DrainNotification {
  /**
   * Unique identifier of a drain notification.
   */
  private final UUID uuid;
  /**
   * Unique identifier for a deployment, so that drain notifications can be invalidated across job restarts.
   */
  private final String runId;

  /**
   * Drain mode.
   */
  private final DrainMode drainMode;

  public DrainNotification(UUID uuid, String runId, DrainMode drainMode) {
    Preconditions.checkNotNull(uuid);
    Preconditions.checkNotNull(runId);
    Preconditions.checkNotNull(drainMode);
    this.uuid = uuid;
    this.runId = runId;
    this.drainMode = drainMode;
  }

  /**
   * Creates a DrainNotification in {@link DrainMode#DEFAULT} mode.
   * */
  public static DrainNotification create(UUID uuid, String runId) {
    return new DrainNotification(uuid, runId, DrainMode.DEFAULT);
  }

  public UUID getUuid() {
    return this.uuid;
  }

  public String getRunId() {
    return runId;
  }

  public DrainMode getDrainMode() {
    return drainMode;
  }
}
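
The `runId` field exists so that a notification written for one deployment can be invalidated after a restart. A minimal sketch of such a check follows. It assumes that invalidation amounts to comparing the notification's run id with the current deployment's run id. `DrainRunIdCheck`, `Notification`, and `appliesTo` are hypothetical names, and the stand-in class omits the drain mode for brevity.

```java
import java.util.Objects;
import java.util.UUID;

/** Hypothetical sketch of run-id based invalidation; not the actual Samza logic. */
final class DrainRunIdCheck {
  /** Minimal stand-in for DrainNotification, carrying only the uuid and run id. */
  static final class Notification {
    final UUID uuid;
    final String runId;

    Notification(UUID uuid, String runId) {
      this.uuid = Objects.requireNonNull(uuid);
      this.runId = Objects.requireNonNull(runId);
    }
  }

  /** Assumption: a notification applies only to the deployment whose run id it carries. */
  static boolean appliesTo(Notification notification, String currentRunId) {
    return notification.runId.equals(currentRunId);
  }
}
```

Under this assumption, a notification left over from a previous deployment is ignored once the job restarts with a new run id.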


Handling Drain during event processing

...