Status

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-170-Adding-Checkpoint-Rejection-Mechanism-td51212.html
Vote thread:
JIRA: FLINK-18578
Release:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).



Motivation

Currently, checkpoints in Flink are triggered periodically. Furthermore, a checkpoint is expected to succeed every time it is triggered, and the barrier is inserted at an arbitrary point in the data stream.

...

With this implementation, users can determine when to reject the checkpoint.


Public Interfaces

The following proposed interface is added in org.apache.flink.util.

...

Code Block
languagejava
titleSnapshotAvailability
/** Various options that an operator can return when its availability for taking a snapshot is checked. */
public enum SnapshotAvailability {
    /** The operator is able to take a snapshot for this checkpoint. */
    AVAILABLE,

    /** The operator rejects this checkpoint; the rejection is expected and is ignored by the JobMaster. */
    SOFT_FAILURE,

    /** The operator rejects this checkpoint; the rejection is counted against the failure tolerance. */
    HARD_FAILURE;

    // ------------------------------------------------------------------------

    private final String message;

    SnapshotAvailability() {
        message = null;
    }

    SnapshotAvailability(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }
}
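
As a usage illustration only (the class and its internal state flags below are hypothetical and not part of the proposal), an operator implementation could map its internal state to these values as follows:

Code Block
languagejava
titleReporting availability (sketch)
/** Hypothetical example of how an operator could map its internal state to SnapshotAvailability. */
public class AvailabilityReportingExample {

    // Hypothetical internal state flags of the operator.
    private volatile boolean readyToSnapshot;
    private volatile boolean expectedToRecoverSoon;

    public SnapshotAvailability isSnapshotAvailable(long checkpointID) {
        if (readyToSnapshot) {
            // The snapshot can be taken right now.
            return SnapshotAvailability.AVAILABLE;
        } else if (expectedToRecoverSoon) {
            // Skip this checkpoint; the rejection is expected and should not fail the job.
            return SnapshotAvailability.SOFT_FAILURE;
        } else {
            // The rejection is unexpected and should be counted against the failure tolerance.
            return SnapshotAvailability.HARD_FAILURE;
        }
    }
}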


Proposed Changes

Original Checkpoint Flow

Originally, the checkpoint is triggered with the following process:

...

  • TaskManager (two main things)
    1. Broadcast the barrier downstream.
    2. Ask the operators to take snapshots.

Adding Preliminary Review

Under some scenarios, an operator cannot take a snapshot at the moment a checkpoint is triggered. Therefore, we should add one more step before taking the snapshot: examining the state of the operator.

If this check finds that the operator is unable to take a snapshot, there are two options.

Option 1: Decline Checkpoint

A simple way to deal with this is to decline the checkpoint, so the modification can be simplified as below.


On the JobMaster:

  1. Take snapshots of the OperatorCoordinator.
  2. Trigger the checkpoint on the TaskManager.
  3. Handle declines from the TaskManager.

...

  • Checkpoints might keep failing for a long time, since the operator might keep rejecting them for a long time.
  • Rejection from the operator should be treated differently from the currently existing failure reasons. (If properly handled, this is not a disadvantage.)

Option 2: Holding barrier until the checkpoint is available

Instead of inserting a barrier immediately when a checkpoint is triggered, the barrier wouldn't be inserted until the operator is ready.

...

  • The event valve has to be shut for a possibly long time to maintain consistency between the JobMaster and the TaskManager.
  • The implementation can only be applied to the SourceOperator.

Conclusion

After weighing the pros and cons, the first option looks more promising, because shutting the event valve for a long time could cause negative effects, e.g., workload imbalance.

However, we now need to figure out a proper way to handle the soft failure, which will be discussed in the next topic.

Soft Failure Handling Mechanism

Traditionally, a checkpoint is expected to always succeed. Therefore, any unexpected cause of a checkpoint failure, such as checkpoint expiration, is viewed as a hard failure. Hard failures already have a sophisticated handling mechanism[2], which is not discussed in this FLIP. However, the introduction of checkpoint rejection also introduces a new failure category: soft failures.

...

Code Block
languagejava
titleCheckpointConfig
/**
 * The tolerable time range of continuous checkpoint failures. If no checkpoint succeeds within this
 * time range, the job triggers a failover. The default behavior is to tolerate checkpoint failures
 * for an unlimited time.
 */
private long tolerableCheckpointFailureTimeout = UNLIMITED_TOLERABLE_FAILURE_TIMEOUT;
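
For illustration, assuming a matching setter is added next to this field (the setter name below is an assumption of this sketch, not a finalized API), configuring the new timeout could look like the following:

Code Block
languagejava
titleConfiguring the timeout (sketch)
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TolerableFailureTimeoutExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L); // trigger a checkpoint every minute

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // Hypothetical setter for the proposed option: fail the job over if no checkpoint
        // succeeds within 30 minutes of continuous (soft or hard) checkpoint failures.
        checkpointConfig.setTolerableCheckpointFailureTimeout(30 * 60 * 1000L);
    }
}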

Local Continuous Soft Failure Tolerance

From its local view, an operator can also take care of its own continuous soft failures. The JobMaster only has the global view of whether a checkpoint succeeded; it has no idea whether a rejection is unexpected from the operator's own perspective. Therefore, the operator should take the responsibility of returning a proper failure reason.

When the rejections exceed what the operator itself can tolerate, e.g., after a local timeout, the operator can choose to return a hard failure to the coordinator instead, as sketched below.
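
As an illustration only (the helper class and its method names are hypothetical, not part of the proposal), an operator could track the time since its last successful checkpoint and escalate from a soft to a hard failure once its own tolerance is exceeded:

Code Block
languagejava
titleLocalSoftFailureTolerance (sketch)
/** Hypothetical helper that escalates repeated checkpoint rejections to a hard failure. */
public class LocalSoftFailureTolerance {

    private final long toleranceMillis;
    private long lastSuccessfulCheckpointTime = System.currentTimeMillis();

    public LocalSoftFailureTolerance(long toleranceMillis) {
        this.toleranceMillis = toleranceMillis;
    }

    /** To be called from CheckpointListener#notifyCheckpointComplete(long). */
    public void onCheckpointComplete() {
        lastSuccessfulCheckpointTime = System.currentTimeMillis();
    }

    /** Decides which availability to report when the operator cannot take a snapshot right now. */
    public SnapshotAvailability reject() {
        long rejectingFor = System.currentTimeMillis() - lastSuccessfulCheckpointTime;
        return rejectingFor > toleranceMillis
                ? SnapshotAvailability.HARD_FAILURE   // rejected for longer than the operator tolerates
                : SnapshotAvailability.SOFT_FAILURE;  // still within the operator's own tolerance
    }
}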

Conclusion

Overall, the operator can return either a soft failure or a hard failure. The JobMaster counts hard failures but ignores soft failures. The job would fail over under one of the following two scenarios:

...

A timer would therefore be set to keep track of continuous checkpoint failures, including soft failures. The timer is reset whenever a checkpoint completes. If no timeout is configured, no timer is set. The sketch below illustrates this bookkeeping.
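
As a rough illustration of the intended bookkeeping (the class and method names below are hypothetical and do not reflect the actual failure-manager code), the timer logic could look like this:

Code Block
languagejava
titleContinuousFailureTimer (sketch)
/** Hypothetical sketch of the tolerable-failure-timeout bookkeeping on the JobMaster side. */
public class ContinuousFailureTimer {

    private final long tolerableCheckpointFailureTimeout; // a negative value means "no timeout configured"
    private long firstFailureTimestamp = -1;              // -1 means "no ongoing streak of failures"

    public ContinuousFailureTimer(long tolerableCheckpointFailureTimeout) {
        this.tolerableCheckpointFailureTimeout = tolerableCheckpointFailureTimeout;
    }

    /** Called when a checkpoint completes: the streak of continuous failures ends, so reset the timer. */
    public void onCheckpointComplete() {
        firstFailureTimestamp = -1;
    }

    /** Called when a checkpoint fails (soft or hard); returns true if the job should fail over. */
    public boolean onCheckpointFailure(long now) {
        if (tolerableCheckpointFailureTimeout < 0) {
            return false; // no timeout configured, so no timer is set
        }
        if (firstFailureTimestamp < 0) {
            firstFailureTimestamp = now; // start of a new streak of continuous failures
        }
        return now - firstFailureTimestamp > tolerableCheckpointFailureTimeout;
    }
}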

Detailed Implementation on current Source API

The following image shows the current checkpoint mechanism for taking snapshots of operators.

...

Code Block
languagejava
titleSourceReader
@PublicEvolving
public interface SourceReader<T, SplitT extends SourceSplit>
        extends AutoCloseable, CheckpointListener, CheckpointAvailabilityProvider {

    /**
     * This method has a default implementation that returns AVAILABLE, which means the
     * default behavior is to take a snapshot under any scenario.
     */
    @Override
    default SnapshotAvailability isSnapshotAvailable(long checkpointID) {
        return SnapshotAvailability.AVAILABLE;
    }
}
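
As a usage illustration (ExampleRejectingSourceReader and MyExistingSourceReader below are hypothetical; the latter is assumed to already implement the remaining SourceReader methods), a reader could override only the new method and reuse the LocalSoftFailureTolerance helper sketched in the soft-failure section above:

Code Block
languagejava
titleExampleRejectingSourceReader (sketch)
/**
 * Hypothetical reader, not part of this FLIP: it extends an existing reader implementation
 * and only adds the new availability check plus the local soft-failure tolerance.
 */
public class ExampleRejectingSourceReader extends MyExistingSourceReader {

    // Escalate to a hard failure after 10 minutes of continuous rejections (hypothetical value).
    private final LocalSoftFailureTolerance tolerance = new LocalSoftFailureTolerance(10 * 60 * 1000L);

    // Hypothetical internal state flag maintained by the reader.
    private volatile boolean readyToSnapshot = true;

    @Override
    public SnapshotAvailability isSnapshotAvailable(long checkpointID) {
        return readyToSnapshot ? SnapshotAvailability.AVAILABLE : tolerance.reject();
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        tolerance.onCheckpointComplete();
    }
}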

References

[1] Add rejecting checkpoint logic in source, https://issues.apache.org/jira/browse/FLINK-18578

...