...

| Key | Type | Default Value | Description |
|---|---|---|---|
| execution.batch.job-recovery.enabled | Boolean | false | A flag to enable or disable job recovery. |
| execution.batch.job-recovery.previous-worker.recovery.timeout | Duration | 30 s | The timeout for a new job master to wait for the previous worker to reconnect. |
| execution.batch.job-recovery.snapshot.min-pause | Duration | 3 min | The minimum pause between snapshots, used to avoid performance issues caused by overly frequent snapshots. These snapshots are taken of the operator coordinators and other components. |
| job-event.store.write-buffer.size | MemorySize | 1M | The size of the JobEventStore write buffer. Once the buffer is full, its content is flushed to the external file system. |
| job-event.store.write-buffer.flush-interval | Duration | 1 s | The flush interval of the JobEventStore write buffer. Once this interval has elapsed, the buffered content is flushed to the external file system. |
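The two write-buffer options describe a "flush when full, or when the interval elapses" policy. The following is a minimal, self-contained sketch of that policy, assuming events are already serialized to bytes and that an in-memory list can stand in for the external file system. The class `BufferedEventWriter` and its methods are hypothetical illustrations, not Flink's JobEventStore API; time is passed in explicitly so the behavior is deterministic.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of a size-or-interval flush policy for an event-store write buffer.
 * The class and method names are illustrative, not Flink's JobEventStore API.
 */
class BufferedEventWriter {
    private final int bufferSizeBytes;      // plays the role of job-event.store.write-buffer.size
    private final long flushIntervalMillis; // plays the role of job-event.store.write-buffer.flush-interval
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    // Stand-in for the external file system: each element is one flushed chunk.
    final List<byte[]> flushedChunks = new ArrayList<>();
    private long lastFlushMillis;

    BufferedEventWriter(int bufferSizeBytes, long flushIntervalMillis, long nowMillis) {
        this.bufferSizeBytes = bufferSizeBytes;
        this.flushIntervalMillis = flushIntervalMillis;
        this.lastFlushMillis = nowMillis;
    }

    /** Appends a serialized event and flushes as soon as the buffer is full. */
    void write(byte[] event, long nowMillis) {
        buffer.write(event, 0, event.length);
        if (buffer.size() >= bufferSizeBytes) {
            flush(nowMillis);
        }
    }

    /** Meant to be called periodically; flushes once the flush interval has elapsed. */
    void onTimer(long nowMillis) {
        if (nowMillis - lastFlushMillis >= flushIntervalMillis) {
            flush(nowMillis);
        }
    }

    /** Moves the buffered bytes to the "external file system" and restarts the interval. */
    void flush(long nowMillis) {
        if (buffer.size() > 0) {
            flushedChunks.add(buffer.toByteArray());
            buffer.reset();
        }
        lastFlushMillis = nowMillis;
    }

    int bufferedBytes() {
        return buffer.size();
    }
}
```

For example, with an 8-byte buffer and a 1000 ms interval, writing 5 and then 4 bytes triggers a size-based flush of 9 bytes, while a later 2-byte write stays buffered until a timer tick at least 1000 ms after the previous flush. In this sketch a write that overflows the buffer is flushed together with it; a real implementation may split or bound chunks differently.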

...

To support job recovery, we will reuse the SplitEnumerator#snapshotState method to obtain the internal state of the split enumerator. To distinguish such a snapshot from a normal checkpoint, we will pass -1 (NO_CHECKPOINT) as the checkpoint id to indicate that it is a snapshot for the no-checkpoint/batch scenarios. Some existing split enumerator implementations may rely on a positive checkpoint id, or may not support taking snapshots in no-checkpoint/batch scenarios at all. Therefore, job recovery will be disabled unless the source developer explicitly indicates that the split enumerator supports taking snapshots in no-checkpoint/batch scenarios. We will introduce a new interface, SupportsBatchSnapshot, for this purpose.
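The opt-in rule can be sketched from the framework side: recovery is allowed only for enumerators that implement the marker interface, and the snapshot is requested with NO_CHECKPOINT. This is a simplified, self-contained illustration. `SimpleSplitEnumerator`, `BatchSnapshotHelper`, `PendingSplitsEnumerator` and `LegacyEnumerator` are hypothetical stand-ins, and the stand-in `snapshotState` declares no checked exception (Flink's real `SplitEnumerator#snapshotState` does), so this is not Flink's actual API or coordinator logic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for Flink's SplitEnumerator: only the snapshot method is
// modelled, and it declares no checked exception (the real method throws Exception).
interface SimpleSplitEnumerator<CheckpointT> {
    CheckpointT snapshotState(long checkpointId);
}

// Marker interface mirroring the proposed SupportsBatchSnapshot.
interface SupportsBatchSnapshot {}

// Hypothetical framework-side helper deciding whether a batch snapshot may be taken.
final class BatchSnapshotHelper {
    static final long NO_CHECKPOINT = -1L;

    private BatchSnapshotHelper() {}

    /** Job recovery is only possible if the enumerator opts in via the marker interface. */
    static boolean supportsJobRecovery(SimpleSplitEnumerator<?> enumerator) {
        return enumerator instanceof SupportsBatchSnapshot;
    }

    /** Requests a batch snapshot, passing NO_CHECKPOINT to distinguish it from a checkpoint. */
    static <T> T takeBatchSnapshot(SimpleSplitEnumerator<T> enumerator) {
        if (!supportsJobRecovery(enumerator)) {
            throw new IllegalStateException(
                    "Enumerator does not implement SupportsBatchSnapshot; job recovery is disabled");
        }
        return enumerator.snapshotState(NO_CHECKPOINT);
    }
}

// Opted-in enumerator: accepts NO_CHECKPOINT and returns a copy of its pending splits.
class PendingSplitsEnumerator implements SimpleSplitEnumerator<List<String>>, SupportsBatchSnapshot {
    private final List<String> pendingSplits = new ArrayList<>();
    long lastCheckpointId = Long.MIN_VALUE;

    PendingSplitsEnumerator(String... splits) {
        for (String split : splits) {
            pendingSplits.add(split);
        }
    }

    @Override
    public List<String> snapshotState(long checkpointId) {
        lastCheckpointId = checkpointId;
        return new ArrayList<>(pendingSplits);
    }
}

// Legacy enumerator that assumes a positive checkpoint id and therefore does not opt in.
class LegacyEnumerator implements SimpleSplitEnumerator<Long> {
    @Override
    public Long snapshotState(long checkpointId) {
        if (checkpointId <= 0) {
            throw new IllegalArgumentException("expected a positive checkpoint id");
        }
        return checkpointId;
    }
}
```

Checking the marker interface before calling `snapshotState` means a legacy enumerator such as `LegacyEnumerator` is never handed -1, which is exactly the case the opt-in is meant to guard against.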

```java
import org.apache.flink.annotation.PublicEvolving;

/**
 * An identifier interface for {@link SplitEnumerator}, which is used to support taking snapshots in
 * no-checkpoint/batch scenarios (for example, to support job recovery in batch jobs). Once a {@link
 * SplitEnumerator} implements this interface, its {@link SplitEnumerator#snapshotState} method
 * needs to accept -1 (NO_CHECKPOINT) as the argument.
 */
@PublicEvolving
public interface SupportsBatchSnapshot {}
```
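From the enumerator side, "accepting -1" mostly means not using the checkpoint id as if it were a real checkpoint. The sketch below assumes a hypothetical enumerator that records splits assigned per checkpoint (keyed by checkpoint id). It still returns its state for NO_CHECKPOINT but skips the per-checkpoint bookkeeping. `SnapshottingEnumerator` and `TrackingEnumerator` are simplified, hypothetical stand-ins for illustration, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical, simplified stand-in for Flink's SplitEnumerator (the real interface has more methods).
interface SnapshottingEnumerator<CheckpointT> {
    CheckpointT snapshotState(long checkpointId);
}

// Marker interface mirroring the proposed SupportsBatchSnapshot.
interface SupportsBatchSnapshot {}

/**
 * Hypothetical enumerator that tracks splits assigned per checkpoint. Because it implements
 * SupportsBatchSnapshot, its snapshotState must accept -1 (NO_CHECKPOINT).
 */
class TrackingEnumerator implements SnapshottingEnumerator<List<String>>, SupportsBatchSnapshot {
    static final long NO_CHECKPOINT = -1L;

    private final List<String> unassigned = new ArrayList<>();
    private final List<String> assignedSinceLastCheckpoint = new ArrayList<>();
    // Keyed by checkpoint id; only real checkpoints are recorded here.
    final SortedMap<Long, List<String>> assignmentsByCheckpoint = new TreeMap<>();

    TrackingEnumerator(String... splits) {
        for (String split : splits) {
            unassigned.add(split);
        }
    }

    /** Assigns the next unassigned split (assumes at least one remains). */
    String assignNext() {
        String split = unassigned.remove(0);
        assignedSinceLastCheckpoint.add(split);
        return split;
    }

    @Override
    public List<String> snapshotState(long checkpointId) {
        if (checkpointId != NO_CHECKPOINT) {
            // Only a real checkpoint id is used as a key for checkpoint bookkeeping.
            assignmentsByCheckpoint.put(checkpointId, new ArrayList<>(assignedSinceLastCheckpoint));
            assignedSinceLastCheckpoint.clear();
        }
        // In both cases, the remaining unassigned splits form the snapshotted state.
        return new ArrayList<>(unassigned);
    }
}
```

With this structure, a batch snapshot returns the same enumerator state as a checkpoint would, without polluting the per-checkpoint map with an entry for -1.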

...