You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 9 Current »

Status

Discussion threadhttp://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-170-Adding-Checkpoint-Rejection-Mechanism-td51212.html
Vote thread
JIRA

Unable to render Jira issues macro, execution error.

Release

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

The current Checkpoint of Flink is triggered periodically. Furthermore, the checkpoint is expected to succeed every time it is triggered and the barrier is inserted randomly in the data stream.

However, one example is that the implementation of the database's changed data capture (CDC) is not allowed to take checkpoint in the snapshot phase[1]. Similar to the problem of CDC, the external data source could have some pieces of data that are transaction data. It is unreasonable to take a checkpoint in the middle of transaction data.


Therefore, what we want to address in this FLIP are:

  • Add an interface for operators so that the operator implementors could utilize it to check the availability of taking snapshots when a checkpoint is triggered.
  • Implement it to the SourceOperator and SourceReader to address the problem of not taking checkpoints in the middle of an atomic piece of data.

With this implementation, users can determine when to reject the checkpoint.


Public Interfaces

The following proposed interface is added in org.apache.flink.util.

CheckpointAvailabilityProvider
/** The interface for an operator to check its availability of taking snapshots. */
public interface CheckpointAvailabilityProvider {
    /**
     * This method is called to check the state to indicate that if it is able to take a snapshot
     * and what kind of failure should be reported.
     *
     * <p>When the state is not allowed to take a checkpoint or the checkpoint could be skipped. For
     * example,
     *
     * <ul>
     *   <li>the current data might be changed after recovery from the current checkpoint.
     *   <li>the current operator is under high backpressure and taking a snapshot could take a long
     *       time.
     * </ul>
     *
     * <p>Then the operator could simply reject the current checkpoint and wait for the next
     * checkpoint triggered.
     *
     * <p>Two kinds of reject reason could be returned:
     *
     * <ul>
     *   <li>Soft failure, which means that it is acceptable/predictable by the users/developers to
     *       reject this checkpoint.
     *   <li>Hard failure, which means that the checkpoint failure, either checkpoint rejection or
     *       other failure reasons, is unacceptable/unpredictable by the users/developers. For
     *       example, an operator is allowed to reject a checkpoint for a period of time, but every
     *       rejection outside that time range will be treated as a hard failure.
     * </ul>
     *
     * <p>If a soft failure is returned, the coordinator would ignore it. If a hard failure is
     * returned, the coordinator would keep tracking it.
     *
     * @param checkpointID The ID of the current checkpoint.
     * @return indicating that if the Operator is able to take a snapshot and if not, what kind of
     *     failure should be reported. If AVAILABLE is returned, then the subtask would
     *     continue doing the checkpoint. Otherwise, this checkpoint would be rejected and report
     *     the soft/hard failure to the coordinator.
     */
	@PublicEvolving
    SnapshotAvailability isSnapshotAvailable(long checkpointID);
}


SnapshotAvailability
/** Various options that can be returned when checking its availability for taking snapshots. */
public enum SnapshotAvailability {
    AVAILABLE,

    SOFT_FAILURE,

    HARD_FAILURE;

    // ------------------------------------------------------------------------

    private final String message;

    SnapshotAvailability() {
        message = null;
    }

    SnapshotAvailability(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }
}


Proposed Changes

Original Checkpoint Flow

Originally, the checkpoint is triggered with the following process:

  • JobMaster
    1. The CheckpointCoordinator would ask the OperatorCoordinator to take a snapshot firstly.
    2. Trigger the source tasks to do a checkpoint.
  • TaskManager (two main things)
    1. Broadcast the barrier downstream.
    2. Ask the operators to take snapshots.

Adding Preliminary Review

Under some scenarios, an operator is not allowed to take a checkpoint when triggered. Therefore, we should add one more step before taking a snapshot, which is examining the state of the operator.

After checking the state of the operator and finding that it is unable to take snapshots, there are two options.

Option 1: Decline Checkpoint

A simple way to deal with it is to decline the checkpoint. So we can simplify the modification as below.

On JobMaster:

  1. take snapshots of OperatorCoordinator
  2. trigger checkpoint on TaskManager
  3. handle decline from TaskManager

On TaskManager:

  1. Broadcast barrier downstream
  2. check the state of all operators
    • If all of them can take a snapshot
      • Take snapshots
    • If any of them cannot take a snapshot
      • Decline checkpoint and report it to JobMaster

Pros:

  • An intuitive way of implementing checkpoint rejection.
  • The implementation is on the operator level, every operator has the right of rejecting checkpoint if necessary.

Cons:

  • Checkpoints might fail for a long time. Since the operator might reject checkpoint for a long time.
  • Rejection from the operator should be viewed differently from the currently exisiting failure reasons. (If properly handled, it should not be a disadvantage.)

Option 2: Holding barrier until the checkpoint is available

Instead of inserting a barrier immediately when a checkpoint is triggered, the barrier wouldn't be inserted until the operator is ready.

Therefore, the modified workflow is,

