...

Status

Discussion thread: -
Vote thread: -
JIRA: FLINK-4319
Released: 1.5



This Flink Improvement Proposal outlines improvements in how Flink is used on top of various cluster managers (YARN, Mesos) and deployment technologies (Docker, Kubernetes, etc.).

For simplicity, we refer to all those cluster- and deployment frameworks as cluster managers.

 



Original design draft document: https://docs.google.com/document/d/1zwBP3LKnJ0LI1R4-9hLXBVjOXAkQS5jKrGYYoPz-xRk

(may still be used for copying and adapting figures)


...

Motivation

This FLIP addresses a series of shortcomings in the current integration with YARN and container managers (Docker/Kubernetes) and future operations with Mesos.

Shortcomings to address

  • Resource allocation is currently static. A job needs to grab all required resources up front and maintain that static set of resources throughout the execution. The improvement will allow for dynamic resource acquisition and dynamic release of resources on YARN and Mesos, for better resource utilization and dynamic scaling.

...

  • The concepts of “sessions” (clusters that accept multiple jobs) and “per-job clusters” are intermingled and hard to understand. This proposal separates them more clearly.


Core Changes


Single Job JobManager

The most important change is that the JobManager handles only a single job. The JobManager will be created with a JobGraph and will be destroyed after the job execution is finished. This model maps more naturally to what happens with most jobs anyway.

...

The JobManager constructor will also optionally take a Savepoint or Checkpoint to initialize the job from.
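
As a rough illustration, the lifecycle could look like the following sketch. The class and method names here are hypothetical (only JobGraph is Flink's existing class), so treat this as a sketch of the model, not the actual implementation:

    import org.apache.flink.runtime.jobgraph.JobGraph;

    // Hypothetical sketch of the single-job JobManager lifecycle; the class
    // and method names are illustrative, not the actual Flink code.
    final class SingleJobJobManager {

        private final JobGraph jobGraph;      // the one job this manager owns
        private final String savepointPath;   // optional; null means a fresh start

        SingleJobJobManager(JobGraph jobGraph, String savepointPathOrNull) {
            this.jobGraph = jobGraph;
            this.savepointPath = savepointPathOrNull;
        }

        void runJob() {
            // 1. If a savepoint/checkpoint was given, restore the job state from it.
            // 2. Request slots, deploy tasks, and execute the job.
            // 3. Once the job reaches a terminal state, tear this JobManager down.
        }
    }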

ResourceManager

The ResourceManager (introduced in Flink 1.1) is the cluster-manager-specific component. There is a generic base class, and specific implementations for:

...

In the case where the ResourceManager already has slots available from registered TaskManagers, steps (2) and (3) are skipped.
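
A minimal sketch of that split, assuming hypothetical names (the concrete subclasses would be the YARN, Mesos, and Standalone implementations):

    // Sketch of the generic vs. cluster-manager-specific split; names are illustrative.
    abstract class ResourceManagerBase {

        // Placeholder message types for the sketch.
        record ResourceProfile(double cpuCores, int memoryMB) {}
        record SlotRequest(String allocationId, ResourceProfile profile) {}

        final void onSlotRequest(SlotRequest request) {
            String freeSlotId = findFreeRegisteredSlot(request.profile());
            if (freeSlotId != null) {
                // A registered TaskManager already has a matching free slot, so
                // steps (2) and (3) of the allocation protocol are skipped.
                tellTaskManagerToOfferSlot(freeSlotId, request);
            } else {
                // Otherwise ask the concrete cluster manager for a new container.
                startNewTaskManager(request.profile());
            }
        }

        // Implemented per cluster manager (YARN, Mesos, Standalone, ...).
        protected abstract String findFreeRegisteredSlot(ResourceProfile profile); // null if none
        protected abstract void tellTaskManagerToOfferSlot(String slotId, SlotRequest request);
        protected abstract void startNewTaskManager(ResourceProfile profile);
    }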


TaskManager

TaskManagers are in contact with both the ResourceManager and the JobManager:

...

  • The TaskManager offers a slot to a JobManager at the ResourceManager’s behest. That slot is then tied to that JobManager until the JobManager releases the slot.

  • The TaskManager watches all JobManagers to which it has offered slots. Loss of connection to a JobManager triggers master-failure recovery (currently: cancel all tasks from that master)

  • JobManagers can deploy tasks only into slots they allocated.

  • Upon loss of connection to the JobManager, the TaskManager will try to re-register the slots at the new JobManager for that job (retrieved via the HA leader lookup). After a moderate timeout period, it releases the slots and makes them available again. If a backup JobManager does not take over within that period, it will have to re-request the slots from the ResourceManager.
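
The last point could be sketched as follows; all names and the "moderate timeout" are illustrative, not the actual implementation:

    // Sketch of a TaskManager's reaction to losing its JobManager connection.
    abstract class TaskManagerJobMasterTracker {

        void onJobManagerConnectionLost(String jobId) {
            // Keep the job's slots tied to the job while waiting for a new leader.
            markSlotsPending(jobId);
            scheduleAfterModerateTimeout(() -> {
                String newLeader = lookupJobManagerLeaderViaHA(jobId);
                if (newLeader != null) {
                    // A backup JobManager took over in time: re-offer the slots.
                    offerSlots(newLeader, jobId);
                } else {
                    // No leader in time: release the slots; a later JobManager
                    // must re-request them from the ResourceManager.
                    releaseSlots(jobId);
                }
            });
        }

        abstract void markSlotsPending(String jobId);
        abstract void scheduleAfterModerateTimeout(Runnable action);
        abstract String lookupJobManagerLeaderViaHA(String jobId); // null if no leader
        abstract void offerSlots(String jobManagerAddress, String jobId);
        abstract void releaseSlots(String jobId);
    }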


JobManager Slot Pool

The JobManager has a SlotPool which holds the slots that were offered to it and accepted. The JobManager’s scheduler grabs slots from the SlotPool and can thus access all currently registered slots even if the ResourceManager is down.

...

The SlotPool releases slots that are unused to the ResourceManager. Slots count as unused if they are not used when the job is fully running (fully recovered).
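
A sketch of the SlotPool's contract, with hypothetical method names (not Flink's actual API):

    // Illustrative sketch of the SlotPool contract.
    interface SlotPool {

        // A TaskManager offered a slot (at the ResourceManager's behest);
        // the pool accepts it and keeps it for the job.
        void offerSlot(String taskManagerResourceId, String allocationId);

        // The JobManager's scheduler takes slots from the pool. Because accepted
        // slots are held locally, this works even while the ResourceManager is down.
        String takeSlot(/* resource profile, locality preferences, ... */);

        // Once the job is fully running (fully recovered), slots that are still
        // unused are released back to the ResourceManager.
        void releaseUnusedSlots();
    }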

Dispatcher

The new design includes the concept of a Dispatcher. The dispatcher accepts job submissions from clients and starts the jobs on their behalf on a cluster manager.

...

  • The dispatcher is a cross-job service that can run a long-lived web dashboard

  • Future versions of the dispatcher should receive only HTTP calls and thus can act as a bridge in firewalled clusters

  • The dispatcher never executes code and can thus be viewed as a trusted process. It can run with higher privileges (superuser credentials) and spawn jobs on behalf of other users (acquiring their authentication tokens). Building on that, the dispatcher can also manage user authentication
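
In code, the dispatcher's contract might look roughly like the following sketch; all names here are hypothetical:

    // Illustrative sketch of the Dispatcher contract; not the actual API.
    interface Dispatcher {

        // Placeholder types for the sketch.
        record Credentials(String userName, byte[] authToken) {}
        record Acknowledge(String jobId) {}

        // Accepts a submission (in future versions: an HTTP call), acquires the
        // submitting user's authentication tokens, and spawns a per-job
        // JobManager on the cluster manager. The dispatcher never runs user code.
        Acknowledge submitJob(byte[] serializedJobGraph, Credentials onBehalfOf);
    }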

Fault Tolerance

The core recovery mechanism is still task restart and state restore from checkpoint.

...

  • Detection and restart of the process(es) that execute the JobManager and ResourceManager

  • Recovery of the job’s JobGraph and libraries


...

Architecture with Cluster Managers

YARN

Compared to the state in Flink 1.1, the new Flink-on-YARN architecture offers the following benefits:

  • The client starts the job directly in YARN, rather than bootstrapping a cluster and then submitting the job to that cluster. The client can hence disconnect immediately after the job has been submitted

  • All user code libraries and config files are directly in the Application Classpath, rather than in the dynamic user code class loader

  • Containers are requested as needed and will be released when not used any more

  • The “as needed” allocation of containers allows for different profiles of containers (CPU / memory) to be used for different operators


Without Dispatcher


With Dispatcher


YARN-specific Fault Tolerance Aspects

ResourceManager and JobManager run inside the ApplicationMaster process. Failure detection and restart of that process is done by YARN.

JobGraph and libraries are always part of the working directory from which the ApplicationMaster process is spawned. Internally, YARN stores them in a private HDFS directory.

Mesos

Mesos-based setups are similar to YARN with a dispatcher. A dispatcher is strictly required for Mesos, because it is the only way to have the Mesos-specific ResourceManager run inside the Mesos cluster.


Mesos-specific Fault Tolerance Aspects

ResourceManager and JobManager run inside a regular Mesos container. The Dispatcher is responsible for monitoring and restarting those containers in case they fail. The Dispatcher itself must be made highly available by a Mesos service like Marathon, as in this picture:

...

JobGraph and libraries need to be stored by the dispatcher in a persistent storage, typically the same storage where the checkpoints are stored.

 

Standalone

 

The Standalone Setup should keep compatibility with current Standalone Setups.

...

For highly available setups, there are multiple dispatcher processes competing for leadership, similar to how the JobManagers currently do.




Standalone-specific Fault Tolerance Aspects

By default, there is no mechanism to restart failed processes. This has to be solved by external tools, or by the availability of sufficient standby machines (TaskManagers and Dispatchers).

For high-availability, dispatchers must store the JobGraphs and libraries of submitted jobs in a persistent storage, like the checkpoint storage.

Standalone Setup v2.0

A future version of the Standalone Setup could implement something like a “lightweight YARN” architecture:

  • All nodes run a simple “NodeManager” process that spawns the TaskManager and JobManager processes, thereby offering proper isolation of jobs from each other.

  • The LocalDispatcher will not spawn the JobManager internally, but on a lightweight “node manager”.


 

Docker / Kubernetes

Users define Flink clusters purely based on Dockerized Jobs and TaskManagers. There is no client, and no job submission step is involved.

...

This case uses a ResourceManager that tells each registering TaskManager to give its slots to the JobManager immediately. That way, the JobManager always has all available slots in the cluster at its disposal (without checking and requesting) and can trigger scale-out to the full set of resources.
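
A sketch of such a "hand everything over" ResourceManager, with illustrative names:

    // Sketch: a ResourceManager for container setups that forwards all slots.
    class AllSlotsToJobManagerResourceManager {

        private final String jobManagerAddress;

        AllSlotsToJobManagerResourceManager(String jobManagerAddress) {
            this.jobManagerAddress = jobManagerAddress;
        }

        // Every TaskManager that registers is told to offer all of its slots to
        // the (single) JobManager right away, so the JobManager always sees the
        // full set of cluster resources without checking and requesting.
        void onTaskManagerRegistration(TaskManagerGateway taskManager) {
            taskManager.offerAllSlotsTo(jobManagerAddress);
        }

        interface TaskManagerGateway {
            void offerAllSlotsTo(String jobManagerAddress);
        }
    }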

Docker/Kubernetes-specific Fault Tolerance Aspects

 

ResourceManager and JobManager run inside the master process. Failure detection and restart of that process is done by the container orchestrator (Kubernetes).

Libraries are always part of the master container. The JobGraph can be recovered by re-executing the program (which may yield problems for non-deterministic programs) or by storing it in the checkpoint storage.

 

 

...

Sessions

The design here should cover all existing functionality, except the “YARN session” mode. The YARN session mode currently behaves like a standalone cluster bootstrapped on YARN.

The core functionality of a YARN session is to have a set of machines that are already allocated and can thus accept a series of short jobs.


 

...


 

Component Design and Details

 

Resource Allocation Details (JobManager, ResourceManager, TaskManager)

There are various identifiers used to describe which component and status a message is associated with:

  • ResourceID - The unique identifier of a TaskManager (container)

  • RmLeaderID - Fencing token for a specific grant of leadership to a ResourceManager. Disambiguates between messages sent before and after a loss and re-gain of leadership.

  • JobID - Unique ID of a job over its lifetime

  • JmLeaderID - Unique ID of a JobManager, a fencing token like the current LeaderSessionID. Changes each time a new JobManager takes over a job. Disambiguates between messages sent by different JobManagers over time, to prevent old, stale JobManagers (which still believe they are in charge) from sending interfering messages.

  • AllocationID - Slot allocation identifier, created by the JobManager when requesting a slot, constant across retries. Used to identify responses by the ResourceManager and to identify deployment calls towards the TaskManager from which the slot was allocated.

  • Profile - The resource profile of the desired slot (CPU cores, Memory, Disk, …)
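
Expressed as simple value types (a sketch; Flink's actual classes differ), the identifiers could look like this:

    // Sketch of the identifiers as value types; Flink's actual classes differ.
    record ResourceID(String value) {}        // unique per TaskManager (container)
    record RmLeaderID(long fencingToken) {}   // per grant of ResourceManager leadership
    record JobID(String value) {}             // stable over the job's lifetime
    record JmLeaderID(long fencingToken) {}   // changes when a new JobManager takes over
    record AllocationID(String value) {}      // per slot request, stable across retries
    record Profile(double cpuCores, int memoryMB, int diskMB) {} // desired slot resources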

 

Slot Allocation when Requesting a New TaskManager

 


Message loss handling:

 

  • Loss of messages (4) or (6) will be handled by a retry of message (4). The ResourceManager will not request a duplicate instance, as there is already a slot allocated for that AllocationID, or a pending request for it. If the slot was already released before the retry, a redundant container may be brought up and will eventually be released as unused.

  • Loss of message (10) will be handled by TaskManager registration retries. ResourceID helps deduplicate repeated registrations.

  • Loss of message (12) is handled by timeout and retry. Duplication through retry leads to a refused registration at the JobManager (AllocationID collision), or to registration and subsequent release as unused.

  • Loss of message (13) is compensated for by heartbeat slot reports.

  • Loss of message (14) is handled by the registration retry loop, disambiguated by the combination of (AllocationID, ResourceID).
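
The common theme across these cases is that the AllocationID makes slot requests idempotent under retries. A sketch of that deduplication, with illustrative names:

    import java.util.HashSet;
    import java.util.Set;

    // Sketch: the AllocationID makes slot requests idempotent under retries.
    abstract class SlotRequestDeduplicator {

        private final Set<String> knownAllocationIds = new HashSet<>();

        // A retried message (4) carries the same AllocationID, so the
        // ResourceManager does not request a duplicate container for it.
        void onSlotRequest(String allocationId) {
            if (!knownAllocationIds.add(allocationId)) {
                return; // duplicate retry: a slot or a pending request already exists
            }
            allocateSlotOrStartContainer(allocationId);
        }

        protected abstract void allocateSlotOrStartContainer(String allocationId);
    }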


Slot Allocation from cached TaskManager

 


Loss of messages is handled analogously to the above.

...

  • The ResourceManager should always mark a slot as occupied before it is actually occupied at the TaskManager, thereby never trying to allocate an already used slot. The exception is an incorrect timeout and release on the ResourceManager side, which will be caught by the TaskManager rejecting a slot allocation request for an already occupied slot.
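
The TaskManager-side safety net for that invariant could be as simple as the following sketch (names illustrative):

    import java.util.HashMap;
    import java.util.Map;

    // Sketch of the occupancy safety net: the TaskManager rejects allocation
    // requests for slots it already considers occupied.
    class TaskManagerSlotTable {

        private final Map<Integer, String> slotToAllocation = new HashMap<>();

        // Returns true if the slot was free and is now bound to the allocation.
        // Returns false to reject the request, e.g. after an incorrect timeout
        // and release on the ResourceManager side.
        synchronized boolean tryAllocate(int slotIndex, String allocationId) {
            return slotToAllocation.putIfAbsent(slotIndex, allocationId) == null;
        }
    }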

 

Failure Handling

 

TaskManager Failure

ResourceManager: detection and reaction

...

  • A restarted TaskManager process will look up the (leader) ResourceManager, register, and offer its slots

ResourceManager Failure

TaskManager: detection and reaction

...

It may happen that the ResourceManager launches a container for a slot allocation request, fails, and after recovery (the JobManager re-requests the outstanding slots), starts another container for that slot request. The second container will be released as “unused” after the idle time.

JobManager Failure

TaskManager: detection and reaction

...

  • Acquires leader status

  • Registers at ResourceManager (to receive notifications about TaskManager failures)

  • Triggers execution of its job (execution graph) from latest completed checkpoint

Concurrent JobManager & ResourceManager Failure

  • TaskManagers handle regular JobManager failure

  • TaskManagers will try to offer slots to a new JobManager for a certain period of time

  • TaskManagers will be in a registration loop for the ResourceManager

Concurrent TaskManager & ResourceManager Failure

  • The JobManager misses failure notifications for TaskManager containers from the cluster manager, but detects TaskManager failures via its own heartbeats

  • Pending allocation requests from the JobManager time out (or get canceled upon noticing the loss of the ResourceManager); new allocation requests will happen when the ResourceManager comes back

  • JobManager may need to scale down to compensate for lost slots


 

...

Public Interfaces

The changes here do not affect any program APIs.

...

The biggest user-facing change is the YARN CLI experience. While this FLIP need not necessarily change the parameters of the CLI, it will change the responses. For example, jobs will be accepted without first waiting for all TaskManagers to be allocated and available.

 


...

Compatibility, Deprecation, and Migration Plan

The standalone mode is still emulated/reproduced in a way similar to how it exists prior to this FLIP.

...

The YARN session mode can still work as a "standalone mode bootstrapped on YARN". 


...

Test Plan

  • The ResourceManager, JobManager, TaskManager, and SlotPool need strong unit tests for the protocol.
  • The YARN and Mesos integrations keep their regular tests, as they had prior to this proposal

...


...

Rejected Alternatives

(none yet)