Status

Current state: Under Discussion

...

...

...

Jira: FLINK-25963

...

Release: kubernetes-operator-1.0


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  • Custom Resource to express a Flink application in a Kubernetes-native way (for details see the CR example section below)
    • Access to native Flink properties and native Kubernetes pod settings
      • Minimal shorthand (proxy) settings that the operator translates to the underlying native settings (memory, cpu)
      • Shorthand settings override the underlying settings
    • Supports all Flink configuration properties
    • Docker image
    • Upgrade policy (savepoint, stateless)
    • Restore policy (savepoint, latest externalized checkpoint state, stateless)
    • Pod template for jobmanager and taskmanager
      • similar to https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#pod-template
    • External jar artifact fetcher support (s3, https etc.) via init container
    • Support explicit session cluster (no job management) and application mode
      • the session cluster can be used to control jobs externally (like submission via REST API)
  • Create & deploy new Flink application
    • Empty state
    • From savepoint
  • Upgrade Flink application with or without a savepoint on any CR change, including:
    • Flink configuration change
    • Job jar change
    • Docker image change
  • Pause/Resume Flink application
    • the job will not continue its data processing
    • the job will not be deleted from the cluster
    • the job will release its resources back to the cluster (so they can be used by other jobs)
    • Stops the job with a savepoint and tracks the savepoint/last checkpoint in the CR status for resume.
  • Delete Flink application
  • Integrate with Flink Kubernetes HA module [4]
    • When selected, the operator can obtain the latest checkpoint from the HA ConfigMap and does not depend on a potentially unavailable Flink job REST API
    • This should be the default, but not a hard dependency
  • Support Flink UI ingress
  • CI/CD with operator Docker image artifact, publish the image to Docker Hub
  • Error handling
    • Retry based on exception classifiers
    • Propagation of job submission errors through Kubernetes events and/or the CR status
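The upgrade item above reacts to "any CR change" according to the upgrade policy. Below is a minimal sketch of that decision, assuming the deployed and desired specs are plain dicts and that the two policy values match the CR field (`stateless`, `savepoint`). The function name `plan_upgrade` and the step strings are hypothetical. `execution.savepoint.path` is the existing Flink option for restoring from a savepoint. Whether the operator uses it this way is an assumption.

```python
from typing import Dict, List, Tuple

# Upgrade modes named in the proposal; the string values are assumed to match the CR field.
VALID_UPGRADE_MODES = ("stateless", "savepoint")


def plan_upgrade(deployed: Dict[str, object], desired: Dict[str, object],
                 upgrade_mode: str) -> Tuple[List[str], List[str]]:
    """Return (changed_fields, steps) for moving a job from `deployed` to `desired` (hypothetical)."""
    if upgrade_mode not in VALID_UPGRADE_MODES:
        raise ValueError(f"unknown upgradeMode: {upgrade_mode!r}")
    # Any top-level difference (Flink config, jar, image, ...) triggers an upgrade.
    changed = sorted(key for key in deployed.keys() | desired.keys()
                     if deployed.get(key) != desired.get(key))
    if not changed:
        return [], []  # spec unchanged: nothing to do
    if upgrade_mode == "savepoint":
        steps = ["stop job with savepoint",
                 "deploy desired spec with execution.savepoint.path set to that savepoint"]
    else:
        steps = ["cancel job", "deploy desired spec with empty state"]
    return changed, steps


if __name__ == "__main__":
    current = {"image": "example:1", "jarURI": "file:///job.jar",
               "flinkConfiguration": {"parallelism.default": "2"}}
    target = dict(current, image="example:2")  # Docker image change only
    print(plan_upgrade(current, target, "savepoint"))
```

Comparing whole top-level fields keeps the sketch simple. A real reconciler might ignore fields that do not require a redeploy.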
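The pause/resume item says the job is stopped with a savepoint that is tracked in the CR status. A small sketch of that state transition follows. The `JobStatus` fields and the `stop_with_savepoint` callable are hypothetical stand-ins, and only the savepoint case (not the last-checkpoint case) is modeled. Resuming is assumed to pass the recorded path through Flink's `execution.savepoint.path` option.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional, Tuple


@dataclass(frozen=True)
class JobStatus:
    # Hypothetical status fields; the proposal only says the savepoint / last
    # checkpoint is tracked in the CR status so that the job can be resumed.
    state: str = "running"
    last_savepoint: Optional[str] = None


def suspend(status: JobStatus, stop_with_savepoint: Callable[[], str]) -> JobStatus:
    """Stop the job with a savepoint and record its path; resources can then be released."""
    if status.state == "suspended":
        return status  # already suspended: nothing to do
    path = stop_with_savepoint()  # hypothetical callable returning the savepoint path
    return JobStatus(state="suspended", last_savepoint=path)


def resume(status: JobStatus) -> Tuple[JobStatus, Dict[str, str]]:
    """Return the new status plus the Flink config needed to restart from the recorded savepoint."""
    if status.state != "suspended":
        raise ValueError(f"cannot resume a job in state {status.state!r}")
    conf: Dict[str, str] = {}
    if status.last_savepoint is not None:
        conf["execution.savepoint.path"] = status.last_savepoint
    return JobStatus(state="running", last_savepoint=status.last_savepoint), conf
```

Making `suspend` idempotent matters for a reconcile loop, which may observe the same desired state many times.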
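The HA integration item describes a preference order. It reads the latest checkpoint from the HA ConfigMap when HA is enabled and otherwise falls back to the job REST API, without making HA mandatory. The sketch below shows only that ordering. Both sources are hypothetical callables, and parsing Flink's actual HA ConfigMap contents is deliberately left out.

```python
from typing import Callable, Optional

# Each source is a hypothetical callable returning a checkpoint path, or None if it has none.
CheckpointSource = Callable[[], Optional[str]]


def latest_checkpoint(ha_configmap: Optional[CheckpointSource],
                      rest_api: CheckpointSource) -> Optional[str]:
    """Prefer Kubernetes HA metadata; fall back to the Flink job REST API."""
    if ha_configmap is not None:  # HA is optional, not a hard dependency
        try:
            path = ha_configmap()
            if path:
                return path
        except OSError:
            pass  # ConfigMap unreadable: fall back to the REST API
    try:
        return rest_api()
    except OSError:  # e.g. ConnectionError when the JobManager is down
        return None
```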
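The error-handling items combine two ideas: retry only the errors a classifier deems retryable, and surface the final failure in the status. A minimal sketch follows. The classifier (connection and timeout errors are transient), the backoff schedule, and the `jobId`/`error` status keys are all assumptions for illustration. Emitting an actual Kubernetes event is not shown.

```python
import time
from typing import Callable, Dict, Tuple, Type, TypeVar

T = TypeVar("T")

# Hypothetical classifier: transient errors are retried, everything else fails fast.
TRANSIENT: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError)


def is_transient(error: BaseException) -> bool:
    return isinstance(error, TRANSIENT)


def with_retry(action: Callable[[], T], max_attempts: int = 3, backoff_seconds: float = 1.0,
               classify: Callable[[BaseException], bool] = is_transient,
               sleep: Callable[[float], None] = time.sleep) -> T:
    """Run `action`, retrying errors the classifier marks as retryable, with exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return action()
        except Exception as error:
            if not classify(error) or attempt == max_attempts:
                raise  # non-retryable, or out of attempts
            sleep(backoff_seconds * 2 ** (attempt - 1))  # 1s, 2s, 4s, ...
    raise AssertionError("unreachable")


def submit_and_record(submit: Callable[[], str], status: Dict[str, str], **retry_kwargs) -> None:
    """Submit the job; on final failure, record the error in the (hypothetical) status dict."""
    try:
        status["jobId"] = with_retry(submit, **retry_kwargs)
        status.pop("error", None)
    except Exception as error:
        status["error"] = f"{type(error).__name__}: {error}"
```

Injecting `sleep` keeps the retry loop testable without real delays.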

...

kind: FlinkDeployment
metadata:
  name: flink-wordcount-example
  namespace: flink-operator
  annotations:
  labels:
    environment: development
spec:
  image: example:latest
  flinkVersion: "1.14"
  flinkConfiguration:
    # everything for flink-conf.yaml
    state.savepoints.dir: file:///checkpoints/flink/savepoints
  podTemplate:
    # everything that a k8s pod template supports
    # defaults for both the job and task manager pods
  jobManager:
    resources:
      requests:
        memory: "200Mi"
        cpu: "0.1"
    replicas: 1
    podTemplate:
      # everything that a k8s pod template supports,
      # layered over the common template
  taskManager:
    taskSlots: 2
    resources:
      requests:
        memory: "200Mi"
        cpu: "0.1"
    podTemplate:
      # everything that a k8s pod template supports,
      # layered over the common template
  # job is optional for a plain session cluster
  job:
    jarURI: "file:///wordcount-operator-example-1.0.0-SNAPSHOT.jar"
    parallelism: 3
    entryClass: "org.apache.flink.WordCount"
    args: ""
    upgradeMode: savepoint  # one of: stateless, savepoint
    state: running  # one of: running, suspended, error
    initialSavepointPath: "s3://mysavepoint"
  logging:
    # customize the logging config in the flink container
status:
  # ...
  # information about the actual state,
  # including the latest savepoint/checkpoint etc.
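In the CR example, the `podTemplate` under `jobManager` and `taskManager` is "layered over" the common `spec.podTemplate`. The proposal does not define the merge rules. The sketch below assumes a simple recursive overlay in which nested maps merge and any other value, lists included, is replaced wholesale. That differs from Kubernetes strategic merge patch, which merges some lists (such as `containers`) by key. The function name `layer` is hypothetical.

```python
import copy
from typing import Any, Dict


def layer(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """Overlay `override` on `base`: nested maps merge, everything else (incl. lists) is replaced."""
    result = copy.deepcopy(base)  # never mutate the shared common template
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(result.get(key), dict):
            result[key] = layer(result[key], value)
        else:
            result[key] = copy.deepcopy(value)
    return result


if __name__ == "__main__":
    common = {"spec": {"nodeSelector": {"disk": "ssd"}, "tolerations": [{"key": "a"}]}}
    job_manager = {"metadata": {"labels": {"component": "jobmanager"}},
                   "spec": {"nodeSelector": {"zone": "z1"}}}
    print(layer(common, job_manager))
```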
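The shorthand `resources` and `taskSlots` settings in the CR example are meant to be translated into native settings that override `flinkConfiguration`. The sketch below assumes one possible mapping onto existing Flink options: `jobmanager.memory.process.size`, `taskmanager.memory.process.size`, `kubernetes.jobmanager.cpu`, `kubernetes.taskmanager.cpu`, and `taskmanager.numberOfTaskSlots`. The choice of these keys, and the helper names, are assumptions. Only integer Kubernetes binary quantities (`Ki`, `Mi`, `Gi`, `Ti`) are converted, because Flink memory units are also powers of 1024.

```python
from typing import Any, Dict

# Kubernetes binary suffixes -> Flink memory units (both use powers of 1024).
_K8S_TO_FLINK_UNIT = {"Ki": "k", "Mi": "m", "Gi": "g", "Ti": "t"}


def k8s_memory_to_flink(quantity: str) -> str:
    """Translate e.g. "200Mi" into "200m"; only integer binary quantities are handled."""
    for suffix, unit in _K8S_TO_FLINK_UNIT.items():
        if quantity.endswith(suffix) and quantity[: -len(suffix)].isdigit():
            return quantity[: -len(suffix)] + unit
    if quantity.isdigit():
        return quantity + "b"  # plain number of bytes
    raise ValueError(f"unsupported memory quantity: {quantity!r}")


def effective_flink_conf(spec: Dict[str, Any]) -> Dict[str, str]:
    """Start from spec.flinkConfiguration, then let the shorthand settings override it."""
    conf = dict(spec.get("flinkConfiguration") or {})
    for component, prefix in (("jobManager", "jobmanager"), ("taskManager", "taskmanager")):
        requests = ((spec.get(component) or {}).get("resources") or {}).get("requests") or {}
        if "memory" in requests:
            conf[f"{prefix}.memory.process.size"] = k8s_memory_to_flink(requests["memory"])
        if "cpu" in requests:
            conf[f"kubernetes.{prefix}.cpu"] = str(requests["cpu"])
    task_slots = (spec.get("taskManager") or {}).get("taskSlots")
    if task_slots is not None:
        conf["taskmanager.numberOfTaskSlots"] = str(task_slots)
    return conf
```

Under this sketch, a `jobmanager.memory.process.size` entry in `flinkConfiguration` is silently replaced when `jobManager.resources.requests.memory` is also set. This matches "shorthand settings override underlying settings".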

...