1. Motivation

There are several points in the restart strategy that need to be improved, the aim of this FLIP is to improve the exponential-delay restart strategy.

1.1 The exponential-delay doesn't have the max attempts mechanism

Currently, Flink has 3 restart strategies, they are: fixed-delay, failure-rate and exponential-delay.

The exponential-delay is suitable if a job continues to fail for a period of time. The fixed-delay and failure-rate has the max attempts mechanism, that means, the job won't restart and go to fail after the attempt exceeds the threshold of max attempts.

The max attempts mechanism is reasonable, flink should not or need to infinitely restart the job if the job keeps failing. In our experience, some scenarios do not want jobs to restart indefinitely:

  • Test jobs keep failing, but the owner easily ignores them. Infinite restarts will bring great difficulty to platform management.
  • The reason why some production jobs fail may be due to configuration issues.
    • The current job uses old configurations, so it keeps failing. Users must start a new Application in yarn or k8s and load the latest configuration to restore.
    • At this point, infinite in-place restarts of the job are meaningless.
    • When the Flink platform starts a new App, it may load the latest configuration. So external restart is needed here.

However, the exponential-delay doesn't have the max attempts mechanism. I propose introducing the max attempts for Exponential Delay Restart Strategy.

1.2 Different semantics of restart attempts cause region failover not as expected

When a job only has one task(JobVertex), and the parallelism of this task is 100. It means this flink job has 100 regions. These tasks consume data from kafka, and the Kafka cluster has a network failure and is recovered within a short period of time.

When there is a network failure in the kafka cluster, all flink tasks will throw exceptions (such as KafkaTimeoutException). We expect that the Flink job can be recovered through the restart-strategy mechanism after the Kafka cluster is recovered. However, the semantic of restart attempts is different between region-failover and full-failover.

  • For full-failover, all subtasks are in a region. Once a subtask fails, the whole job is failed. So the restartAttempt is just +1 even if all subtasks fail.
  • For region-failover, when 100 subtasks(regions) fail, the restartAttempt will be +100. 

The +100 will cause 2 problems:

  • In general, the restartAttempt will be a small value to prevent the job restart infinitely.
  • For the exponential-delay, the delay time will be increased rapidly.

I propose fixing the restartAttempt semantic for region-failover.

Note: Batch job is similar to region-failover(Each jobVertex is a region for batch job),  the restartAttempt of batch job will also grow very fast.

1.3 Discuss whether the exponential-delay can be considered as the default restart-strategy

If checkpointing is enabled, the default value is fixed-delay with Integer.MAX_VALUE restart attempts and '1 s' delay[1]. It means the job will restart infinitely with high frequency when a job continues to fail.

When the Kafka cluster fails, a large number of flink jobs will be restarted frequently. After the kafka cluster is recovered, a large number of high-frequency restarts of flink jobs may cause the kafka cluster to avalanche again.

I propose considering the exponential-delay as the default strategy with a couple of reasons:

  • The exponential-delay can reduce the restart frequency when a job continues to fail.
  • It can restart a job quickly when a job fails occasionally.
  • The restart-strategy.exponential-delay.jitter-factor can avoid restarting multiple jobs at the same time. It’s useful to prevent avalanches.

2. Public interfaces

2.1 Introduce the max attempts for Exponential Delay Restart Strategy

I propose introducing the `restart-strategy.exponential-delay.attempts-before-reset-backoff` to support the max attempts mechanism for exponential-delay. It means flink won't restart the job if the number of job failures before reset exceeds attempts-before-reset-backoff when exponential-delay is enabled.

The default value of `restart-strategy.exponential-delay.attempts-before-reset-backoff` is Integer.MAX_VALUE.

2.2 Improve restartAttempt's counting strategy

About improving the restartAttempt’s counting strategy, it doesn't change any public interface or option.

2.3 Consider the exponential-delay  as the default restart-strategy

If the exponential-delay is used as default restart-strategy makes sense, all default options of exponential-delay need to be discussed.

I prefer the new default restart-strategy to be as similar as possible to the current default restart-strategy. It will restart the job infinitely and it can restart a job quickly when a job fails occasionally.

Option name

Default value

restart-strategy.exponential-delay.attempts-before-reset-backoff

Integer.MAX_VALUE

restart-strategy.exponential-delay.initial-backoff

1s

restart-strategy.exponential-delay.backoff-multiplier

1.5

restart-strategy.exponential-delay.jitter-factor

0.1

restart-strategy.exponential-delay.max-backoff

