Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.


...

Page properties

...


Discussion thread

...

...

Vote thread

...

...

xxfpg6sgdc223nj90fsx16b8f89g8kzy 
JIRA

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-25963

Releasekubernetes-operator-1.0


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Kubernetes (k8s) has become the predominant deployment platform for Flink. Over the past several years, a number of integrations have been developed that aim to help operationalize mission-critical Flink applications on k8s. Given the increasing adoption of k8s for Flink deployments and as discussed in [1], there is interest to provide a k8s native solution as part of Flink that can benefit from the rich experience of community members and ultimately make Flink easier to adopt.

Currently Flink comes with built-in embedded k8s support, also referred to as Flink native k8s [2, 3, 4], offering an alternative to the traditional standalone deployment mode. Independently a number of k8s operators have been developed outside of Apache Flink, including [5, 6]. These implementations are not under the umbrella of a neutral entity like the ASF and as a result, tend to lack wider community participation and projects go stale after the maintainers shift focus.

However, the custom resource and operator concepts are central to a Kubernetes native deployment experience. It allows to manage Flink applications and their lifecycle through k8s tooling like kubectl. A key feature of an operator is the automation of application upgrades, which cannot be achieved through the "Flink native" integration alone.

Public Interfaces

N/AThe public interface is essentially the custom resource descriptor (CRD), see below.

Proposed Changes

We are proposing to provide an Flink k8s operator implementation as part of Flink that is maintained by the community and closely integrated with the Flink ecosystem. This implementation will benefit from the extensive experience of Flink community members with large scale mission critical Flink deployments and learnings from existing operator implementations. As part of Flink, the operator will have a better chance to follow the development of Flink core, influence changes to Flink core and benefit from the established collaboration processes of the project. We are proposing a dedicated github repository for the operator with operator versioning and releases separate from core Flink (like flink-statefun or flink-shaded). 

Initial Feature Set

For the initial version of the operator we aim to target core aspects of job lifecycle management.

  • Custom Resource to express Flink application in Kubernetes native way (for details see CR example section below)
    • External jar artifact fetcher support (s3, https etc.) via init container
    • similar to https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#pod-template
    • the session cluster can be used to control jobs externally (like submission via REST API)
    • Access to native Flink properties and native Kubernetes pod settings
      • Minimal shorthand (proxy) settings that operator translates to underlying native settings (memory, cpu)
      • shorthand settings override underlying settings
    • Supports all Flink configuration properties
    • Docker image
    • Upgrade policy (savepoint, stateless)Restore policy (savepoint, latest externalized checkpoint-state, stateless)
    • Pod template for jobmanager and taskmanager
    • External jar artifact fetcher support (s3, https etc.) via init container
    • Support explicit session cluster (no job management) and application mode
      • the session cluster can be used to control jobs externally (like submission via REST API)
  • Create & deploy new Flink application
    • Empty state
    • From savepoint
  • Upgrade Flink application with or w/o savepoint on any CR change, including:
    • Flink configuration change
    • Job jar change
    • Docker image change
  • Pause/Resume Flink application
    • the job will not continue its data processing
    • the job will not be deleted from the cluster
    • the job will release its resources back to the cluster (can be used by other jobs)
    • Stops job with savepoint, tracks savepoint/last checkpoint in CR status for resume.
  • Delete Flink application
  • Integrate with Flink Kubernetes HA module [4]
    • When selected, operator can obtain latest checkpoint from config map and does not depend on a potentially unavailable Flink job REST API
    • This should the default, but not a hard dependency
  • Support Flink UI ingress
  • CI/CD with operator Docker image artifact, publish image in to dockerhub
  • Error handling
    • Retry based on exception classifiers
    • Propagation of job submission errors through k8s event and/or status

Flink Native vs Standalone integration

...

  kind: FlinkDeployment
metadata:
  name: flink-wordcount-example
  namespace: flink-operator
  annotations:
  labels:
    environment: development
spec:
  image: example:latest
  flinkVersion: "1.14"
  flinkConfiguration:
    // everything for flink-conf.yaml
    state.savepoints.dir: file:///checkpoints/flink/savepoints
  podTemplate:
    // everything that a k8s pod template supports
    // defaults for both, job and task manager pods
  jobManager:
    resources:
      requests:
        memory: "200Mi"
        cpu: "0.1"
    replicas: 1
    podTemplate:
      // everything that a k8s pod template supports
      // layered over common template
  taskManager:
    taskSlots: 2
    resources:
      requests:
        memory: "200Mi"
        cpu: "0.1"
    podTemplate:
      // everything that a k8s pod template supports,
      // layered over common template
  // job can be optional for plain session cluster
  job:    
    jarURI: "file:///wordcount-operator-example-1.0.0-SNAPSHOT.jar"
    parallelism: 3
    entryClass: "org.apache.flink.WordCount"
    args: ""
    cancelModeupgradeMode: (stateless, savepoint, none)
    restoreModestate: (savepointrunning, checkpointsuspended, none)error)
    initialSavepointPath: "s3://mysavepoint"
  logging:
    // customize logging config in flink container   
  status:
    ...
    // information about the actual state
    // including latest savepoint/checkpoint etc.   

...