Status

Current state: Under Discussion

...

  • CRD to express Flink application (for details see CRD section below)
    • External jar artifact fetcher support (s3, https etc.) via init container
    • Supports all Flink configuration properties
    • Docker image
    • Upgrade policy (savepoint, stateless)
    • Restore policy (savepoint, latest externalized checkpoint, stateless)
    • jobmanager and taskmanager pod template (unrestricted k8s pod configuration)
    • Support explicit session cluster (no job management) and application mode
      • An explicit session cluster starts empty, with no application/job management
      • The session cluster can be used to control jobs externally (e.g. submission via REST API)
  • Create & deploy new Flink application
    • Empty state
    • From savepoint
  • Upgrade Flink application with or without a savepoint on any CR change, including:
    • Flink configuration change
    • Job jar change
    • Docker image change
  • Pause/Resume Flink application
    • While paused, the job does not continue its data processing
    • The job is not deleted from the cluster
    • The job releases its resources back to the cluster (where they can be used by other jobs)
    • Pausing stops the job with a savepoint; the savepoint/last checkpoint is tracked in the CR status for resume
  • Delete Flink application
  • Integrate with Flink Kubernetes HA module [4]
    • When selected, operator can obtain latest checkpoint from config map and does not depend on a potentially unavailable Flink job REST API
    • This should be the default, but not a hard dependency
  • Support Flink UI ingress
  • CI/CD with operator Docker image artifact; publish the image on Docker Hub
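The pause/resume behavior listed above can be sketched as a small state transition on the CR status. This is only an illustration under stated assumptions, not the operator's implementation: `DeploymentStatus`, `pause`, `resume`, and the `take_savepoint` / `submit_job` callbacks are hypothetical names, and the two states are a simplification.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class DeploymentStatus:
    """Hypothetical subset of the CR status tracked by the operator."""
    state: str = "RUNNING"                # "RUNNING" or "SUSPENDED" (assumed names)
    last_savepoint: Optional[str] = None  # location recorded for a later resume


def pause(status: DeploymentStatus,
          take_savepoint: Callable[[], str]) -> DeploymentStatus:
    """Stop the job with a savepoint and record its location in the status."""
    if status.state != "RUNNING":
        return status  # already paused, nothing to do
    # Assumed contract: the callback stops the job and returns the savepoint path.
    path = take_savepoint()
    return DeploymentStatus(state="SUSPENDED", last_savepoint=path)


def resume(status: DeploymentStatus,
           submit_job: Callable[[Optional[str]], None]) -> DeploymentStatus:
    """Resubmit the job from the savepoint recorded while pausing."""
    if status.state != "SUSPENDED":
        return status  # not paused, nothing to do
    submit_job(status.last_savepoint)
    return DeploymentStatus(state="RUNNING", last_savepoint=status.last_savepoint)
```

Keeping the savepoint location in the status, rather than asking the job for it, is what allows a resume after the job's resources have been released.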

Flink Native vs Standalone integration

Flink currently supports two different approaches to running jobs on Kubernetes:

  1. Standalone mode
    1. supported by existing OSS operators
    2. the operator is the only entity that creates k8s objects
    3. users can interact with k8s objects directly to manage job resources, scaling, etc.
    4. Flink processes don’t require access to API server to create pods
  2. Flink native (embedded) k8s integration [2]
    1. simple job management through Flink client without need to create k8s objects
    2. no support for reactive scaling; the operator handles parallelism changes
    3. users should not interact with k8s objects directly, as those are managed by Flink
    4. requires wider access privileges for Flink jobmanager process (creates taskmanager pods)

In the long run it might make sense to support both deployment modes in the operator. Initially, however, we should focus the development effort on a single approach. One option is to start with the native integration [2], since a Java-based implementation could reuse its code.

CRD

  • Give users full control over k8s pod template (no mapping/whitelisting)
  • Layering/merging of pod templates (operator itself could also apply cluster wide defaults)
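One possible reading of template layering is a recursive merge in which more specific layers override more general ones. The sketch below assumes pod templates are represented as nested dictionaries and that lists are replaced wholesale rather than merged element by element; `merge_templates` and the sample keys are hypothetical, and the proposal does not fix the exact merge semantics.

```python
from copy import deepcopy
from typing import Any, Dict


def merge_templates(base: Dict[str, Any], overlay: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new template where values from `overlay` win over `base`.

    Nested dictionaries are merged recursively; any other value (including
    lists) from the overlay replaces the base value. This is an assumption.
    """
    merged = deepcopy(base)
    for key, value in overlay.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_templates(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged


# Layers from most general to most specific (sample values are illustrative).
cluster_defaults = {"metadata": {"labels": {"team": "data"}}}
common_template = {"spec": {"serviceAccountName": "flink"}}
job_manager_template = {"metadata": {"labels": {"component": "jobmanager"}}}

effective = merge_templates(
    merge_templates(cluster_defaults, common_template),
    job_manager_template,
)
```

A real implementation would likely need extra rules for lists such as containers or volumes, which are usually matched by name.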

kind: FlinkDeployment
metadata:
  name: flink-wordcount-example
  namespace: flink-operator
  annotations:
  labels:
    environment: development
spec:
  image: example:latest
  flinkVersion: "1.14"
  flinkConfiguration:
    # everything for flink-conf.yaml
    state.savepoints.dir: file:///checkpoints/flink/savepoints
  podTemplate:
    # everything that a k8s pod template supports
    # defaults for both job and task manager pods
  jobManager:
    resources:
      requests:
        memory: "200Mi"
        cpu: "0.1"
    replicas: 1
    podTemplate:
      # everything that a k8s pod template supports,
      # layered over common template
  taskManager:
    taskSlots: 2
    resources:
      requests:
        memory: "200Mi"
        cpu: "0.1"
    podTemplate:
      # everything that a k8s pod template supports,
      # layered over common template
  # job can be optional for plain session cluster
  job:
    jarURI: "file:///wordcount-operator-example-1.0.0-SNAPSHOT.jar"
    parallelism: 3
    entryClass: "org.apache.flink.WordCount"
    args: ""
    cancelMode: savepoint   # one of: savepoint, none
    restoreMode: savepoint  # one of: savepoint, checkpoint, none
  logging:
    # customize logging config in flink container
status:
  # ...
  # information about the actual state,
  # including latest savepoint/checkpoint etc.
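To illustrate how the `restoreMode` field and the savepoint/checkpoint information in the status could interact, here is a minimal sketch. It assumes the status exposes the latest savepoint and checkpoint locations as optional strings, and that a missing restore point is treated as an error rather than falling back to another mode; `resolve_restore_path` and its parameters are hypothetical names, not part of the proposal.

```python
from typing import Optional


def resolve_restore_path(restore_mode: str,
                         last_savepoint: Optional[str],
                         last_checkpoint: Optional[str]) -> Optional[str]:
    """Pick the state location to restore from, or None for a stateless start."""
    if restore_mode == "none":
        return None  # stateless: start from empty state
    candidates = {"savepoint": last_savepoint, "checkpoint": last_checkpoint}
    if restore_mode not in candidates:
        raise ValueError(f"unknown restoreMode: {restore_mode!r}")
    path = candidates[restore_mode]
    if path is None:
        # Assumption: fail loudly instead of silently starting without state.
        raise ValueError(f"restoreMode is {restore_mode!r} but none is recorded in status")
    return path
```

Whether the operator should fall back (for example from checkpoint to savepoint) instead of failing is a design decision this sketch leaves open.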


Compatibility, Deprecation, and Migration Plan

...