Status

Discussion thread
Vote thread
JIRA

Release

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When flink-kubernetes-operator deploys a Flink cluster in standalone[1] mode, it currently creates both the JobManager and the TaskManagers as Kubernetes Deployments.

However, a Deployment cannot maintain per-pod state for the JobManager and TaskManagers. In particular, a Deployment cannot give each TaskManager its own separately mounted PVC.

Using a StatefulSet instead of a Deployment for the JobManager and TaskManagers automatically mounts a PVC for each of their pods and maintains the relationship between each PVC and its pod[2].

Public Interfaces

The public interface is the FlinkDeployment custom resource definition (CRD), see below.

Proposed Changes

FlinkDeployment CRD

CR example with volumeClaimTemplate
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: default
  name: basic-example
spec:
  image: flink:1.14.3
  flinkVersion: v1_14
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
    volumeClaimTemplates: # only needed for standalone clusters
      - metadata:
          name: log
        spec:
          accessModes: [ "ReadWriteOnce" ]
          storageClassName: "lvm"
          resources:
            requests:
              storage: 10Gi
    podTemplate:
      apiVersion: v1
      kind: Pod
      metadata:
        name: job-manager-pod-template
      spec:
        containers:
          - name: flink-main-container
            volumeMounts:
              - name: log
                mountPath: /opt/flink/log
  taskManager:
    replicas: 4 # only needed for standalone clusters
    resource:
      memory: "2048m"
      cpu: 1
    volumeClaimTemplates: # only needed for standalone clusters
      - metadata:
          name: log
        spec:
          accessModes: [ "ReadWriteOnce" ]
          storageClassName: "lvm"
          resources:
            requests:
              storage: 10Gi
    podTemplate:
      apiVersion: v1
      kind: Pod
      metadata:
        name: task-manager-pod-template
      spec:
        containers:
          - name: flink-main-container
            volumeMounts:
              - name: log
                mountPath: /opt/flink/log
  mode: standalone 

We propose adding a volumeClaimTemplates field to JobManagerSpec and TaskManagerSpec to support mounting dynamically created PVCs.

JobManagerSpec class modified:

JobManagerSpec.java
import io.fabric8.kubernetes.api.model.PersistentVolumeClaim;
import io.fabric8.kubernetes.api.model.Pod;

import java.util.ArrayList;
import java.util.List;

public class JobManagerSpec {
    /** Resource specification for the JobManager pods (Resource is the operator's own spec class). */
    private Resource resource;

    /** Number of JobManager replicas. Must be 1 for non-HA deployments. */
    private int replicas = 1;

    /**
     * Volume claim templates for the JobManager StatefulSet. Used to mount custom PVCs; only
     * applies in standalone mode.
     */
    private List<PersistentVolumeClaim> volumeClaimTemplates = new ArrayList<>();

    /** JobManager pod template. It will be merged with FlinkDeploymentSpec.podTemplate. */
    private Pod podTemplate;
}

TaskManagerSpec class modified:

TaskManagerSpec.java
import io.fabric8.kubernetes.api.model.PersistentVolumeClaim;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.model.annotation.SpecReplicas;

import java.util.ArrayList;
import java.util.List;

public class TaskManagerSpec {
    /** Resource specification for the TaskManager pods (Resource is the operator's own spec class). */
    private Resource resource;

    /** Number of TaskManager replicas. If defined, takes precedence over parallelism. */
    @SpecReplicas private Integer replicas;

    /**
     * Volume claim templates for the TaskManager StatefulSet. Used to mount custom PVCs; only
     * applies in standalone mode.
     */
    private List<PersistentVolumeClaim> volumeClaimTemplates = new ArrayList<>();

    /** TaskManager pod template. It will be merged with FlinkDeploymentSpec.podTemplate. */
    private Pod podTemplate;
}
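
In the CR example, the claim template and the volumeMounts entry in the pod template are linked only by name (log in both the JobManager and TaskManager sections). The following is a minimal sketch of a check that reports claim templates no container mounts. The class and method names are hypothetical and not part of the operator, and plain strings stand in for the fabric8 model objects so the example stays self-contained.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper, not part of the operator: finds claim templates that are never mounted. */
final class ClaimTemplateMountCheck {

    private ClaimTemplateMountCheck() {}

    /**
     * Returns the claim template names that have no volumeMount with the same name.
     * Mounts that refer to other volumes (for example ConfigMaps) are simply ignored.
     */
    static Set<String> unmountedClaimTemplates(
            List<String> claimTemplateNames, List<String> volumeMountNames) {
        Set<String> unmounted = new LinkedHashSet<>(claimTemplateNames);
        unmounted.removeAll(volumeMountNames);
        return unmounted;
    }

    public static void main(String[] args) {
        // Mirrors the CR example: one template named "log", mounted under the same name.
        System.out.println(unmountedClaimTemplates(List.of("log"), List.of("log")));
        // "data" is an invented second template with no matching mount.
        System.out.println(unmountedClaimTemplates(List.of("log", "data"), List.of("log")));
    }
}
```

An unmounted claim template is still legal in Kubernetes, so a check like this would fit better as a warning than as a hard validation error.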

StandaloneFlinkService

To support mounting dynamically created PVCs, the operator deploys the Flink JobManager and TaskManagers as StatefulSets instead of Deployments, which maintains a one-to-one correspondence between PVC and pod.
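
The one-to-one correspondence comes from the naming scheme Kubernetes applies to StatefulSets: pods are named <statefulset-name>-<ordinal>, and each PVC is named <claim-template-name>-<pod-name>, so a recreated pod finds its earlier claim again. A small sketch of that scheme follows. The helper class is hypothetical, and the StatefulSet name basic-example-taskmanager is an assumption made for illustration, not a name fixed by this proposal.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative helper: derives the PVC names Kubernetes assigns to StatefulSet pods. */
final class StatefulSetPvcNames {

    private StatefulSetPvcNames() {}

    /** StatefulSet pods are named <statefulSetName>-<ordinal>. */
    static String podName(String statefulSetName, int ordinal) {
        return statefulSetName + "-" + ordinal;
    }

    /** PVCs created from a claim template are named <claimTemplateName>-<podName>. */
    static String pvcName(String claimTemplateName, String statefulSetName, int ordinal) {
        return claimTemplateName + "-" + podName(statefulSetName, ordinal);
    }

    /** Lists the PVC names for ordinals 0 to replicas - 1. */
    static List<String> pvcNames(String claimTemplateName, String statefulSetName, int replicas) {
        List<String> names = new ArrayList<>();
        for (int ordinal = 0; ordinal < replicas; ordinal++) {
            names.add(pvcName(claimTemplateName, statefulSetName, ordinal));
        }
        return names;
    }

    public static void main(String[] args) {
        // Assumed StatefulSet name; the "log" template and 4 replicas come from the CR example.
        pvcNames("log", "basic-example-taskmanager", 4).forEach(System.out::println);
    }
}
```

Because the PVC name depends only on the template name and the pod's stable identity, deleting a pod does not change which claim its replacement mounts.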

The existing cluster creation and deletion logic that operated on Deployment resources is changed to operate on StatefulSet resources through the fabric8 Kubernetes client.

Compatibility, Deprecation, and Migration Plan

The volumeClaimTemplates field in the CRD may be null, which keeps the CRD compatible with release 1.1.0 and earlier versions.
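
One way the operator code could tolerate a null field is to normalize it to an empty list before building the StatefulSet, so resources written for earlier releases take the same code path as new ones. This is only a sketch; the class and method names are hypothetical, and the element type is left generic instead of using the fabric8 PersistentVolumeClaim class.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical helper: treats a missing volumeClaimTemplates field as "no templates". */
final class ClaimTemplateDefaults {

    private ClaimTemplateDefaults() {}

    /** Returns the given list, or an immutable empty list when the field is null. */
    static <T> List<T> orEmpty(List<T> volumeClaimTemplates) {
        return volumeClaimTemplates == null ? Collections.emptyList() : volumeClaimTemplates;
    }

    public static void main(String[] args) {
        // A CR written for an earlier release carries no templates at all.
        List<String> fromOldResource = null;
        System.out.println(orEmpty(fromOldResource).size());
        // A CR that declares the "log" template from the example above.
        System.out.println(orEmpty(List.of("log")).size());
    }
}
```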

Test Plan

We can test dynamic PVC creation by creating a Flink standalone cluster in a real Kubernetes cluster, then killing one TaskManager pod and waiting for it to recover and successfully mount its previously existing PVC.

test example
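### Check all PVCs dynamically created by the CR.
kubectl get pvc
### Delete one TaskManager pod.
kubectl delete pod {pod_name}
### After the pod is recreated, print the PVC(s) it mounts.
kubectl get pod {pod_name} -o jsonpath='{.spec.volumes[*].persistentVolumeClaim.claimName}'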

When the CR is deleted, all created PVCs are retained; they can then be deleted manually to remove them permanently.

Rejected Alternatives

Using a single ReadWriteMany PVC shared by all TaskManager pods in the current native or standalone mode.

Or using another operator, such as flink-on-k8s-operator, to mount a one-to-one PVC for each TaskManager.

References