Status

Current state: Under Discussion

...

Discussion thread
Vote thread
JIRA: FLINK-4484
Release: 1.2


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

With this FLIP, I propose to unify checkpoints and savepoints by allowing savepoints to be triggered automatically.

Proposed Changes 

| Operation         | Manual Savepoint (Flink 1.1) | Automatic with Checkpoints (FLIP-10) | Periodic Savepoints (FLIP-10) |
| Trigger Savepoint | Manual                       | Automatic                            | Automatic                     |
| Dispose Savepoint | Manual                       | Automatic                            | Manual                        |

...

Persistent Checkpoints

The checkpoint coordinator has a fixed-size FIFO queue of completed checkpoints that are retained (current default size is 1). Checkpoints are discarded when they are removed from this queue. I propose to allow persisting these checkpoints like savepoints. This means that if a job fails permanently, the user will have a checkpoint available to restore from. These can be subsumed like regular checkpoints.

As an example, think of the following scenario: a job runs smoothly until it hits a bad record that it cannot handle. The current behaviour is that the job tries to recover, hits the bad record again, and keeps on failing. With the proposed change, some recent checkpoint is stored as a savepoint, and the user can update the program to handle bad records and restore from the savepoint.
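The retention behaviour described above can be sketched in a few lines. This is only an illustration of a fixed-size FIFO queue in which a newly completed checkpoint subsumes the oldest one. The class `RetainedCheckpoints` and its methods are hypothetical names chosen for this example and are not part of Flink.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for the queue of retained completed checkpoints.
// It is not Flink code; it only models the FIFO retention described above.
final class RetainedCheckpoints {
    private final int maxRetained;
    private final Deque<Long> retained = new ArrayDeque<>();

    RetainedCheckpoints(int maxRetained) {
        if (maxRetained < 1) {
            throw new IllegalArgumentException("maxRetained must be >= 1");
        }
        this.maxRetained = maxRetained;
    }

    // Adds a newly completed checkpoint and returns the IDs that were
    // subsumed (removed from the queue) to make room for it.
    List<Long> add(long checkpointId) {
        retained.addLast(checkpointId);
        List<Long> subsumed = new ArrayList<>();
        while (retained.size() > maxRetained) {
            subsumed.add(retained.removeFirst());
        }
        return subsumed;
    }

    // Returns the most recent retained checkpoint ID, or null if there is none.
    Long latest() {
        return retained.peekLast();
    }
}
```

With the default size of 1, every completed checkpoint subsumes its predecessor, so after a permanent failure exactly one checkpoint would remain to restore from.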

...

Allow the user to configure periodic triggering of savepoints. The behaviour should be similar to manually triggering savepoints periodically. Furthermore, we bound the number of available periodic savepoints. On shut down, they are never removed.
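As a rough illustration of periodic triggering with a bounded number of retained savepoints, consider the following sketch. It assumes that the bound is enforced by dropping the oldest savepoint, which the text above does not spell out. The class `PeriodicSavepointSketch`, its methods, and the generated path names are all hypothetical and are not Flink APIs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical model of periodic savepoints; not Flink code.
final class PeriodicSavepointSketch {
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();
    private final Deque<String> retained = new ArrayDeque<>();
    private final String directory;
    private final int maxRetained;
    private long counter;

    PeriodicSavepointSketch(String directory, int maxRetained) {
        this.directory = directory;
        this.maxRetained = maxRetained;
    }

    // Triggers a savepoint every `interval` units, like a user doing it by hand.
    void start(long interval, TimeUnit unit) {
        timer.scheduleAtFixedRate(this::triggerSavepoint, interval, interval, unit);
    }

    // Records a new savepoint path. ASSUMPTION: when the bound is exceeded,
    // the oldest periodic savepoint is the one that is dropped.
    synchronized void triggerSavepoint() {
        counter++;
        retained.addLast(directory + "/savepoint-" + counter);
        while (retained.size() > maxRetained) {
            retained.removeFirst();
        }
    }

    synchronized List<String> retained() {
        return new ArrayList<>(retained);
    }

    // Stops the timer only; retained savepoints are never removed on shut down.
    void shutdown() {
        timer.shutdownNow();
    }
}
```

The sketch keeps the retention list untouched in `shutdown()`, which mirrors the statement that periodic savepoints are never removed on shut down.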

What's the Difference?

Although persistent checkpoints and periodic savepoints look similar, there is a major difference: persistent checkpoints can only be recovered with the same job and Flink version, whereas savepoints allow modifying both the job and the Flink version.

Miscellaneous
Always Create CheckpointCoordinator

Currently, the checkpoint coordinator is only created if checkpointing is enabled. This means that it is not possible to trigger a savepoint for stateful jobs that don't have periodic checkpointing enabled.

...

Furthermore, there is the option to specify the directory ad-hoc per savepoint (via CheckpointConfig or the CLI, see below). This ad-hoc value has precedence over the default value. If no savepoint path is configured (no default and no ad-hoc), the savepoint will fail.
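The precedence rule above can be expressed as a small helper. This is a minimal sketch, assuming that a missing value is represented as null or an empty string. The class `SavepointPaths` and the method `resolve` are hypothetical names for this example, not existing Flink code.

```java
// Hypothetical helper that models the path precedence described above.
final class SavepointPaths {
    private SavepointPaths() {}

    // Returns the ad-hoc path if given, otherwise the default directory.
    // Fails if neither is configured, mirroring "the savepoint will fail".
    static String resolve(String adHocPath, String defaultDirectory) {
        if (adHocPath != null && !adHocPath.isEmpty()) {
            return adHocPath;
        }
        if (defaultDirectory != null && !defaultDirectory.isEmpty()) {
            return defaultDirectory;
        }
        throw new IllegalStateException(
                "No savepoint path configured (no default and no ad-hoc value)");
    }
}
```

For example, `SavepointPaths.resolve("hdfs:///adhoc", "hdfs:///default")` picks the ad-hoc value, while `SavepointPaths.resolve(null, null)` throws.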

...

Default Directory

Configured?

...

...

Extend CheckpointConfig

Code Block
// Persists completed checkpoints like savepoints
@PublicEvolving
void enablePersistentCheckpoints(String path);

@PublicEvolving
void disablePersistentCheckpoints();

// Enables periodic savepoints
@PublicEvolving
void enablePeriodicSavepoints(long interval, TimeUnit unit, String path);

@PublicEvolving
void disablePeriodicSavepoints();

// Sets the maximum number of retained periodic savepoints
@PublicEvolving
void setMaximumNumberOfRetainedPeriodicSavepoints(int max);

Furthermore we can extend the StreamExecutionEnvironment with shortcuts:

Code Block
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enablePersistentCheckpoints(String path);
env.disablePersistentCheckpoints();

Add Optional Savepoint Path Argument to CLI

...

Code Block
bin/flink savepoint <JobID> [<Path>]

Add REST API and Logging

...