Authors: Yu Li, CongXian Qiu

Status

Discussion thread: https://lists.apache.org/thread/1r2opowofh7phwf2c6bhtxjcpwctwy6t
Vote thread:
JIRA:

Release:
Reason: Incompatible with FLIP-193

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, with our released versions [1], there are mainly two ways to finish a job: stop and cancel. The differences between them are as follows:

  • On a cancel call, the operators in a job immediately receive a cancel() method call to cancel them as soon as possible. If the operators do not stop after the cancel call, Flink will interrupt the thread periodically until they stop.
  • A “stop” call is a more graceful way of stopping a running streaming job. Stop is only available for jobs which use sources that implement the StoppableFunction interface. When the user requests to stop a job, all sources will receive a stop() method call. The job will keep running until all sources properly shut down. This allows the job to finish processing all inflight data (a sketch of such a source follows).
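
For illustration, below is a minimal sketch of such a stoppable source, assuming the legacy SourceFunction and StoppableFunction interfaces from the Flink 1.8 era; the class itself is hypothetical:

    import org.apache.flink.api.common.functions.StoppableFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    // Emits an ever-increasing counter until either cancel() or stop() is called.
    public class StoppableCounterSource
            implements SourceFunction<Long>, StoppableFunction {

        private volatile boolean running = true;
        private long counter = 0L;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            while (running) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(counter++);
                }
            }
        }

        // Invoked on a cancel call: terminate as fast as possible.
        @Override
        public void cancel() {
            running = false;
        }

        // Invoked on a stop call: shut down gracefully so downstream
        // operators can still process the inflight data.
        @Override
        public void stop() {
            running = false;
        }
    }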

However, for stateful operators with retained checkpointing, the stop call won't take any checkpoint, so when resuming the job it needs to recover from the latest checkpoint and rewind the source, which makes the wait for processing all inflight data meaningless (it all needs to be processed again). In other words, there is no real difference between stop and cancel in this case, which leads to conceptual ambiguity.

On the other hand, in the latest master branch after FLIP-34, a job stop will always be accompanied by a savepoint, which has the following problems:

  • It's an unexpected behavior change from the user's perspective: the old stop command now fails on jobs without a savepoint configuration.
  • It slows down the job stop procedure and might block launching new jobs when resources are contended.

This document aims to reinforce the semantics of job stopping, add back normal stop (w/o savepoint) support, and prevent unnecessary source rewinding when retained checkpoints are enabled. To achieve these goals, we will first discuss the conceptual difference between job stop and cancel, and likewise between checkpoint and savepoint, drawing analogies to concepts in database systems. Then we will describe how to reinforce the job stop semantics and how to implement it.

Current Status

In this section, let's align the concepts in Flink with those of mature database systems, and see what's missing in the existing job stop process.

Checkpoint 

In databases (take MySQL for example) there is also a checkpoint concept [2]. Along with data ingestion, changes are made to data pages that are cached in the buffer pool and written to the data files some time later by a process known as flushing. The checkpoint is a record of the latest changes that have been successfully flushed.

There are two types of checkpoints: fuzzy [3] and sharp [4], and with the InnoDB storage engine, both types are executed [5].

  • During normal operation, it performs fuzzy checkpoints, just like the normal checkpointing process in Flink.
  • When the database shuts down, it performs a sharp checkpoint, which is missing in current Flink job stopping process.

Please note that checkpoints in both Flink and databases are system-controlled processes.

Cancel, Stop and Failover

We could map the Flink job cancel and stop command to database kill and normal-shutdown, and the Flink job failover process to database crash-and-recover.

Unlike a database, which always keeps its data files for persistence (regardless of kill or shutdown), Flink only keeps the checkpoint data for the retained type.

Flink job with normal checkpoints:

  • Job failover restarts from the latest checkpoint and rewinds the source, just like a database crash recovery starts from the last fuzzy checkpoint and replays the redo log.

Flink job with retained checkpoints:

  • Resuming a Flink job after cancellation involves source rewinding, just like a database resuming after a kill needs a redo log replay.
  • Resuming a Flink job after stop should be able to load from the latest retained checkpoint without any source rewinding, just like no redo log replay is needed when a database resumes after a normal shutdown; this is what is missing in the current implementation (a configuration sketch for retained checkpoints follows).
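
As context for the retained case, externalized (retained) checkpoints are enabled through CheckpointConfig; here is a minimal sketch against the Flink 1.8-era API:

    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RetainedCheckpointSetup {
        public static void main(String[] args) {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            // Periodic ("fuzzy") checkpoints every 60 seconds.
            env.enableCheckpointing(60_000L);
            // Retain checkpoint files when the job is canceled, so the job
            // can later be resumed from them instead of from a savepoint.
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        }
    }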

Savepoint

Flink's savepoint concept can be analogized to database backups, since the use cases of savepoints in Flink [6] include:

  • Recover from failure
  • Refresh job environment
    • Rescale or change job graph
    • Upgrade and state migration
  • Switch state backend
  • Import and export (FLIP-43)
  • Fork job for blue/red deployment

Correspondingly, databases use backups [7] to:

  • Recover from accidental deletes
  • Refresh development environments
  • Migrate databases or switch storage engine [8]
  • Import and export data
  • Create database copies for testing, training and demonstrations

In Flink we have long supported cancel with savepoint, and recently in FLIP-34 we implemented stop with savepoint. Both can be mapped to automatically triggering a backup before killing/shutting down a database instance, and both are completely orthogonal to the fuzzy/sharp checkpoint process (a client-side usage sketch follows).
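
For illustration, here is a hedged client-side sketch of the two operations; it assumes the ClusterClient methods available around Flink 1.8/1.9 (cancelWithSavepoint, and stopWithSavepoint from FLIP-34), whose exact signatures may differ between versions:

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.client.program.ClusterClient;

    public class SavepointOnTermination {
        // The two calls are alternatives; a job would receive only one of them.
        static void demo(ClusterClient<?> client, JobID jobId) throws Exception {
            // Cancel with savepoint: trigger a savepoint, then cancel immediately.
            String cancelSavepoint =
                    client.cancelWithSavepoint(jobId, "hdfs:///flink/savepoints");

            // Stop with savepoint (FLIP-34): drain the pipeline gracefully,
            // optionally emitting MAX_WATERMARK first (TERMINATE vs SUSPEND).
            String stopSavepoint = client.stopWithSavepoint(
                    jobId, /* advanceToEndOfEventTime */ false, "hdfs:///flink/savepoints");
        }
    }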

Conclusion

According to all the above analogies, it is clear that we should always (automatically) take a checkpoint (when retained checkpoints are configured) when stopping a job, just like a database (automatically) performs a sharp checkpoint during shutdown. This is a necessary supplement to reinforce the job stop semantics.

Proposed Changes

After the changes for this FLIP, we will make the normal stop command (w/o the "-s" option) work again, with the following semantic enhancements (a decision sketch follows the list):

  • If retained checkpoints are enabled, we always take a checkpoint when stopping the job.
  • If retained checkpoints are disabled, we fall back to the old stop process from before FLIP-34, which finishes the source tasks and allows the job to finish processing all inflight data.
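
The following minimal sketch shows the proposed decision logic; every method name here is an illustrative placeholder rather than an actual Flink internal:

    import javax.annotation.Nullable;
    import org.apache.flink.api.common.JobID;

    class ProposedStopSemantics {
        void stop(JobID jobId, @Nullable String savepointPath) {
            if (savepointPath != null) {
                // Stop with the "-s" option: FLIP-34 stop with savepoint.
                stopWithSavepoint(jobId, savepointPath);
            } else if (retainedCheckpointsEnabled(jobId)) {
                // New in this FLIP: take a synchronous retained checkpoint,
                // so resuming later needs no source rewinding.
                stopWithSynchronousCheckpoint(jobId);
            } else {
                // Pre-FLIP-34 behavior: finish the sources and let the job
                // drain all inflight data.
                stopGracefully(jobId);
            }
        }

        // Hypothetical helpers, named for illustration only.
        private boolean retainedCheckpointsEnabled(JobID jobId) { return false; }
        private void stopWithSavepoint(JobID jobId, String path) {}
        private void stopWithSynchronousCheckpoint(JobID jobId) {}
        private void stopGracefully(JobID jobId) {}
    }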

It also makes sense for the user to issue another cancel command for quick job termination when observing that the stop process is stuck, similar to killing a database instance when one does not want to wait for a normal shutdown. We should make sure that a cancel command issued after stop takes effect.

Note that currently the user controls the life cycle of the retained checkpoint files, and restoring from a retained checkpoint reuses the “flink run -s” command (thus calling CheckpointCoordinator.restoreSavepoint), so it is totally fine to restore from a retained checkpoint multiple times or for multiple jobs, as long as users do not delete it.

Implementation

After FLIP-34, we have introduced two different types of job stop:

Type      | Source OPs                                       | Task Status | Job Status
----------|--------------------------------------------------|-------------|-----------
SUSPEND   | Checkpoint Barrier, End Of Stream                | Finished    | Finished
TERMINATE | MAX_WATERMARK, Checkpoint Barrier, End Of Stream | Finished    | Finished

Based on this, we need the implementation steps below to support performing a checkpoint when stopping the job (with retained checkpoints configured); a sketch of the task-side blocking behavior in steps 3-6 follows the list:

  1. The Job Manager triggers a synchronous checkpoint at the sources, which also indicates one of TERMINATE or SUSPEND.
  2. Sources send a MAX_WATERMARK in case of TERMINATE; nothing is done in case of SUSPEND.
  3. The Task Manager executes the checkpoint in a SYNCHRONOUS way, i.e. it blocks until the state is persisted successfully and notifyCheckpointComplete() has been executed.
  4. The Task Manager acknowledges the successful persistence of the state for the checkpoint.
  5. The Job Manager sends the notification that the checkpoint is completed.
  6. The Task Manager unblocks the synchronous checkpoint execution.
  7. The job finishes from the sources, i.e. they shut down and the EOS message propagates through the job.
  8. The Job Manager waits until the job state goes to FINISHED before declaring the operation successful.
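
To make steps 3-6 concrete, here is a minimal sketch of the task-side blocking behavior; the class, field, and method names are hypothetical and do not correspond to actual Flink internals:

    // Blocks the synchronous checkpoint until the Job Manager's completion
    // notification arrives (steps 3 to 6 above).
    class SyncCheckpointingTask {
        private final Object syncCheckpointLock = new Object();
        private volatile boolean checkpointCompleted = false;

        void performSynchronousCheckpoint(long checkpointId) throws InterruptedException {
            persistState(checkpointId);          // snapshot state to the backend
            acknowledgeCheckpoint(checkpointId); // step 4: ack to the Job Manager
            synchronized (syncCheckpointLock) {
                while (!checkpointCompleted) {   // step 3: block until notified
                    syncCheckpointLock.wait();
                }
            }
        }

        // Steps 5/6: the Job Manager's completion notification unblocks the task.
        void notifyCheckpointComplete(long checkpointId) {
            synchronized (syncCheckpointLock) {
                checkpointCompleted = true;
                syncCheckpointLock.notifyAll();
            }
        }

        private void persistState(long checkpointId) { /* write snapshot */ }
        private void acknowledgeCheckpoint(long checkpointId) { /* RPC to JM */ }
    }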

For more details, please refer to PR#8617.

We may also need to refactor the entire stop-processing framework with the mailbox model (FLINK-12477) as a next step.

Limitations

For some special cases such as iterations (which never end by themselves), stop won't work; the user has to use cancel for job termination and bear with the source rewinding.

Open Questions

  1. What's the concept in Flink corresponding to database snapshots? Shall we introduce one?
    • It may share the checkpoint format and allow differences between backends, but it should be a distinct concept (neither a checkpoint nor a backup as in databases).
    • It should mirror the difference between database snapshots and backups [9].
  2. TBC

References

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html

[2] https://dev.mysql.com/doc/refman/8.0/en/glossary.html#glos_checkpoint

[3] https://dev.mysql.com/doc/refman/8.0/en/glossary.html#glos_fuzzy_checkpointing

[4] https://dev.mysql.com/doc/refman/8.0/en/glossary.html#glos_sharp_checkpoint

[5] https://www.xaprb.com/blog/2011/01/29/how-innodb-performs-a-checkpoint/

[6] https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/savepoints.html

[7] https://www.sqlshack.com/backup-and-restore-or-recovery-strategies-for-sql-server-database

[8] https://github.com/facebook/mysql-5.6/wiki/Migrating-from-InnoDB-to-RocksDB

[9] https://www.oracle.com/technetwork/database/availability/rman-fra-snapshot-322251.html

Rejected Alternatives

None so far.