...
Key | Type | Default Value | Description |
execution.batch.job-recovery.enabled | Boolean | false | A flag to enable or disable the job recovery |
execution.batch.job-recovery.previous-worker.recovery.timeout | Duration | 30 s | The timeout for a new job master to wait for the previous worker to reconnect |
execution.batch.job-recovery.snapshot.min-pause | Duration | 3 min | The minimal pause between snapshots, which avoids performance issues caused by too-frequent snapshots. Applies to snapshots of operator coordinators and other components. |
job-event.store.write-buffer.size | MemorySize | 1M | The size of the JobEventStore write buffer. Once the buffer is full, its content is flushed to the external file system. |
job-event.store.write-buffer.flush-interval | Duration | 1 s | The flush interval of the JobEventStore write buffer. Once this interval has elapsed, the buffered content is flushed to the external file system. |
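
The two JobEventStore options above describe a buffer that is flushed either when it fills up or when the flush interval elapses. The following is a minimal sketch of that policy, not the actual JobEventStore implementation: the class `WriteBufferSketch`, its methods, and the in-memory list standing in for the external file system are all hypothetical. For determinism the caller passes the current time explicitly, and the interval is only checked on write; a real store would more likely use a timer.

```java
import java.io.ByteArrayOutputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a size- and interval-triggered write buffer. */
class WriteBufferSketch {
    private final int capacityBytes;          // cf. job-event.store.write-buffer.size
    private final long flushIntervalNanos;    // cf. job-event.store.write-buffer.flush-interval
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    // Stands in for the external file system: one entry per flush.
    private final List<byte[]> flushed = new ArrayList<>();
    private long lastFlushNanos;

    WriteBufferSketch(int capacityBytes, Duration flushInterval, long nowNanos) {
        this.capacityBytes = capacityBytes;
        this.flushIntervalNanos = flushInterval.toNanos();
        this.lastFlushNanos = nowNanos;
    }

    /** Appends an event, then flushes if the buffer is full or the interval has elapsed. */
    void write(byte[] event, long nowNanos) {
        buffer.write(event, 0, event.length);
        boolean full = buffer.size() >= capacityBytes;
        boolean intervalElapsed = nowNanos - lastFlushNanos >= flushIntervalNanos;
        if (full || intervalElapsed) {
            flush(nowNanos);
        }
    }

    /** Moves the buffered bytes to the (simulated) external store and resets the timer. */
    void flush(long nowNanos) {
        if (buffer.size() > 0) {
            flushed.add(buffer.toByteArray());
            buffer.reset();
        }
        lastFlushNanos = nowNanos;
    }

    int flushCount() {
        return flushed.size();
    }

    int bufferedBytes() {
        return buffer.size();
    }

    public static void main(String[] args) {
        // Tiny capacity so the size trigger is easy to observe.
        WriteBufferSketch store = new WriteBufferSketch(8, Duration.ofSeconds(1), 0L);
        store.write(new byte[4], 10L);                 // below capacity, stays buffered
        store.write(new byte[4], 20L);                 // reaches capacity, triggers a flush
        store.write(new byte[1], 20L + 1_000_000_000L); // interval elapsed, triggers a flush
        System.out.println("flushes: " + store.flushCount());
    }
}
```

Under these assumptions, a smaller buffer or a shorter interval bounds how many job events can be lost if the job master fails, at the cost of more frequent writes to the external file system.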
...
In order to support Job Recovery, we will reuse the SplitEnumerator#snapshotState method to obtain the internal state of the split enumerator. In order to distinguish it from a normal checkpoint, we will pass -1 (NO_CHECKPOINT) to indicate that this is a snapshot for the no-checkpoint/batch scenarios. Some current split enumerator implementations may rely on a positive checkpoint id, or may not support taking snapshots in no-checkpoint/batch scenarios. Therefore, unless the source developer explicitly indicates that the enumerator can support taking snapshots in no-checkpoint/batch scenarios, the state of SourceCoordinator/SplitEnumerator will not be recorded and restored (see the JobEvent section for how to handle this case). We will introduce a new interface SupportsBatchSnapshot to indicate whether the split enumerator supports taking snapshots in batch scenarios.
```java
/**
 * An identifier interface for {@link SplitEnumerator}, which is used to support taking snapshot in
 * no-checkpoint/batch scenarios (for example to support job recovery in batch jobs). Once a {@link
 * SplitEnumerator} implements this interface, its {@link SplitEnumerator#snapshotState} method
 * needs to accept -1 (NO_CHECKPOINT) as the argument.
 */
@PublicEvolving
public interface SupportsBatchSnapshot {}
```
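
To illustrate how the marker interface could be used, here is a minimal, self-contained sketch. It does not depend on Flink: `SimpleSplitEnumerator` is a hypothetical, heavily reduced stand-in for `SplitEnumerator`, the local `SupportsBatchSnapshot` mirrors the proposed interface only so the example compiles on its own, and `FileSplitEnumeratorSketch` and `BatchSnapshotDemo` are invented for illustration. The caller takes a batch snapshot only if the enumerator opts in, and passes -1 as the checkpoint id.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

// Local stand-in for the proposed marker interface (assumption: same name, no methods).
interface SupportsBatchSnapshot {}

// Hypothetical, reduced stand-in for Flink's SplitEnumerator: only snapshotState is modelled.
interface SimpleSplitEnumerator<CheckpointT> {
    CheckpointT snapshotState(long checkpointId);
}

/** An enumerator whose state is just the list of splits not yet handed out. */
class FileSplitEnumeratorSketch
        implements SimpleSplitEnumerator<List<String>>, SupportsBatchSnapshot {

    static final long NO_CHECKPOINT = -1L;

    private final Deque<String> unassigned;

    FileSplitEnumeratorSketch(List<String> splits) {
        this.unassigned = new ArrayDeque<>(splits);
    }

    /** Hands out the next split, or null if none are left. */
    String nextSplit() {
        return unassigned.poll();
    }

    @Override
    public List<String> snapshotState(long checkpointId) {
        // The checkpoint id is deliberately ignored, so -1 (NO_CHECKPOINT) is accepted.
        return new ArrayList<>(unassigned);
    }
}

class BatchSnapshotDemo {
    /** Snapshots the enumerator for job recovery only if it opted in via the marker interface. */
    static <T> Optional<T> snapshotForRecovery(SimpleSplitEnumerator<T> enumerator) {
        if (!(enumerator instanceof SupportsBatchSnapshot)) {
            // State is neither recorded nor restored for enumerators that did not opt in.
            return Optional.empty();
        }
        return Optional.of(enumerator.snapshotState(FileSplitEnumeratorSketch.NO_CHECKPOINT));
    }

    public static void main(String[] args) {
        FileSplitEnumeratorSketch enumerator =
                new FileSplitEnumeratorSketch(List.of("split-0", "split-1", "split-2"));
        enumerator.nextSplit(); // "split-0" is assigned before the snapshot is taken
        System.out.println(snapshotForRecovery(enumerator));
    }
}
```

An enumerator that validates `checkpointId > 0` or keys its state by checkpoint id would need to be adjusted before implementing the marker interface, which is why opting in is left to the source developer.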
...