1 min

restart-strategy.exponential-delay.reset-backoff-threshold

1h


Following is the relationship between restart-attempts and retry-delay-time when multiplier is 1.2 and 1.5:


backoff-multiplier = 1.5 achieves the following goals:

  • When restarts are infrequent in a short period of time, flink can quickly restart the job. (For example: the retry delay time when restarting 5 times is 5.0625s)
  • When restarting frequently in a short period of time, flink can slightly reduce the restart frequency to prevent avalanches. (For example: the retry delay time when retrying 10 times is about 25s, which is not very large.)


Note: after discussion in the dev mail list, we will change the the default value of these options as well.


2.4 org.apache.flink.api.common.restartstrategy.RestartStrategies

From the FLIP-381, the org.apache.flink.api.common.restartstrategy.RestartStrategies may be removed in 1.19, and users are advised to use configuration options, so FLIP ignores updating it for now.


3. Proposed changes

3.1 Introduce the max attempts for Exponential Delay Restart Strategy

Support the  `restart-strategy.exponential-delay.attempts-before-reset-backoff` in the ExponentialDelayRestartBackoffTimeStrategy.

3.2 Improve restartAttempt's counting strategy

The core idea is that we only increase restartAttempt once before restarting the job next time(Or merging multiple exceptions into one restartAttempt).

Take the fixed-delay as an example:

  • restart-strategy.exponential-delay.initial-backoff=1s
  • restart-strategy.exponential-delay.backoff-multiplier=2


  • 09:20:01:000: one task throws exception, and delay time is 1s, it will be restarted at 09:20:02:000
    • It means all tasks that throw exceptions between 01:000 and 02:000 are restarted at 02:000
  • 09:20:04:000: one task throws exception, and delay time is 2s (=1s * 2), , it will be restarted at 09:20:06:000
    • All tasks that throw exceptions between 04:000 and 06:000 are restarted at 06:000
  • 09:20:07:000: one task throws exception, and delay time is 4s (=1s * 2 * 2), , it will be restarted at 09:20:11:000
    • All tasks that throw exceptions between 07:000 and 11:000 are restarted at 11:000

3.3 Consider the exponential-delay  as the default restart-strategy

Updating the default restart-strategy in the  org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategyFactoryLoader#getDefaultRestartStrategyFactory.

And update these default values:

Option name

Default value

restart-strategy.exponential-delay.backoff-multiplier

1.2

restart-strategy.exponential-delay.max-backoff

1 min


4. Supplement: Why is the restartAttempts increased raplidly

From the discussion mailing list, it's needed to explain why the restartAttempts is increased raplidly in this FLIP. Everyone can learn about it through FLIP.

4.1 Example to reproduce the restartAttempts is increased raplidly

If all tasks of one job throw an exception directly and have the following options, we expect that this job can be retried 5 times with delay 10s each time, and the Flink job will fail after 50s:

  • restart-strategy : fixed-delay

  • restart-strategy.fixed-delay.attempts : 5

  • restart-strategy.fixed-delay.delay : 10 s


Following is a simple flink demo, and it run on 3 cases:

  • case1 is default(streaming job with region failover): Job execution failed immediately (It's unexpected)
  • case2 is streaming job with full failover: Job execution failed after 50s (5 * 10s) (it's expected)
  • case3 is batch job with region failover: Job execution failed immediately (It's unexpected)


import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

/**
 * Test for the restartAttempts is increased rapidly.
 */
public class RestartAttemptsIncreaseRapidlyDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("restart-strategy", "fixed-delay");
        conf.setString("restart-strategy.fixed-delay.attempts", "5");
        conf.setString("restart-strategy.fixed-delay.delay", "10 s");

        // 1. case1 is default(streaming job with region failover): Job execution failed immediately

        // 2. case2 is streaming job with full failover: Job execution failed after 50s (5 * 10s)
//        conf.setString("jobmanager.execution.failover-strategy", "full");

        // 3. case3 is batch job with region failover: Job execution failed after 50s (5 * 10s)
        // Note: In order to check the restartAttempts is increased rapidly, we should use DefaultScheduler.
        // Because default scheduler is AdaptiveBatchScheduler for batch job. It will use parallelism=1 for this job,
        // 1 region cannot reproduce this issue.
//        conf.setString("execution.runtime-mode", "BATCH");
//        conf.setString("jobmanager.scheduler", "Default");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(6);

        DataGeneratorSource<Long> generatorSource =
                new DataGeneratorSource<>(
                        value -> value,
                        3000,
                        RateLimiterStrategy.perSecond(100),
                        Types.LONG);

        env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "Data Generator")
                .addSink(new SinkFunction<Long>() {
                    @Override
                    public void invoke(Long value, Context context) {
                        throw new RuntimeException("Expected exception.");
                    }
                })
                .name("MySink");

        env.execute(RestartAttemptsIncreaseRapidlyDemo.class.getSimpleName());
    }
}



