...
In order to support the reactive mode (FLIP-159), we need a different type of scheduler: one that first announces the required resources and decides on the actual parallelism with which to execute the job only after it has received them. The benefit is that this scheduler can schedule jobs even if not all required resources are available. Moreover, it allows jobs to continue executing even after TaskManagers have been lost. The declarative scheduler builds upon declarative resource management (FLIP-138).
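The "declare first, decide later" idea can be sketched in a few lines. This is only an illustration, not the proposed implementation: the class `ParallelismDecider` and its method are hypothetical names, and the sketch assumes that one slot can host one parallel instance of the whole pipeline, so the number of received slots directly bounds the parallelism.

```java
import java.util.OptionalInt;

/** Hypothetical sketch; none of these names are Flink APIs. */
final class ParallelismDecider {

    /**
     * Decides the parallelism once the declared resources have (partially) arrived.
     *
     * @param desiredParallelism parallelism the job asked for, i.e. the declared slots
     * @param availableSlots slots that were actually received
     * @return the parallelism to run with, or empty if no slot is available yet
     */
    static OptionalInt decide(int desiredParallelism, int availableSlots) {
        if (desiredParallelism < 1) {
            throw new IllegalArgumentException("desiredParallelism must be >= 1");
        }
        if (availableSlots < 1) {
            // Nothing to run on yet; keep waiting for resources.
            return OptionalInt.empty();
        }
        // Assumption: one slot per parallel pipeline instance, so the job
        // runs with whatever is available, capped at the desired value.
        return OptionalInt.of(Math.min(desiredParallelism, availableSlots));
    }

    public static void main(String[] args) {
        // Declared 8 slots but received only 5: run with parallelism 5.
        System.out.println(decide(8, 5).getAsInt());
    }
}
```

Under these assumptions, losing a TaskManager simply lowers `availableSlots`, and the same decision yields a smaller parallelism for the restarted job.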
Proposed Changes
The declarative scheduler will initially work for streaming jobs only. This simplifies things considerably because all operators always have to be scheduled. Moreover, by treating every failure as a global failover that restarts the whole topology, we can simplify the scheduler further. This failover behaviour is the default for many streaming topologies anyway, as long as they do not consist of disjoint graphs. Given these assumptions, we want to develop the following scheduler:
...
If the system cannot recover from a local failover because it does not have enough slots available, the failure must be escalated to a global failover. A global failover allows the system to rescale the whole job.
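The escalation rule above amounts to a single comparison. The following is a minimal sketch under the assumption that the number of slots needed for a local failover is known up front; `FailoverEscalation` and its members are hypothetical names used only for illustration.

```java
/** Hypothetical sketch of the escalation rule; not a Flink API. */
final class FailoverEscalation {

    enum Failover { LOCAL, GLOBAL }

    /**
     * Classifies a failure as local or global.
     *
     * @param slotsNeededForLocalFailover slots required to restart only the affected part
     * @param freeSlots slots currently available to the job
     */
    static Failover classify(int slotsNeededForLocalFailover, int freeSlots) {
        // Not enough slots to restart the affected part in place:
        // escalate so that the whole job can be rescaled.
        return freeSlots >= slotsNeededForLocalFailover ? Failover.LOCAL : Failover.GLOBAL;
    }

    public static void main(String[] args) {
        System.out.println(classify(2, 1)); // too few slots, so the failure is escalated
    }
}
```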
State machine of the scheduler
Given the description above we propose the following state machine to model the behaviour of the declarative scheduler:
@startuml
scale 600 width
[*] -> State1
State1 --> State2 : Succeeded
State1 --> [*] : Aborted
State2 --> State3 : Succeeded
State2 --> [*] : Aborted
state State3 {
state "Accumulate Enough Data\nLong State Name" as long1
long1 : Just a test
[*] --> long1
long1 --> long1 : New Data
long1 --> ProcessData : Enough Data
}
State3 --> State3 : Failed
State3 --> [*] : Succeeded / Save Result
State3 --> [*] : Aborted
@enduml
Components of the scheduler
SlotAllocator
ExecutionFailureHandler
RescalingStrategy
How to distinguish streaming jobs
Compatibility, Deprecation, and Migration Plan
The declarative scheduler will be a beta feature that the user has to activate explicitly by setting the config option jobmanager.scheduler: declarative. It will only be chosen if the user submitted a streaming job.
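The selection rule can be sketched as follows. Only the option key `jobmanager.scheduler` and the value `declarative` come from this proposal; the class `SchedulerSelection`, the enum, the plain `Map` standing in for the configuration, and the fallback to a default scheduler for non-streaming jobs are assumptions made for illustration. How a streaming job is detected is left as an input.

```java
import java.util.Map;

/** Hypothetical sketch of the opt-in rule; not Flink's actual selection code. */
final class SchedulerSelection {

    static final String SCHEDULER_KEY = "jobmanager.scheduler";
    static final String DECLARATIVE = "declarative";

    enum SchedulerType { DEFAULT, DECLARATIVE }

    /**
     * Picks the scheduler for a job.
     *
     * @param config configuration as plain key/value pairs (stand-in for the real config)
     * @param isStreamingJob whether the submitted job is a streaming job
     */
    static SchedulerType select(Map<String, String> config, boolean isStreamingJob) {
        boolean requested = DECLARATIVE.equals(config.get(SCHEDULER_KEY));
        // Assumption: jobs that do not qualify fall back to the default scheduler.
        return requested && isStreamingJob ? SchedulerType.DECLARATIVE : SchedulerType.DEFAULT;
    }

    public static void main(String[] args) {
        Map<String, String> config = Map.of(SCHEDULER_KEY, DECLARATIVE);
        System.out.println(select(config, true));
        System.out.println(select(config, false));
    }
}
```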
Limitations
- The declarative scheduler runs with streaming jobs only
- No support for local recovery
- No support for local failovers
- No integration with Flink's web UI
Test Plan
The new scheduler needs extensive unit, integration (IT), and end-to-end testing because it is a crucial component at the heart of Flink.