...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, when Flink’s job manager crashes or gets killed, possibly due to unexpected errors or planned node decommission, one of the following two situations will occur:

  1. The job fails, if it does not enable HA.
  2. The job restarts, if it enables HA. A streaming job will be resumed from the last successful checkpoint, but a batch job has to run from the beginning, and all previous progress will be lost.

In view of this, a JM crash can cause a significant regression for batch jobs, especially long-running ones. This FLIP aims to solve this problem so that batch jobs can recover most of their progress after a JM crash; our goal is to avoid re-running most of the already finished tasks.

Public Interfaces

We intend to introduce the following new configuration parameters.

Key | Type | Default Value | Description
execution.batch.job-recovery.enabled | Boolean | false | A flag to enable or disable job recovery.
execution.batch.job-recovery.previous-worker.recovery.timeout | Duration | 30 s | The timeout for a new job master to wait for the previous workers to reconnect.
execution.batch.job-recovery.operator-coordinator-snapshot.min-pause | Duration | 3 min | The minimal pause between operator coordinator snapshots.
job-event.store.write-buffer.size | MemorySize | 1M | The size of the JobEventStore write buffer; its content is flushed to the external file system once the buffer is full.
job-event.store.write-buffer.flush-interval | Duration | 1 s | The flush interval of the JobEventStore write buffer; buffered content is flushed to the external file system at least this often.
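
The sketch below shows how these options could be set, using Flink's generic Configuration string setters; the option keys are the ones proposed above and their typed ConfigOptions are part of this proposal rather than existing API, so the values shown are only an example.

    import org.apache.flink.configuration.Configuration;

    public class JobRecoveryConfigExample {

        public static void main(String[] args) {
            Configuration conf = new Configuration();

            // Turn on batch job recovery after a JM failover.
            conf.setString("execution.batch.job-recovery.enabled", "true");

            // Wait up to 30 seconds for the previous workers to reconnect.
            conf.setString("execution.batch.job-recovery.previous-worker.recovery.timeout", "30 s");

            // Snapshot the operator coordinators at most once every 3 minutes.
            conf.setString("execution.batch.job-recovery.operator-coordinator-snapshot.min-pause", "3 min");

            // JobEventStore write buffer: flush when 1 MB is buffered, or every second.
            conf.setString("job-event.store.write-buffer.size", "1m");
            conf.setString("job-event.store.write-buffer.flush-interval", "1 s");
        }
    }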

Proposed Changes

The Whole Workflow of Job Recovery

...

In order to obtain the state of operator coordinators, we will enrich the checkpointCoordinator method to let it accept -1 (NO_CHECKPOINT) as the value of checkpointId, to support snapshotting the state of operator coordinators in batch jobs. After a JM crash, the operator coordinator can be restored from the previously recorded state. In addition to a simple restore (by the resetToCheckpoint method), it also needs to call subtaskReset for the non-finished tasks (which may be in running state before the JM crashes), because these tasks will be reset and re-run after the JM crash.
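
As an illustration, the restore flow could look roughly like the following sketch. It uses the existing OperatorCoordinator methods resetToCheckpoint and subtaskReset; the surrounding class, the recorded snapshot bytes, and the set of non-finished subtasks are hypothetical placeholders, assumed to be recovered from the replayed job events.

    import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;

    import java.util.Set;

    /** Hypothetical sketch of restoring an operator coordinator after a JM failover. */
    final class OperatorCoordinatorRestoreSketch {

        // Mirrors the -1 (NO_CHECKPOINT) sentinel described in this FLIP.
        private static final long NO_CHECKPOINT = -1L;

        /**
         * Restores the coordinator from the snapshot recorded before the JM crash,
         * then resets the subtasks that had not finished yet.
         */
        static void restore(
                OperatorCoordinator coordinator,
                byte[] recordedSnapshot,
                Set<Integer> nonFinishedSubtasks) throws Exception {

            // Simple restore of the coordinator state.
            coordinator.resetToCheckpoint(NO_CHECKPOINT, recordedSnapshot);

            // Tasks that were still running before the crash will be reset and re-run,
            // so the coordinator must also be told to reset them.
            for (int subtask : nonFinishedSubtasks) {
                coordinator.subtaskReset(subtask, NO_CHECKPOINT);
            }
        }
    }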

Considering that the operator coordinators may have large state, snapshotting them every time an execution vertex finishes may cause significant overhead. To solve this problem, we will add a new configuration option "execution.batch.job-recovery.operator-coordinator-snapshot.min-pause" to control the minimum interval between snapshots. When restoring, we will also reconcile the execution job vertex state with the operator coordinator state so that they are consistent. In other words, we will adjust the execution job vertex to the state at the time of the latest operator coordinator snapshot.
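
A minimal sketch of how the min-pause could gate coordinator snapshots is shown below; the class and method names are illustrative only and not part of the proposal.

    import java.time.Duration;

    /**
     * Illustrative throttle for operator coordinator snapshots, driven by the proposed
     * "execution.batch.job-recovery.operator-coordinator-snapshot.min-pause" option.
     */
    final class CoordinatorSnapshotThrottle {

        private final long minPauseMillis;
        private long lastSnapshotTimestamp = Long.MIN_VALUE;

        CoordinatorSnapshotThrottle(Duration minPause) {
            this.minPauseMillis = minPause.toMillis();
        }

        /**
         * Called when an execution vertex finishes; returns true only if the configured
         * minimum pause has elapsed since the last coordinator snapshot.
         */
        boolean shouldSnapshotNow(long nowMillis) {
            if (lastSnapshotTimestamp == Long.MIN_VALUE
                    || nowMillis - lastSnapshotTimestamp >= minPauseMillis) {
                lastSnapshotTimestamp = nowMillis;
                return true;
            }
            return false;
        }
    }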

...