Draft Authors: bzhaojyathousandy, Yang Wang (Flink Kubernetes), William Wang (Volcano)

Status

Discussion thread: https://lists.apache.org/thread/pf8dvbvqf845wh0x63z68jmhh4pvsbow
Vote thread: https://lists.apache.org/thread/w1tpl5cwdrj7bc8qrlp96q25y9sv5gjk
JIRA


Release: 1.17

Demo code: https://github.com/bzhaoopenstack/flink/pull/3/files

Motivation

Flink has supported Kubernetes as a resource provider for a long time and its performance is well known, but there is still room to improve the integration and to fully exploit the capabilities of Kubernetes, such as customized resource schedulers, which let users adapt resource scheduling to their real requirements.

The current development direction of Flink is unified stream and batch processing, so more optimizations are required for unified batch and stream scenarios. In particular, it is necessary to use the existing or extended abilities of K8S to solve performance and operational problems when Flink runs batch and stream workloads, such as job scheduling, resource queuing and Gang scheduling (batch processing). The default K8S scheduler only supports basic Pod-based scheduling, but K8S lets developers enable customized schedulers [1] to bring the deployment closer to their business needs.

There are already several mature schedulers, such as CNCF Volcano. Users can plug in this kind of customized scheduler natively and run their Flink jobs on K8S more easily and efficiently. For example, such schedulers can:

  • Support scheduling policies for high-performance workloads such as fair-share, topology, SLA, reservation, preemption, backfill and others.
  • Support enhanced resource management such as dynamic resource sharing and advanced scheduling for heterogeneous hardware, e.g. GPUs and CPU NUMA awareness.


There are also more and more requirements from customers about controlling Flink jobs the way YARN does. For example, a Flink cluster admin in a production environment wants to manage all Flink jobs in the cluster based on the existing compute resources, verify that the remaining resources can fit the upcoming planned Flink jobs, and execute the jobs according to the defined queue priorities. In short, customers have strong requirements around resource reservation.

Let’s take some simple examples to describe what we want to propose.

1. Job based scheduling and resource reservation

Job based scheduling is very useful for multi-task applications that require scheduling a group of tasks simultaneously. This is a common requirement for data processing workloads such as machine learning, big data and HPC.

In short, we propose to couple the JM and TM pods into a single group to perform job based scheduling and resource reservation for the TMs, avoiding the deadlock where JMs occupy all the resources and the TMs have no chance to get any. The scheduler reserves resources for the TMs and will not schedule over-committed pods if the idle resources cannot fit all of the upcoming TM pods.


2. Dynamic resource sharing(Queue scheduling)

Queue is a common concept in general batch systems. It is used to share resources between different tenants. One tenant is mapped to a queue that has a minimum and maximum amount of resources and can also share resources with other queues proportionally. A queue can also be enabled or disabled.



3. Job based fair-share

When running several elastic jobs, e.g. Streaming, it's better to allocate resources to each job fairly to meet their SLA/QoS when multiple jobs are competing for additional resources. In the worst case, a single job could launch a large number of Pods with low resource utilization, preventing other jobs from running due to insufficient resources. To avoid overly small allocations (e.g. launch one Pod for each job), elastic jobs can leverage co-scheduling to define the minimal available number of Pods that should be started. Any Pods beyond the specified minimal available amount will share cluster resources with other jobs fairly.

4. Job priority scheduling

Job priority based scheduling is a common requirement. It ensures that high priority jobs are bound before lower priority jobs, and a high priority job should also be able to preempt a whole lower priority job instead of individual pods from different jobs.

Step 1: Create a queue with limited resource

Step 2: Create a pod to occupy resource

Step 3: Submit 3 jobs with different priorities

Step 4: Delete the resource-occupying pod to release its resources

Step 5: All jobs succeed in priority order.


Proposed Changes

We will introduce the whole feature in two steps. The first step makes Flink support the K8S customized scheduler mechanism in general. Then we will introduce a reference implementation, “Volcano”, in Flink; in the following description we use Volcano as the example.

Kubernetes Customized Scheduler Mechanism Proposal

1. Flink on Kubernetes with Kubernetes Customized Scheduler

Allow users to specify any customized scheduler for the Flink pods or for any pods set up by the K8S cluster deployment.

K8S provides a field “schedulerName” in pod spec or deployment.
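
As an illustration only, a minimal fabric8 sketch of what setting this field inside a decorator could look like (the flinkPod accessor and the hard-coded "volcano" value are assumptions for the example, not the final design):

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;

// Sketch: copy the existing pod spec and override its schedulerName with the configured value.
Pod podWithScheduler = new PodBuilder(flinkPod.getPodWithoutMainContainer())
        .editOrNewSpec()
            .withSchedulerName("volcano")
        .endSpec()
        .build();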

For this step, we need to introduce new configuration settings for setting up a Flink cluster on K8S, for example:

./flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-flink-application-cluster-test \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dkubernetes.container.image=apache/flink:latest \
    -Dkubernetes.jobmanager.scheduler-name=[scheduler name] \
    -Dkubernetes.taskmanager.scheduler-name=[scheduler name] \
    -Dkubernetes.scheduler-name=[scheduler name] \
    -Dkubernetes.jobmanager.cpu=0.1 \
    -Dkubernetes.taskmanager.cpu=0.3 \
    local:///opt/flink/usrlib/WordCount.jar

By setting the customized scheduler name in the deployment and pod specs, the Flink JobManager, TaskManager and associated pods are scheduled by the K8S scheduler specified by the user.

Pod Spec proposal:

Key: schedulerName
Category: Defined by the user / Flink
Related Config Options: kubernetes.scheduler-name, kubernetes.jobmanager.scheduler-name, kubernetes.taskmanager.scheduler-name
Description: K8S Pod scheduler name

Notes: If kubernetes.jobmanager.scheduler-name or kubernetes.taskmanager.scheduler-name is not set, it falls back to kubernetes.scheduler-name.
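
A possible way to express this fallback with Flink's ConfigOptions (a sketch only; the option variable names below are illustrative):

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Sketch: the role-specific option falls back to the generic kubernetes.scheduler-name.
ConfigOption<String> kubernetesSchedulerName =
        ConfigOptions.key("kubernetes.scheduler-name")
                .stringType()
                .noDefaultValue()
                .withDescription("The K8S scheduler used by all Flink pods.");

ConfigOption<String> jobManagerSchedulerName =
        ConfigOptions.key("kubernetes.jobmanager.scheduler-name")
                .stringType()
                .noDefaultValue()
                .withFallbackKeys(kubernetesSchedulerName.key())
                .withDescription("The K8S scheduler used by the JobManager pod.");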

2. Additional settings are needed to pass the necessary job-scheduling information to the customized K8S scheduler for the Flink JobManager and TaskManager pods.

 

Required job-scheduling information and its description:

  • Job ID: Organize a group of pods into a single job, such as a PodGroup in Volcano.
  • Job Priority: A batch scheduler needs to understand the job priority and then apply different scheduling strategies to jobs of different priorities.
  • Job Queue: Which resource queue this job will be submitted to. A resource queue is different from the namespace; most batch schedulers define their own form of resource queues, which addresses the resource sharing model.
  • Job min number: The minimum number of pods or tasks running in a job. If the cluster resources cannot meet the demand of running the minimum number of pods or tasks, no pod or task in the job will be scheduled.
  • Job min resource: The minimum resources for running the job in batch scenarios. If the available resources in the cluster cannot satisfy the requirement, no pod or task in the job will be scheduled.

This kind of information can be used by customized schedulers such as Volcano and others that follow the K8S PodGroup scheduling concepts for batch jobs, which means we also need to specify it while setting up the Flink cluster. Parts of the above information are provided through K8S resource annotations or K8S CRDs. The current Flink K8S integration already allows users to pass customized annotations, so that part already matches our proposal.
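
For example, with the existing annotation options a user could already attach a PodGroup reference to the JobManager pod; a rough sketch (whether this alone is sufficient depends on the concrete scheduler):

import org.apache.flink.configuration.Configuration;

// Sketch: reuse the existing annotation option to carry scheduler-specific information.
Configuration flinkConfig = new Configuration();
flinkConfig.setString("kubernetes.jobmanager.annotations",
        "scheduling.k8s.io/group-name:podgroup-my-flink-cluster");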

These are the basic functions of the new proposal. We still need to analyze the current cluster deployment modes and the PodGroup use cases.

-Dkubernetes.scheduler.<scheduler-name>.config=priorityclass:high-priority,X:Y…

The option value is a map, and will be loaded by the specified customized scheduler.

Additional K8S customized configuration proposal, such as PodGroup in Volcano:

Key: config
Category: Defined by the user
Related Config Options: kubernetes.scheduler.<scheduler-name>.config
Description: The customized PodGroup definition for the JobManager and TaskManager, e.g. mincpu:1, minmember:2, minmemory:1000Mi, priorityclass:<CLASSNAME>. The customized scheduler picks up the entries it is concerned with and ignores the others.
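
A sketch of how such an option value could be parsed into a map for the scheduler-specific decorator to consume (the helper name is hypothetical):

import java.util.HashMap;
import java.util.Map;

// Sketch: turn "mincpu:1,minmember:2,priorityclass:high-priority" into a key/value map.
static Map<String, String> parseSchedulerConfig(String optionValue) {
    Map<String, String> config = new HashMap<>();
    for (String entry : optionValue.split(",")) {
        String[] kv = entry.split(":", 2);
        if (kv.length == 2) {
            config.put(kv[0].trim(), kv[1].trim());
        }
    }
    return config;
}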


3. Need the ability to prepare K8S resources before JobManager creation

To support the customized scheduler functionality and make Flink on K8S more flexible, Flink needs the ability to prepare K8S resources before the Flink components are actually set up.

The current Flink K8S design provides the buildAccompanyingKubernetesResources interface to add resources from decorators, but these accompanying resources are created after the JobManager Deployment. Some Pod-related customized schedulers need an additional K8S resource to exist before the target pods are created in order to control the actual scheduling. On the other hand, even though our example scheduler only needs resources created before the pods, we cannot assume all customized schedulers work like that, so we should make the existing JobManager deployment creation flow more general.

A new interface method should be provided so that decorators can create K8S resources and associate them with the upcoming Flink pods; buildAccompanyingKubernetesResources is then used to refresh the owner references so these resources keep the same behavior as the other AccompanyingKubernetesResources.

public interface KubernetesStepDecorator {

    FlinkPod decorateFlinkPod(FlinkPod flinkPod);

    List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException;

    // New hook: K8S resources that must be created before the JobManager Deployment / pods.
    List<HasMetadata> [Pre K8S resource creation Interface]();
}
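
As a rough sketch of how a scheduler-specific decorator could use the new hook (buildPreKubernetesResources is only a placeholder for the unnamed method above, and the whole class is illustrative, not the final design):

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.volcano.scheduling.v1beta1.PodGroup;
import io.fabric8.volcano.scheduling.v1beta1.PodGroupSpec;

// Illustrative only; FlinkPod / KubernetesStepDecorator are the types from the interface above.
public class VolcanoStepDecorator implements KubernetesStepDecorator {

    // Assumed to be injected from the Flink configuration.
    private final String clusterId;

    public VolcanoStepDecorator(String clusterId) {
        this.clusterId = clusterId;
    }

    @Override
    public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
        // Set the schedulerName and the PodGroup annotation on the pod here.
        return flinkPod;
    }

    @Override
    public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException {
        return Collections.emptyList();
    }

    // Placeholder name for the new pre-creation hook: resources that must exist
    // before the JobManager Deployment is created.
    public List<HasMetadata> buildPreKubernetesResources() {
        ObjectMeta meta = new ObjectMeta();
        meta.setName("podgroup-" + clusterId);
        meta.setNamespace("default");

        PodGroupSpec spec = new PodGroupSpec();
        spec.setQueue("default");

        PodGroup podGroup = new PodGroup();
        podGroup.setMetadata(meta);
        podGroup.setSpec(spec);

        return Collections.singletonList(podGroup);
    }
}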

4. Upgrade the Fabric8 client to 5.12.2

The Flink K8S component uses Fabric8 to manage K8S resources, so to use the new functionality of K8S and its customized schedulers, we need to upgrade the existing Fabric8 version to at least v5.12.2.

The Volcano extension has been available since kubernetes-client v5.11 and was completed in v5.12.2:

https://github.com/fabric8io/kubernetes-client/pull/3580

This keeps the user experience consistent with the K8S Java SDK.

5. Support a pluggable decorator mechanism

To make the Flink K8S decorators more general, we want to introduce a pluggable decorator mechanism for the Flink JobManager and TaskManager. Currently, all decorators are hard-coded in the JobManager deployment and TaskManager pod creation.

We propose using SPI or the Flink plugins mechanism to load external decorators. In this way, Flink becomes more flexible in supporting the decorators of customized K8S schedulers.

We propose a new plugin mechanism that supports loading the jar package of a single customized scheduler. The jar package should contain the scheduler-specific K8sStepDecorator and its dependencies. As an example, the jar package would be placed into the ‘opt’ directory and contain two major things:

  1. A K8sStepDecorator implemented for the customized K8S scheduler A.
  2. All dependencies of that K8sStepDecorator.

When users want to use the customized scheduler A, they need to create a new directory named after the scheduler (A) under the plugins directory and copy the jar into it; the plugin is then loaded when the user specifies the related Flink K8S configuration options and enables the functionality in Flink.
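
One possible shape for the plug-in contract, assuming a ServiceLoader-style factory (all of the names below are hypothetical, not an agreed API):

import java.util.Map;

// Hypothetical SPI: each scheduler plugin jar ships one factory implementation
// plus the matching META-INF/services entry.
public interface KubernetesSchedulerDecoratorFactory {

    // The scheduler name users set in kubernetes.*.scheduler-name, e.g. "volcano".
    String schedulerName();

    // Builds the scheduler-specific step decorator from the parsed
    // kubernetes.scheduler.<scheduler-name>.config map.
    KubernetesStepDecorator createDecorator(Map<String, String> schedulerConfig);
}

Flink would then iterate over ServiceLoader.load(KubernetesSchedulerDecoratorFactory.class, pluginClassLoader) and pick the factory whose schedulerName() matches the configured scheduler.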



Volcano Reference Implementation Proposal

The sections below are specific to Volcano and only describe a reference implementation in Flink.

Overview of the proposal (Volcano demo implementation)

For the Volcano implementation, we plan to introduce a new decorator for Volcano, but before that we need to introduce its dependency. The desired usage is that when Flink users/administrators download the Flink release packages, the Volcano decorator and its dependencies are packaged as a separate jar in the Flink opt directory. If they want to use the functionality we propose on K8S, they need to copy that Volcano jar into Flink's plugin loading path and follow the steps below.

  1. Set up Volcano on the existing Kubernetes cluster. For example:

kubectl apply -f https://raw.githubusercontent.com/volcano-sh/volcano/master/installer/volcano-development.yaml

  2. Create a new directory under the plugins directory, named after the scheduler:

mkdir -p $FLINK_HOME/plugins/volcano

  3. Copy the customized scheduler decorator plugin into the plugins/ directory:

cp $FLINK_HOME/opt/flink-kubernetes-volcano.jar $FLINK_HOME/plugins/volcano

  4. Submit the Flink job with the Volcano scheduler name and its specific options, for example:

-Dkubernetes.scheduler.volcano.config
-Dkubernetes.jobmanager.scheduler-name=volcano
-Dkubernetes.taskmanager.scheduler-name=volcano
-Dkubernetes.scheduler-name=volcano

Flink Kubernetes will then load the plugin automatically and set the Volcano related configuration according to your inputs.


Introduce Volcano model dependency and Volcano StepDecorator

To support operating on PodGroups, we propose adding the io.fabric8.volcano-model-v1beta1 [2] dependency in order to couple the PodGroup with the other AccompanyingKubernetesResources through the Fabric8 client.

Once this dependency is introduced, Flink code can operate on PodGroups the same way as on native K8S resources.

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.volcano.client.VolcanoClient;
import io.fabric8.volcano.scheduling.v1beta1.PodGroup;
import io.fabric8.volcano.scheduling.v1beta1.PodGroupSpec;

// Adapt a Volcano client from the DefaultKubernetesClient
DefaultKubernetesClient defaultKubernetesClient = new DefaultKubernetesClient(
        new ConfigBuilder()
                .withMasterUrl("http://10.182.82.160:8001")
                .withConnectionTimeout(100000)
                .withRequestTimeout(100000)
                .build());
VolcanoClient vcclient = defaultKubernetesClient.adapt(VolcanoClient.class);

// Manage the extension resource like native K8S resources
ObjectMeta meta = new ObjectMeta();
meta.setName("test-pg");
meta.setNamespace("default");

PodGroup pgg = new PodGroup();
pgg.setMetadata(meta);

PodGroupSpec pgs = new PodGroupSpec();
pgs.setQueue("test");
pgg.setSpec(pgs);

defaultKubernetesClient.resources(PodGroup.class).inNamespace("default").createOrReplace(pgg);

Before explaining the details of the Volcano StepDecorator, let us go through what we need to introduce.

Below is the PodGroup definition; the PodGroup has to be prepared before the JobManager deployment. The parts we care about are:

PodGroup.metadata: defines the name and owner reference of the PodGroup. In particular, we will use the PodGroup name to associate the subsequent K8S resources, such as the TaskManager pods.

PodGroup.spec: defines the PodGroup attributes, such as:

  • PodGroup.spec.queue: indicates the queue to which the PodGroup belongs
  • PodGroup.spec.minAvailable: indicates the minimum number of pods or tasks running under the PodGroup
  • PodGroup.spec.minResources: indicates the minimum resources for running the PodGroup.


# PodGroup (prepared before the Deployment creation)
apiVersion: scheduling.volcano.sh/v1beta1
kind: PodGroup
metadata:
  name: podgroup-{job-clusterid}
  ownerReferences:
  - apiVersion: apps/v1
    blockOwnerDeletion: true
    controller: true
    kind: Deployment
    name: {job-clusterid}
spec:
  queue: default
  minAvailable: 1
  minResources:
    cpu: "3"
    memory: "2048Mi"
  priorityClassName: high-priority

Below is the TaskManager pod associated with the PodGroup.

# Pod (TaskManager)
apiVersion: v1
kind: Pod
metadata:
  annotations:
    scheduling.k8s.io/group-name: podgroup-{job-clusterid}
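
Attaching this annotation in the TaskManager-side decorator could look roughly like the following fabric8 sketch (flinkPod and clusterId are illustrative variables, not an agreed API):

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;

// Sketch: point the TaskManager pod at the PodGroup created for this cluster-id.
Pod annotatedPod = new PodBuilder(flinkPod.getPodWithoutMainContainer())
        .editOrNewMetadata()
            .addToAnnotations("scheduling.k8s.io/group-name", "podgroup-" + clusterId)
        .endMetadata()
        .build();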


Volcano Design Description and Analysis for Flink

The major idea is to associate the PodGroup with the JobManager and its related TaskManagers, i.e. JobManager based PodGroup scheduling. We will analyze the two common K8S deployment modes with the Volcano customized scheduler enabled.

a. Application mode deployment

The first one, application mode, is the simplest: the whole cluster runs only a single Flink job, so we can treat the new K8S application cluster as needing resource isolation. For a single Flink application we can set a Volcano Queue and a single PodGroup; Volcano then considers the current resource reservation and checks whether the idle resources match the whole resource request of this Flink application. If yes, it sets up all the pods and deploys them. If not, it will not create any TaskManager pods, and it reconsiders once the idle resources can fit the whole resource requirement. This brings more flexibility to Kubernetes Flink users.

Associating the PodGroup and Queue with the existing Deployment and Pod is simple, but we need to consider the life cycle of the PodGroup/Queue.

Queue: We keep the same behavior as the YARN queue in Flink; we only accept the user-provided queue name. Users need to configure the queue in K8S first and then specify the queue name for the PodGroup during job submission.

PodGroup: Flink needs to maintain the whole lifecycle when users only specify the customized pod scheduler name for the JobManager and TaskManager, and users will not be aware of the PodGroup. For example, if users specify “volcano”, Flink needs to create the PodGroup automatically.

b. Life cycle of PodGroup in application deployment:

Creation: Create the PodGroup as an additional resource when creating the JobManager deployment or the TaskManager pods. Its name should be based on the K8S Flink application cluster-id, which is strongly tied to the Flink application.

Update: Not supported.

Deletion: When creating the PodGroup, we set its OwnerReference to the cluster-id deployment so that it is cleaned up by the K8S garbage collection mechanism. Flink will not delete PodGroups it did not create.
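
Setting that owner reference when building the PodGroup could look roughly as follows (a sketch; podGroup and jobManagerDeployment are assumed to be available at that point in the decorator):

import java.util.Collections;

import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;

// Sketch: make the JobManager Deployment the owner so that deleting the deployment
// lets K8S garbage-collect the PodGroup as well.
OwnerReference ownerReference = new OwnerReferenceBuilder()
        .withApiVersion("apps/v1")
        .withKind("Deployment")
        .withName(jobManagerDeployment.getMetadata().getName())
        .withUid(jobManagerDeployment.getMetadata().getUid())
        .withController(true)
        .withBlockOwnerDeletion(true)
        .build();

podGroup.getMetadata().setOwnerReferences(Collections.singletonList(ownerReference));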

c. Session mode deployment

Flink session mode works differently from application mode, because Flink jobs share the JobManager and TaskManager pods in a Flink session cluster. There is still the scenario where many Flink session clusters are set up on the same K8S cluster at the same time and jobs are submitted to all of them; in this case there is still a risk of resource deadlock. So we need to keep the JobManager and TaskManagers in a single group for scheduling, which means the JobManager and all TaskManagers under it should be associated with a session-cluster-based PodGroup.

d. Life cycle of PodGroup in session deployment:

Creation: Create the PodGroup as an additional resource when creating the JobManager deployment or the TaskManager pods. Its name should be based on the K8S Flink session cluster-id.

Update: Not supported.

Deletion: When creating the PodGroup, we set its OwnerReference to the cluster-id deployment so that it is cleaned up by the K8S garbage collection mechanism.

e. Other cases:

This proposal only supports customized schedulers that support the PodGroup and Queue concepts. If users do not specify a scheduler, the pods/deployments of the JobManager and TaskManager use the default K8S scheduler. Also, if the Flink K8S pod spec constructor notices that the scheduler in the pod spec is not the default scheduler and does not support PodGroups, the final pod spec still uses the scheduler set in the pod spec.

All of the above are the major ideas of the design.

Background Knowledge Summary

Volcano

Volcano is a Kubernetes-native batch system for big data and high-performance workloads. It has been accepted by the Cloud Native Computing Foundation (CNCF) as its first and only official container batch scheduling project. There is also prior Volcano integration experience with Spark, via the Spark K8S operator.

Volcano is set up at the Kubernetes cluster level. When a user submits a Flink application, the user needs to tell Kubernetes the job requirements such as queue, priority and minimum resources.

Concept

- PodGroup

The PodGroup concept was first approved by the Kubernetes community in KEP-583 (Coscheduling). A PodGroup is a group of pods with a strong association and is mainly used in batch scheduling.

- Queue

In Volcano, a Queue is a collection of PodGroups and adopts FIFO ordering. It is also used as the basis for resource division.

Combining PodGroup and Queue brings more possibilities to Pod-based deployments:

    • Schedule all jobs ordered by priority, which enables resource preemption.
    • Treat all jobs as a FIFO queue: the first part of the jobs runs while the rest wait, without resource reservation.

  

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • No effect on existing users; users who want the feature add the described configuration to their Flink-on-K8S setup and then use it.

Implementation Plan

  1. Add the ability to prepare resources before the JobManager or TaskManager pods are set up.
  2. Add support for passing the JobID through to the ResourceDriver.
  3. Add the decorator and Volcano support in the new CustomizedScheduler interface.
  4. Reconfigure the KubernetesPod via the new decorator.
  5. Support the PodGroup cleanup process in Flink Runtime and Flink Kubernetes.

Test Plan

  • Integration test: Use minikube as the e2e test environment, integrate with the customized K8S schedulers, deploy them in minikube, and run the tests against it.
  • Unit tests: Each function and logic change should introduce new unit tests.

Rejected Alternatives

None