...

In order to support automatic scaling, whenever we enter the Executing state we start a ScalingPolicy which checks periodically, or whenever new slots arrive, whether the system can scale up. If this is the case, then the scheduler transitions into the Restarting state, which triggers a global failover and a restart that makes use of the available resources.
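The transition can be sketched as a small state machine. The following is an illustrative sketch only; the class and method names are assumptions for this example, not Flink's actual API:

```java
/** Illustrative sketch of the Executing-state scaling check. */
public class ExecutingStateSketch {
    public enum State { EXECUTING, RESTARTING }

    /** Simplified stand-in for the ScalingPolicy described below. */
    public interface ScalingPolicy {
        boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism);
    }

    private State state = State.EXECUTING;
    private final ScalingPolicy policy;
    private final int currentParallelism;

    public ExecutingStateSketch(ScalingPolicy policy, int currentParallelism) {
        this.policy = policy;
        this.currentParallelism = currentParallelism;
    }

    /** Called periodically and whenever new slots arrive. */
    public State onNewResourcesAvailable(int achievableParallelism) {
        if (state == State.EXECUTING
                && policy.canScaleUp(currentParallelism, achievableParallelism)) {
            // Entering Restarting triggers a global failover and a restart
            // that picks up the newly available resources.
            state = State.RESTARTING;
        }
        return state;
    }
}
```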

...

PlantUML
@startuml

package "Declarative Scheduler" {
  [SlotAllocator]
  [ExecutionFailureHandler]
  [ScalingPolicy]
}

@enduml

...

Code Block
/** Information about the job. */
public interface JobInformation {
    Collection<SlotSharingGroup> getSlotSharingGroups();

    VertexInformation getVertexInformation(JobVertexID jobVertexId);

    /** Information about a single vertex. */
    interface VertexInformation {
        JobVertexID getJobVertexID();

        int getParallelism();

        SlotSharingGroup getSlotSharingGroup();
    }
}




Code Block
/** Assignment of slots to execution vertices. */
public final class ParallelismAndResourceAssignments {
    private final Map<ExecutionVertexID, ? extends LogicalSlot> assignedSlots;

    private final Map<JobVertexID, Integer> parallelismPerJobVertex;
}


calculateDesiredSlots returns a ResourceCounter that describes the ideal amount of resources for the job.
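As a sketch of what this computation could look like: since one slot can host one subtask of every vertex in a slot sharing group, a group needs as many slots as its most parallel vertex. The class names and the single-profile assumption below are simplifications for this example, not Flink's actual implementation:

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DesiredSlotsSketch {
    /** Minimal stand-in for Flink's ResourceCounter (resource profile -> slot count). */
    public static final class ResourceCounter {
        private final Map<String, Integer> counts = new HashMap<>();

        public void add(String profile, int n) {
            counts.merge(profile, n, Integer::sum);
        }

        public int get(String profile) {
            return counts.getOrDefault(profile, 0);
        }
    }

    /**
     * Computes the ideal number of slots. Slot sharing groups are modeled
     * here simply as lists of vertex parallelisms; each group needs as many
     * slots as its most parallel vertex.
     */
    public static ResourceCounter calculateDesiredSlots(Collection<List<Integer>> groups) {
        ResourceCounter desired = new ResourceCounter();
        for (List<Integer> vertexParallelisms : groups) {
            int slotsForGroup =
                    vertexParallelisms.stream().mapToInt(Integer::intValue).max().orElse(0);
            // Assumption for this sketch: a single default resource profile.
            desired.add("default", slotsForGroup);
        }
        return desired;
    }
}
```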

...

assignResources assigns the available resources to the ExecutionVertices and reserves the slots.

FailureHandler

In order to handle failures, the declarative scheduler will support the same RestartBackoffTimeStrategy as used by the pipelined region scheduler. Hence all current RestartBackoffTimeStrategy implementations will be supported. The failure handling procedure is the following:

  1. Check whether the failure is recoverable. If not, go to the Failing state.
  2. Ask the configured RestartBackoffTimeStrategy whether we can restart. If not, go to the Failing state.
  3. Ask the configured RestartBackoffTimeStrategy for the backoff time between failure and restart.
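The three steps above can be sketched as a single decision function. RestartBackoffTimeStrategy is modeled after Flink's interface of the same name (canRestart/getBackoffTime); the surrounding class and method are illustrative assumptions:

```java
import java.time.Duration;
import java.util.Optional;

/** Sketch of the failure handling decision described above. */
public class FailureHandlingSketch {
    /** Modeled after Flink's RestartBackoffTimeStrategy. */
    public interface RestartBackoffTimeStrategy {
        boolean canRestart();

        long getBackoffTime(); // milliseconds between failure and restart
    }

    /** Returns the restart backoff if we restart, or empty if we go to Failing. */
    public static Optional<Duration> handleFailure(
            boolean recoverable, RestartBackoffTimeStrategy strategy) {
        // 1. Unrecoverable failures go to the Failing state.
        if (!recoverable) {
            return Optional.empty();
        }
        // 2. The strategy may veto further restarts (e.g. attempts exhausted).
        if (!strategy.canRestart()) {
            return Optional.empty();
        }
        // 3. Otherwise restart after the strategy's backoff time.
        return Optional.of(Duration.ofMillis(strategy.getBackoffTime()));
    }
}
```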

ScalingPolicy

Whenever the scheduler is in the Executing state and receives new slots

...

, the scheduler checks whether the job can be run with an increased parallelism. If this is the case, then the scheduler will ask the ScalingPolicy, given the current and the potential new cumulative parallelism of all operators, whether it should scale up or not.

Code Block
/**
 * Simple policy for controlling the scale up behavior of the {@link
 * org.apache.flink.runtime.scheduler.declarative.DeclarativeScheduler}.
 */
public interface ScalingPolicy {

    /**
     * This method gets called whenever new resources are available to the scheduler to scale up.
     *
     * @param currentCumulativeParallelism Cumulative parallelism of the currently running job
     *     graph.
     * @param newCumulativeParallelism Potential new cumulative parallelism with the additional
     *     resources.
     * @return true if the policy decided to scale up based on the provided information.
     */
    boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism);
}

A basic default implementation will only scale up if newCumulativeParallelism - currentCumulativeParallelism >= increaseThreshold.
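Such a default implementation could look like the following sketch; the class name and the way increaseThreshold is supplied are assumptions for this example:

```java
public class DefaultScalingPolicySketch {
    /** The ScalingPolicy interface from above, repeated for self-containment. */
    public interface ScalingPolicy {
        boolean canScaleUp(int currentCumulativeParallelism, int newCumulativeParallelism);
    }

    /** Scales up only when the parallelism gain meets the configured threshold. */
    public static final class ThresholdScalingPolicy implements ScalingPolicy {
        private final int increaseThreshold;

        public ThresholdScalingPolicy(int increaseThreshold) {
            this.increaseThreshold = increaseThreshold;
        }

        @Override
        public boolean canScaleUp(
                int currentCumulativeParallelism, int newCumulativeParallelism) {
            return newCumulativeParallelism - currentCumulativeParallelism >= increaseThreshold;
        }
    }
}
```

A threshold avoids restarting the job (and paying the associated downtime) for a marginal gain of one or two extra slots.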

ExecutionFailureHandler

...

How to distinguish streaming jobs

...

The declarative scheduler will be a beta feature that the user has to activate explicitly by setting the config option jobmanager.scheduler: declarative. It will only be chosen if the submitted job is a streaming job.

Limitations

  • Rescaling happens by restarting the job; jobs with large state might therefore need significant resources and time to rescale. Rescaling causes downtime for your job, but no data loss.
  • The declarative scheduler runs with streaming jobs only
  • No support for local recovery
  • No support for local failovers
  • No integration with Flink's web UI
  • No support for fine-grained resource specifications

Test Plan

The new scheduler needs extensive unit, IT, and end-to-end testing because it is a crucial component at the heart of Flink.

...