Status

Current state: Under Discussion

Discussion thread

...

Vote thread

JIRA: FLINK-18578

...

Release


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

Flink currently triggers checkpoints periodically. Each triggered checkpoint is expected to succeed, and the barrier may be inserted at an arbitrary position in the data stream.

However, this does not fit every source. For example, an implementation of database change data capture (CDC) must not take a checkpoint during its snapshot phase [1]. The current implementation holds the checkpoint lock during the snapshot phase, an approach that is deprecated in the current version. Similarly, an external data source may emit groups of records that belong to one transaction, and it is unreasonable to take a checkpoint in the middle of such a transaction.


Therefore, this FLIP aims to:

  • Add an interface for operators that operator implementors can use to check whether a snapshot can be taken when a checkpoint is triggered.
  • Implement it in SourceOperator and SourceReader so that checkpoints are not taken in the middle of an atomic piece of data, which also addresses the CDC problem.

With this implementation, users can determine when to reject the checkpoint.


Public Interfaces

The following proposed interface is added in org.apache.flink.util.

Code Block (Java): CheckpointAvailabilityProvider
import org.apache.flink.annotation.PublicEvolving;

/** Lets an operator report whether it can currently take a snapshot. */
public interface CheckpointAvailabilityProvider {
    /**
     * Called when a checkpoint is triggered to check whether the operator can take a snapshot
     * and, if not, what kind of failure should be reported.
     *
     * <p>An operator may be unable to take a snapshot, or may prefer to skip it, for example
     * when:
     *
     * <ul>
     *   <li>the current data might be changed after recovery from the current checkpoint.
     *   <li>the operator is under high backpressure and taking a snapshot could take a long
     *       time.
     * </ul>
     *
     * <p>In these cases the operator can reject the current checkpoint and wait for the next
     * one to be triggered.
     *
     * <p>A rejection is reported as one of two kinds of failure:
     *
     * <ul>
     *   <li>Soft failure: the rejection is acceptable/predictable to the users/developers.
     *   <li>Hard failure: the checkpoint failure, whether a rejection or another failure
     *       reason, is unacceptable/unpredictable to the users/developers. For example, an
     *       operator may be allowed to reject checkpoints for a period of time, and every
     *       rejection outside that period is treated as a hard failure.
     * </ul>
     *
     * <p>The coordinator ignores soft failures and keeps track of hard failures.
     *
     * @param checkpointID The ID of the current checkpoint.
     * @return whether the operator can take a snapshot and, if not, what kind of failure to
     *     report. If {@code AVAILABLE} is returned, the subtask continues with the checkpoint.
     *     Otherwise the checkpoint is rejected and the soft/hard failure is reported to the
     *     coordinator.
     */
    @PublicEvolving
    SnapshotAvailability isSnapshotAvailable(long checkpointID);
}

...

Code Block (Java): SnapshotAvailability
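/** Possible results of checking whether a snapshot can be taken. */
public enum SnapshotAvailability {
    /** The operator can take a snapshot, so the checkpoint proceeds. */
    AVAILABLE,

    /** The checkpoint is rejected and the rejection is acceptable/predictable. */
    SOFT_FAILURE,

    /** The checkpoint is rejected and the rejection is unacceptable/unpredictable. */
    HARD_FAILURE;

    // ------------------------------------------------------------------------

    /** Optional description of the result; {@code null} if none is provided. */
    private final String message;

    SnapshotAvailability() {
        message = null;
    }

    SnapshotAvailability(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }
}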
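
As an illustration of how an implementor might use the proposed interface, the following is a minimal sketch of a reader that rejects checkpoints while it is in the middle of a transaction. It is not part of the proposal: `TransactionalReaderSketch`, `TransactionalReader`, `onRecord`, and the `BEGIN`/`COMMIT` markers are hypothetical, and the enum and interface are redeclared locally (without the message field) only so that the example compiles on its own.

```java
import java.util.Arrays;
import java.util.List;

class TransactionalReaderSketch {

    /** Local stand-in for the proposed enum (message handling omitted). */
    enum SnapshotAvailability { AVAILABLE, SOFT_FAILURE, HARD_FAILURE }

    /** Local stand-in for the proposed interface. */
    interface CheckpointAvailabilityProvider {
        SnapshotAvailability isSnapshotAvailable(long checkpointID);
    }

    /** Hypothetical reader that remembers whether it is inside a transaction. */
    static class TransactionalReader implements CheckpointAvailabilityProvider {
        private boolean inTransaction;

        /** Hypothetical markers: "BEGIN" opens a transaction, "COMMIT" closes it. */
        void onRecord(String record) {
            if ("BEGIN".equals(record)) {
                inTransaction = true;
            } else if ("COMMIT".equals(record)) {
                inTransaction = false;
            }
        }

        @Override
        public SnapshotAvailability isSnapshotAvailable(long checkpointID) {
            // Rejecting mid-transaction is expected, so it is reported as a soft failure.
            return inTransaction
                    ? SnapshotAvailability.SOFT_FAILURE
                    : SnapshotAvailability.AVAILABLE;
        }
    }

    public static void main(String[] args) {
        TransactionalReader reader = new TransactionalReader();
        List<String> records = Arrays.asList("BEGIN", "row-1", "row-2", "COMMIT");
        long checkpointId = 1;
        for (String record : records) {
            reader.onRecord(record);
            System.out.println(record + " -> " + reader.isSnapshotAvailable(checkpointId++));
        }
    }
}
```

In this sketch only the check made after `COMMIT` returns `AVAILABLE`; every check made between `BEGIN` and `COMMIT` is a soft failure.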


Proposed Changes

Original Checkpoint Flow

Currently, a checkpoint is triggered through the following process:

...

  • TaskManager (two main things)
    1. Broadcast the barrier downstream.
    2. Ask the operators to take snapshots.

Adding Preliminary Review

In some scenarios an operator cannot take a snapshot at the moment a checkpoint is triggered. We therefore add one more step before taking a snapshot: examining the state of the operator.

If this check finds that the operator is unable to take a snapshot, there are two options.

Option 1: Decline Checkpoint

A simple way to deal with this is to decline the checkpoint. The modified flow is shown below.

[Image: modified checkpoint flow]

On JobMaster:

  1. take snapshots of OperatorCoordinator
  2. trigger checkpoint on TaskManager
  3. handle decline from TaskManager

...

  • Checkpoints might fail for a long time, since the operator might keep rejecting them for a long time.
  • A rejection from the operator should be treated differently from the existing failure reasons. (If handled properly, this need not be a disadvantage.)
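
The decline path of Option 1 can be sketched on the task side as follows. This is an illustrative sketch, not Flink runtime code: `DeclineCheckpointSketch`, `TaskSideHandler`, `onCheckpointTriggered`, and the recorded action strings are hypothetical, and the enum and interface are local stand-ins for the proposed types. It assumes the availability check runs before the barrier is broadcast.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DeclineCheckpointSketch {

    /** Local stand-in for the proposed enum. */
    enum SnapshotAvailability { AVAILABLE, SOFT_FAILURE, HARD_FAILURE }

    /** Local stand-in for the proposed interface. */
    interface CheckpointAvailabilityProvider {
        SnapshotAvailability isSnapshotAvailable(long checkpointID);
    }

    /** Hypothetical task-side handler that records what it would do instead of doing it. */
    static class TaskSideHandler {
        final List<String> actions = new ArrayList<>();
        private final List<CheckpointAvailabilityProvider> operators;

        TaskSideHandler(List<CheckpointAvailabilityProvider> operators) {
            this.operators = operators;
        }

        /** Returns true if the checkpoint proceeds and false if it is declined. */
        boolean onCheckpointTriggered(long checkpointId) {
            // Preliminary review: ask every operator before any barrier is emitted.
            for (CheckpointAvailabilityProvider operator : operators) {
                SnapshotAvailability result = operator.isSnapshotAvailable(checkpointId);
                if (result != SnapshotAvailability.AVAILABLE) {
                    // Report the soft/hard failure and skip barrier and snapshot.
                    actions.add("decline(" + checkpointId + "):" + result);
                    return false;
                }
            }
            // The two existing steps are unchanged.
            actions.add("broadcastBarrier(" + checkpointId + ")");
            actions.add("snapshotState(" + checkpointId + ")");
            return true;
        }
    }

    public static void main(String[] args) {
        CheckpointAvailabilityProvider alwaysReady = id -> SnapshotAvailability.AVAILABLE;
        CheckpointAvailabilityProvider busyOnFirst =
                id -> id == 1 ? SnapshotAvailability.SOFT_FAILURE : SnapshotAvailability.AVAILABLE;
        TaskSideHandler handler = new TaskSideHandler(Arrays.asList(alwaysReady, busyOnFirst));
        handler.onCheckpointTriggered(1);
        handler.onCheckpointTriggered(2);
        System.out.println(handler.actions);
    }
}
```

Because the check happens before the barrier is sent, a declined checkpoint leaves no barrier in the stream, which is what keeps this option simpler than holding the barrier.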

Option 2: Holding barrier until the checkpoint is available

Instead of inserting a barrier immediately when a checkpoint is triggered, the barrier wouldn't be inserted until the operator is ready.

...

  • The event valve has to be shut for a possibly long time to maintain the consistency between JobMaster and TaskManager.
  • The implementation can be applied to SourceOperator only.

Conclusion

After weighing the pros and cons, the first option looks more promising, because shutting the event valve for a long time could cause negative side effects such as workload imbalance.

However, we now need to figure out a proper way to handle the soft failure, which will be discussed in the next topic.

Soft Failure Handling Mechanism

Traditionally, every checkpoint is expected to succeed. Therefore, any unexpected cause of a checkpoint failure, such as checkpoint expiration, can be viewed as a hard failure. Hard failures already have a sophisticated handling mechanism [2], which is not discussed in this FLIP. However, checkpoint rejection introduces a new failure reason: the soft failure.

...

Code Block (Java): CheckpointConfig
/**
 * The tolerable time range of continuous checkpoint failures. If no checkpoint succeeds within
 * this time range, the job triggers a failover. By default, checkpoints are allowed to fail for
 * an unlimited time.
 */
private long tolerableCheckpointFailureTimeout = UNLIMITED_TOLERABLE_FAILURE_TIMEOUT;

Local Continuous Soft Failure Tolerance

An operator can also handle its continuous soft failures from its own local view. The JobMaster has only the global view of whether a checkpoint succeeded; it cannot tell whether a rejection is unexpected from the operator's point of view. Therefore, the operator is responsible for returning a proper failure reason.

When checkpoint rejections exceed what the operator tolerates as continuous soft failures, e.g., after a timeout, the operator can choose to return a hard failure to the coordinator.
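
One possible way for an operator to escalate from soft to hard failures is sketched below, assuming a time-based tolerance. The names `SoftToHardEscalationSketch`, `EscalatingProvider`, `check`, and `toleranceMillis` are hypothetical, the enum is a local stand-in for the proposed type, and the current time is passed in explicitly so that the behavior is easy to follow.

```java
class SoftToHardEscalationSketch {

    /** Local stand-in for the proposed enum. */
    enum SnapshotAvailability { AVAILABLE, SOFT_FAILURE, HARD_FAILURE }

    /** Hypothetical helper that tracks how long an operator has been rejecting checkpoints. */
    static class EscalatingProvider {
        private final long toleranceMillis;
        // Time of the first rejection in the current streak, or -1 if there is no streak.
        private long firstRejectionMillis = -1;

        EscalatingProvider(long toleranceMillis) {
            this.toleranceMillis = toleranceMillis;
        }

        /**
         * @param ready whether the operator could take a snapshot right now
         * @param nowMillis the current time, supplied by the caller
         */
        SnapshotAvailability check(boolean ready, long nowMillis) {
            if (ready) {
                // Accepting a checkpoint ends the rejection streak.
                firstRejectionMillis = -1;
                return SnapshotAvailability.AVAILABLE;
            }
            if (firstRejectionMillis < 0) {
                firstRejectionMillis = nowMillis;
            }
            // Rejections within the tolerance are soft; later ones are escalated to hard.
            return nowMillis - firstRejectionMillis > toleranceMillis
                    ? SnapshotAvailability.HARD_FAILURE
                    : SnapshotAvailability.SOFT_FAILURE;
        }
    }

    public static void main(String[] args) {
        EscalatingProvider provider = new EscalatingProvider(1_000);
        System.out.println(provider.check(false, 0));     // first rejection of a streak
        System.out.println(provider.check(false, 1_001)); // streak exceeds the tolerance
        System.out.println(provider.check(true, 1_500));  // streak ends
    }
}
```

A real implementation would call such a helper from `isSnapshotAvailable`; whether the tolerance is expressed as a duration or as a number of rejections is a design choice this sketch leaves open.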

Conclusion

Overall, the operator can return either a soft failure or a hard failure. The JobMaster counts hard failures and ignores soft failures. The job fails over under either of the following two scenarios:

...

A timer is therefore set to keep track of continuous checkpoint failures, including soft failures. The timer is reset when a checkpoint completes. If no timeout is configured, no timer is set.
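
The timeout behavior described above might look roughly like the following sketch. It compares timestamps instead of scheduling a real timer, and it assumes the time range is measured from the last completed checkpoint (or from the start of the job). `FailureTimeoutTrackerSketch`, `Tracker`, and the value `-1` used for `UNLIMITED` are hypothetical; the proposal does not specify the value of `UNLIMITED_TOLERABLE_FAILURE_TIMEOUT`.

```java
class FailureTimeoutTrackerSketch {

    /** Hypothetical stand-in for UNLIMITED_TOLERABLE_FAILURE_TIMEOUT. */
    static final long UNLIMITED = -1L;

    /** Hypothetical tracker for the time since the last completed checkpoint. */
    static class Tracker {
        private final long timeoutMillis;
        private long lastSuccessMillis;

        Tracker(long timeoutMillis, long startMillis) {
            this.timeoutMillis = timeoutMillis;
            this.lastSuccessMillis = startMillis;
        }

        /** A completed checkpoint resets the tracked time range. */
        void onCheckpointCompleted(long nowMillis) {
            lastSuccessMillis = nowMillis;
        }

        /**
         * Called for any checkpoint failure, soft or hard.
         *
         * @return true if the job should fail over
         */
        boolean onCheckpointFailed(long nowMillis) {
            if (timeoutMillis == UNLIMITED) {
                // No timeout configured, so failures are never timed out.
                return false;
            }
            return nowMillis - lastSuccessMillis > timeoutMillis;
        }
    }

    public static void main(String[] args) {
        Tracker tracker = new Tracker(5_000, 0);
        System.out.println(tracker.onCheckpointFailed(3_000)); // within the time range
        tracker.onCheckpointCompleted(4_500);                  // resets the time range
        System.out.println(tracker.onCheckpointFailed(9_000)); // 4500 ms since last success
        System.out.println(tracker.onCheckpointFailed(9_501)); // 5001 ms since last success
    }
}
```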

Detailed Implementation on current Source API

The following image shows the current checkpoint mechanism of taking snapshots for operators.

...

Code Block (Java): SourceReader
@PublicEvolving
public interface SourceReader<T, SplitT extends SourceSplit>
        extends AutoCloseable, CheckpointListener, CheckpointAvailabilityProvider {
    /**
     * Returns {@code AVAILABLE} by default, which means that a snapshot is taken under any
     * scenario unless the reader overrides this method.
     */
    @Override
    default SnapshotAvailability isSnapshotAvailable(long checkpointID) {
        return SnapshotAvailability.AVAILABLE;
    }
}

References

[1] Add rejecting checkpoint logic in source, https://issues.apache.org/jira/browse/FLINK-18578

[2] Checkpoint Failure process improvement, https://docs.google.com/document/d/1ce7RtecuTxcVUJlnU44hzcO2Dwq9g4Oyd8_biy94hJc/edit#