You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 5 Next »

Status

Current state: Under Discussion

Discussion threadhere (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRAhere (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kubernetes (k8s) has become the predominant deployment platform for Flink. Over the past several years, a number of integrations have been developed that aim to help operationalise mission-critical Flink applications on k8s.

Flink comes with built-in k8s support, also referred to as Flink native k8s [2, 3, 4], providing an alternative to the traditional standalone deployment mode. Independently a number of k8s operators have been developed outside of Apache Flink, including [5, 6]. These implementations are not under the umbrella of a neutral entity like the ASF and as a result, tend to lack wider community participation and projects go stale after the maintainers shift focus.

However, the operator concept is central to a Kubernetes native deployment experience. It allows to manage Flink applications and their lifecycle through k8s tooling like kubectl. A key feature of an operator is the automation of application upgrades, which cannot be achieved through the Flink native (embedded) integration alone.

Public Interfaces

N/A

Proposed Changes

We are proposing to provide an Flink k8s operator implementation as part of Flink that is maintained by the community and closely integrated with the Flink ecosystem. This implementation will benefit from the extensive experience of Flink community members with large scale mission critical Flink deployments and learnings from existing operator implementations. As part of Flink, the operator will have a better chance to follow the development of Flink core, influence changes to Flink core and benefit from the established collaboration processes of the project.

Initial Feature Set

For the initial version of the operator we aim to target core aspects of job lifecycle management.

  • CRD to express Flink application (for details see CR example section below)
    • External jar artifact fetcher support (s3, https etc.) via init container
    • creates an empty session cluster, no application/job management
    • the session cluster can be used to control jobs externally (like submission via REST API)
    • Supports all Flink configuration properties
    • Docker image
    • Upgrade policy (savepoint, stateless)
    • Restore policy (savepoint, latest externalized checkpoint, stateless)
    • Pod template for jobmanager and taskmanager
      • full control over k8s pod template (no mapping/whitelisting)
      • layering/merging of pod templates (operator itself could also apply cluster wide defaults)
    • Support explicit session cluster (no job management) and application mode
  • Create & deploy new Flink application
    • Empty state
    • From savepoint
  • Upgrade Flink application with or w/o savepoint on any CR change, including:
    • Flink configuration change
    • Job jar change
    • Docker image change
  • Pause/Resume Flink application
    • the job will not continue its data processing
    • the job will not be deleted from the cluster
    • the job will release its resources back to the cluster (can be used by other jobs)
    • Stops job with savepoint, tracks savepoint/last checkpoint in CR status for resume.
  • Delete Flink application
  • Integrate with Flink Kubernetes HA module [4]
    • When selected, operator can obtain latest checkpoint from config map and does not depend on a potentially unavailable Flink job REST API
    • This should the default, but not a hard dependency
  • Support Flink UI ingress
  • CI/CD with operator Docker image artifact, publish image in dockerhub

Flink Native vs Standalone integration

Flink currently supports two different approaches to running jobs on Kubernetes:

  1. Standalone mode
    1. supported by existing OSS operators
    2. operator only entity that creates k8s objects
    3. users can interact with k8s objects directly to manage job resources, scaling etc
    4. Flink processes don’t require access to API server to create pods
  2. Flink native (embedded) k8s integration [2]
    1. simple job management through Flink client without need to create k8s objects
    2. no support for reactive scaling, operator handles parallelism change
    3. users should not interact with k8s objects directly as those are managed by flink
    4. requires wider access privileges for Flink jobmanager process (creates taskmanager pods)

In the long run it might make sense to support both deployment modes in the operator, however initially we should focus the development effort on a single approach. Maybe start with support for [2] since we could reuse the code in a Java based implementation.

CR Example

  kind: FlinkDeployment
metadata:
  name: flink-wordcount-example
  namespace: flink-operator
  annotations:
  labels:
    environment: development
spec:
  image: example:latest
  flinkVersion: "1.14"
  flinkConfiguration:
    // everything for flink-conf.yaml
    state.savepoints.dir: file:///checkpoints/flink/savepoints
  podTemplate:
    // everything that a k8s pod template supports
    // defaults for both, job and task manager pods
  jobManager:
    resources:
      requests:
        memory: "200Mi"
        cpu: "0.1"
    replicas: 1
    podTemplate:
      // everything that a k8s pod template supports
      // layered over common template
  taskManager:
    taskSlots: 2
    resources:
      requests:
        memory: "200Mi"
        cpu: "0.1"
    podTemplate:
      // everything that a k8s pod template supports,
      // layered over common template
  // job can be optional for plain session cluster
  job:    
    jarURI: "file:///wordcount-operator-example-1.0.0-SNAPSHOT.jar"
    parallelism: 3
    entryClass: "org.apache.flink.WordCount"
    args: ""
    cancelMode: (savepoint, none)
    restoreMode: (savepoint, checkpoint, none)
  logging:
    // customize logging config in flink container   
  status:
    ...
    // information about the actual state
    // including latest savepoint/checkpoint etc.   

Java Operator SDK

The Flink operator should be built using the java-operator-sdk . The java operator sdk is the state of the art approach for building a Kubernetes operator in Java. It uses the Fabric8 k8s client like Flink does and it is open source with Apache 2.0 license.

Compatibility, Deprecation, and Migration Plan

As this is a completely new standalone component, no migration will be necessary strictly speaking. Compatibility is to be seen and will depend on any changes required to the Flink Kubernetes integration.

Test Plan

Describe in few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

Using Go to implement the operator

While Go is often a natural fit for implementing k8s operators and there are already some open-source examples of Flink operators implemented in Go we still feel that Java is more suitable for this new component.

Main reasons for choosing Java over Go

  • Direct access to Flink Client libraries for submitting, managing jobs and handling errors
  • Most Flink developers have strong Java experience while there are only few Go experts
  • Easier to integrate with existing build system and tooling
  • Required k8s clients and tools for building an operator are also available in Java

References


  • No labels