
Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread/fpp5m9jkr0wnjryd07xtpj13t80z99yt

Vote thread: https://lists.apache.org/thread/0lk2om64cyrjjvdqzww9w0g89z4vnmcm

JIRA:

Released: Flink Kubernetes Operator release-1.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, the Flink Kubernetes operator supports two deployment modes: Application and Session. In Session mode, it can only start, upgrade, and shut down the session cluster. Users have to apply a FlinkDeployment CR YAML to create a session cluster first, and then use the Flink client to submit a job to the existing session. This is a major problem, especially when users want to submit a Flink job from outside the K8s cluster. This FLIP proposes a new, more Kubernetes-native way to simplify session job management.

Public Interfaces

The public interface is essentially the custom resource definition (CRD); see below.

Proposed Changes

A new CRD, FlinkSessionJob, will be introduced. An example is shown below:

FlinkSessionJob CRD

apiVersion: flink.apache.org/v1alpha1
kind: FlinkSessionJob
metadata:
  namespace: default
  name: flink-session-job-example
spec:
  clusterId: flink-session-cluster-1
  job:
    jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.3/flink-examples-streaming_2.12-1.14.3.jar
    entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
    args: ""
    parallelism: 2    
    upgradeMode: (stateless, savepoint, last-state)
    state: (running, suspended, error)
    initialSavepointPath: "s3://mysavepoint"

The spec contains two parts: clusterId and job.

The clusterId is used to link the job to the session cluster managed by a FlinkDeployment.

The job section contains all the information needed to submit a Flink job. It is the same as .spec.job in FlinkDeployment. If users want to submit multiple jobs to the same cluster, they need to create a dedicated FlinkSessionJob CR for each one.
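For instance, two jobs sharing one session cluster would be expressed as two CRs pointing at the same clusterId. A minimal sketch (the resource names, jar URLs, and parallelism values below are illustrative, not from the proposal):

```yaml
# Illustrative sketch: two FlinkSessionJob CRs submitted to the same
# session cluster; names and jar locations are hypothetical.
apiVersion: flink.apache.org/v1alpha1
kind: FlinkSessionJob
metadata:
  namespace: default
  name: session-job-a
spec:
  clusterId: flink-session-cluster-1
  job:
    jarURI: https://example.com/jobs/job-a.jar
    parallelism: 2
    upgradeMode: stateless
---
apiVersion: flink.apache.org/v1alpha1
kind: FlinkSessionJob
metadata:
  namespace: default
  name: session-job-b
spec:
  clusterId: flink-session-cluster-1
  job:
    jarURI: https://example.com/jobs/job-b.jar
    parallelism: 4
    upgradeMode: stateless
```

Deleting either CR would affect only that job; the session cluster itself remains under the control of its FlinkDeployment.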

The jarURI field will support different filesystems by leveraging flink-filesystems to download the user jar to the local machine. In the first version, HTTP and local files will be supported.
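Under this plan, a jarURI could therefore point either to an HTTP location or to a file already present locally. A sketch of the two variants (the local scheme and path are assumptions for illustration):

```yaml
# Variant 1: HTTP jar, fetched via flink-filesystems.
jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.3/flink-examples-streaming_2.12-1.14.3.jar

# Variant 2: jar already available on a local path (scheme and path
# are illustrative assumptions).
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
```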

Job Management

The FlinkSessionJob CR will be handled by a dedicated FlinkSessionJobController, which will handle the create, update, and delete events of the FlinkSessionJob CR.

FlinkSessionJobController will use JarRunHeaders (i.e., the /jars/:jarid/run REST endpoint) to interact with the JobManager. This way, the JobManager builds the JobGraph and performs the actual submission. The operator will upload the user jar first and then call the REST API to run the specified jar.

For other job operations, we can reuse the work from the current Application mode to support the upgrade, stop, restart, and savepoint workflows.
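As a sketch of how such an operation would look with the fields above, suspending a running session job could amount to editing the CR's spec (the jar URL is illustrative; the exact reconciliation behavior follows the Application-mode workflow being reused):

```yaml
# Illustrative: flipping the state field in the FlinkSessionJob spec.
# With upgradeMode: savepoint, the operator would stop the job with a
# savepoint, from which it can later be resumed.
spec:
  clusterId: flink-session-cluster-1
  job:
    jarURI: https://example.com/jobs/job-a.jar
    parallelism: 2
    upgradeMode: savepoint
    state: suspended
```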

Compatibility, Deprecation, and Migration Plan

As this is a completely new standalone component, it will be introduced as experimental first.

Rejected Alternatives

Using RestClusterClient#submitJob to submit job

Main reasons not to choose this way

  • The JobGraph would be generated in the operator, which couples the operator to a specific flinkVersion and would cause compatibility problems when supporting multiple Flink versions.
  • It would make the submission process somewhat heavy.