On JobMaster:

  1. Shut the event valve & take snapshots of OperatorCoordinators
  2. Trigger checkpoint on TaskManager
  3. Wait for response from TaskManager that the barrier inserted successfully/checkpoint expiration
  4. Open the event valve

On TaskManager (actually only SourceTasks are modified):

  • Check the state of SourceOperator
    • If snapshot is available 
      • Take snapshot & broadcast barrier
      • Notify JobMaster
    • Otherwise
      • Hold barrier until the operator is ready

Pro:

  • A checkpoint is likely to succeed.

Cons:

  • The event valve has to be shut for a possibly long time to maintain the consistency between JobMaster and TaskManager.
  • The implementation can only be applied on SourceOperator only.

Conclusion

After weighing the pros and cons, the first option looks more promising. Because some possible negative results could be caused by shutting the event valve for a long time, e.g., workload unbalance.

However, we now need to figure out a proper way to handle the soft failure, which will be discussed in the next topic.

Soft Failure Handling Mechanism

Traditionally, a checkpoint is expected to succeed definitely. Therefore, any unexpected reason that caused the failure of the checkpoint could be viewed as a hard failure, such as checkpoint expiration. The hard failure has a sophisticated handling mechanism[2], which is not discussed in this FLIP. However, the introduction of checkpoint rejection also introduces soft failures, a new failure reason.

The definitions of hard and soft failure are different,

  • A hard failure reported when a checkpoint that should have succeeded fails.
  • A soft failure reported when a checkpoint might be failed/rejected but it is predictable/acceptable.

The soft failure should not be viewed as a hard failure since it is not an unpredictable failure for the operator. However, since the operator has only the local view of itself, it is possible that it keeps reporting soft failure to the coordinator.

Therefore, in order to handle endless soft failures and also no checkpoint completes in a very long time, a global tolerable continuous checkpoint failures timeout is introduced. When the timeout is hit, the job would trigger the current failover mechanism, which is to recover from the last checkpoint. 

A new configuration is introduced for the checkpoint.


CheckpointFailureManager
public static final long UNLIMITED_TOLERABLE_FAILURE_TIMEOUT = Long.MAX_VALUE;
CheckpointConfig
/**
 * The tolerable time range of continuous checkpoint failures. If no checkpoint succeeds within this time
 * range, the job would trigger failover. The default behavior is to allow a infinity time of checkpoint
 * failure.
 */
private long tolerableCheckpointFailureTimeout = UNLIMITED_TOLERABLE_FAILURE_TIMEOUT;

Local Continuous Soft Failure Tolerance

From the local view of an operator itself, it can also take care of its continuous soft failures. Since the JobMaster has only the global view of whether the checkpoint is succeeded, but has no idea whether the rejection from the operator is out of expectation of the operator itself. Therefore, the operator should take the responsibility of returning a proper failure reason.

When the checkpoint rejection is out of the tolerable continuous soft failures, e.g., timeout, then the operator could choose to return a hard failure to the coordinator.

Conclusion

Overall, the operator can return either a soft failure or a hard failure. The JobMaster would count the hard failure but ignore the soft failure. And the job would failover under one of the following two scenarios:

  • The number of continuous hard failures is out of tolerable numbers.
  • No checkpoint succeeds within a tolerable time range.

A timer would therefore be set to keep tracking of continuous checkpoint failures, including soft failure. The timer would be reset when a checkpoint is completed. But if no timeout is configured, no timer would be set.

Detailed Implementation on current Source API

Follow image shows the current checkpoint mechanism of taking snapshots for operators.

When the coordinator ask the operator to take a snapshot, the operator would then ask its components (if exist) to take snapshots. Therefore, the checkpoint rejection mechanism should follow the current checkpoint workflow.


With the introduction of above new interfaces, the SourceOperator and SourceReader could be modified to address the problem of CDC proposed above.


SourceOperator
public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStreamOperator<OUT>
        implements OperatorEventHandler,
                PushingAsyncDataInput<OUT>,
                CheckpointAvailabilityProvider {
	@Override
    public SnapshotAvailability isSnapshotAvailable(long checkpointID) {
        LOG.debug(
                "Checking availability of taking the snapshot of {} for SourceOperator.",
                checkpointID);
        return sourceReader.isSnapshotAvailable(checkpointID);
    }

}
SourceReader
@PublicEvolving
public interface SourceReader<T, SplitT extends SourceSplit>
        extends AutoCloseable, CheckpointListener, CheckpointAvailabilityProvider {
	/**
     * This method has a default implementation of returning AVAILABLE, which means that
     * the default behavior is to take a snapshot under any scenario.
     */
    @Override
    default SnapshotAvailability isSnapshotAvailable(long checkpointID) {
        return SnapshotAvailability.AVAILABLE;
    }
}

References

[1] Add rejecting checkpoint logic in source, https://issues.apache.org/jira/browse/FLINK-18578

[2] Checkpoint Failure process improvement, https://docs.google.com/document/d/1ce7RtecuTxcVUJlnU44hzcO2Dwq9g4Oyd8_biy94hJc/edit#

  • No labels