4.2 How does the restartAttempts work?

Note: Only fixed-delay is explained, other strategies are similar.

4.2.1 When the currentRestartAttempt++?

When one task fails, it will call RestartBackoffTimeStrategy#notifyFailure, and FixedDelayRestartBackoffTimeStrategy will call currentRestartAttempt++.

Following is the call stack, it's useful to follow up related code.

"flink-pekko.actor.default-dispatcher-4@3876" prio=5 tid=0x1e nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at org.apache.flink.runtime.executiongraph.failover.flip1.FixedDelayRestartBackoffTimeStrategy.notifyFailure(FixedDelayRestartBackoffTimeStrategy.java:69)
	  at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:163)
	  at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
	  at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:285)
	  at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:276)
	  at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:269)
	  at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:764)
	  at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:741)
	  at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
	  at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)


4.2.2 Why does full-failover work well?

All tasks are one region:

When the first task fails, JobMaster will mark rest of the tasks as ExecutionState#CANCELING.

When rest tasks report exceptions, they will switch state from CANCELING to CANCELED instead of FAILED. So rest tasks won't call failed logic(Don't increase the currentRestartAttempt).

Why doesn't region-failover work well?

If one job has 100 regions, and every region just has one task. If one task fails, JobMaster doesn't update other tasks.

If rest tasks report exceptions, they will increase the currentRestartAttempt.


4.2.3 Detailed code for full-failover

When one task fails, rest task are marked to CANCELINIG:

"flink-pekko.actor.default-dispatcher-9@6478" prio=5 tid=0x43 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at org.apache.flink.runtime.executiongraph.Execution.startCancelling(Execution.java:1002)
	  at org.apache.flink.runtime.executiongraph.Execution.cancel(Execution.java:651)
	  at org.apache.flink.runtime.scheduler.DefaultExecutionOperations.cancel(DefaultExecutionOperations.java:37)
	  at org.apache.flink.runtime.scheduler.DefaultScheduler.cancelExecution(DefaultScheduler.java:450)
	  at org.apache.flink.runtime.scheduler.DefaultScheduler$$Lambda$1519.323386661.apply(Unknown Source:-1)
	  at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	  at java.util.Collections$2.tryAdvance(Collections.java:4717)
	  at java.util.Collections$2.forEachRemaining(Collections.java:4725)
	  at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	  at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	  at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
	  at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	  at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
	  at org.apache.flink.runtime.scheduler.DefaultScheduler.cancelExecutionVertex(DefaultScheduler.java:445)
	  at org.apache.flink.runtime.scheduler.DefaultScheduler$$Lambda$1518.164545304.apply(Unknown Source:-1)
	  at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	  at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
	  at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	  at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	  at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
	  at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	  at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
	  at org.apache.flink.runtime.scheduler.DefaultScheduler.cancelTasksAsync(DefaultScheduler.java:436)
	  at org.apache.flink.runtime.scheduler.DefaultScheduler.restartTasksWithDelay(DefaultScheduler.java:368)
	  at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeRestartTasks(DefaultScheduler.java:334)
	  at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:276)
	  at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:269)
	  at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:764)
	  at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:741)
	  at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
	  at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)


When rest tasks reports exception, the state updated from CANCELINIG to CANCELED:

"flink-pekko.actor.default-dispatcher-8@6482" prio=5 tid=0x42 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at org.apache.flink.runtime.executiongraph.Execution.completeCancelling(Execution.java:1039)
	  at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1138)
	  at org.apache.flink.runtime.executiongraph.Execution.markFailed(Execution.java:933)
	  at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.updateStateInternal(DefaultExecutionGraph.java:1405)
	  at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.updateState(DefaultExecutionGraph.java:1364)
	  at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:740)
	  at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
	  at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)


When rest tasks reports exception, they will doesn't call FAILED logic:


5. Compatibility, Deprecation, and Migration Plan

Nothing to be compatible with.

6. Test Plan

UT & IT & Manually verify that the restart-strategy is working as expected.

7. Rejected Alternatives




[1] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/config/#restart-strategy-type


  • No labels