

Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This proposed change is meant to improve the usability of Flink on YARN for batch jobs by adding the option to wait until a Flink application has completed before returning to the Flink client. Users could then get the result and status of the application directly, instead of only learning that job submission succeeded.

Currently, when running on YARN, the Flink client exits immediately and returns a successful status as soon as the Flink application reaches the RUNNING state. This indicates successful job submission, not application completion. A configurable option for batch jobs to wait for application completion gives us richer information about the status of the underlying application. This is necessary when using workflow orchestrators such as Airflow, Oozie, or Azkaban, which rely on the exit status of the application launcher process to report the correct completion status to users.

This feature currently exists in per-job mode, which is being dropped by the community. Enabling it in Application mode prevents a feature regression.

Public Interfaces

To support this feature while keeping it under user control, we will modify the existing parameter execution.attached so that it can be used for YARN Application mode submission. This lets users decide whether to wait for the underlying application to complete before the Flink client exits.

We will rename the execution.attached configuration to client.attached.after.submission to reflect that it is a client-side configuration, following the standard deprecation process.

The new configuration will be declared in flink-conf.yaml:

  • client.attached.after.submission: when set to true, the client waits until the underlying Flink application completes before returning. It also retrieves the final Application Master status and reports it in the logs. When set to false, the client behaves as before and terminates once the Flink application has been submitted successfully. The default is false, which also applies when the parameter is not explicitly specified.
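As a rough sketch of the client-side behavior described above (not Flink's actual implementation), the Java class below models the decision the client makes for each application state it observes. The `AppState` enum and the `waitForClient` and `exitCode` methods are hypothetical names. A real client would obtain states from YARN application reports, and YARN tracks the application state and the final status separately; this sketch merges them into one enum for brevity. The exit-code mapping shows why orchestrators benefit from the option: with the default of false, a job that later fails still produces exit code 0.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the client-side logic for client.attached.after.submission.
// AppState, waitForClient and exitCode are illustrative names, not Flink or YARN APIs.
class AttachedSubmissionSketch {

    // Simplified application states as seen by the client (YARN keeps the application
    // state and the final status separately; they are merged here for brevity).
    enum AppState { ACCEPTED, RUNNING, SUCCEEDED, FAILED, KILLED }

    static boolean isTerminal(AppState state) {
        return state == AppState.SUCCEEDED || state == AppState.FAILED || state == AppState.KILLED;
    }

    // Consumes the states observed by the client (e.g. from periodic status reports)
    // and returns the state at which the client stops waiting.
    static AppState waitForClient(Iterator<AppState> observed, boolean attachedAfterSubmission) {
        while (observed.hasNext()) {
            AppState state = observed.next();
            if (!attachedAfterSubmission && state == AppState.RUNNING) {
                return state; // false (default): stop once submission succeeded, as today
            }
            if (isTerminal(state)) {
                return state; // true, or the application ended before reaching RUNNING
            }
        }
        throw new IllegalStateException("no more states before the client could stop");
    }

    // Maps the state at which the client stopped to a launcher exit code.
    static int exitCode(AppState state) {
        return (state == AppState.RUNNING || state == AppState.SUCCEEDED) ? 0 : 1;
    }

    public static void main(String[] args) {
        List<AppState> states = List.of(AppState.ACCEPTED, AppState.RUNNING, AppState.FAILED);
        AppState detached = waitForClient(states.iterator(), false);
        AppState attached = waitForClient(states.iterator(), true);
        System.out.println("detached: " + detached + " -> exit " + exitCode(detached));
        System.out.println("attached: " + attached + " -> exit " + exitCode(attached));
    }
}
```

For the same sequence of states, `main` prints `detached: RUNNING -> exit 0` and `attached: FAILED -> exit 1`. Only the attached run surfaces the failure to the process that launched the client.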

Proposed Changes

Add the new configuration to flink-conf.yaml:

```yaml
# Wait for the Flink application to complete before the client exits (default: false)
client.attached.after.submission: true
```

This replaces execution.attached. The Application deployment mode will be configured to honor client.attached.after.submission when starting the Application cluster.

Compatibility, Deprecation, and Migration Plan

  • No impact on existing users: the default behavior is unchanged, so no existing behavior needs to be phased out.
  • The execution.attached configuration has only been enabled for per-job application submission, which is being deprecated.
  • We will mark the execution.attached configuration as deprecated, add the client.attached.after.submission configuration, and use it in other deployment environments. By default, it will be set to false.
  • Over time, we will remove the execution.attached configuration.
  • No special migration tools are required.
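Flink's `ConfigOption` supports deprecated keys (via `withDeprecatedKeys`), which is one way the rename could be handled. The standalone sketch below imitates that fallback with a plain `Map` so it runs without Flink on the classpath. The new key takes precedence, the deprecated execution.attached key is consulted only when the new key is absent, and the default is false. The class name and the warning text are hypothetical, and the lenient `Boolean.parseBoolean` parsing is a simplification.

```java
import java.util.Map;

// Hypothetical sketch: resolve client.attached.after.submission with a fallback to the
// deprecated execution.attached key. A plain Map stands in for Flink's Configuration.
class AttachedOptionResolver {

    static final String NEW_KEY = "client.attached.after.submission";
    static final String DEPRECATED_KEY = "execution.attached";
    static final boolean DEFAULT_VALUE = false;

    static boolean resolve(Map<String, String> conf) {
        String value = conf.get(NEW_KEY);
        if (value == null) {
            // Only look at the deprecated key when the new key is not set.
            value = conf.get(DEPRECATED_KEY);
            if (value != null) {
                System.err.println("Config key '" + DEPRECATED_KEY
                        + "' is deprecated; use '" + NEW_KEY + "' instead.");
            }
        }
        // Unset in both places: keep today's behavior (do not wait).
        return value == null ? DEFAULT_VALUE : Boolean.parseBoolean(value.trim());
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of()));
        System.out.println(resolve(Map.of(DEPRECATED_KEY, "true")));
        System.out.println(resolve(Map.of(NEW_KEY, "false", DEPRECATED_KEY, "true")));
    }
}
```

`main` prints `false`, `true`, `false`: the default, the deprecated key on its own, and the new key overriding the deprecated one. Only the second case emits the deprecation warning.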

Test Plan

We will test the changes manually by adding the new parameter to flink-conf.yaml and running a Flink batch job. With client.attached.after.submission set to true, we will verify that the client terminates when the application completes and that we get an accurate YARN application status based on the result of the Flink application. We will also set it to false to confirm that, once an application is submitted successfully, the client exits after the Flink application reaches the RUNNING state, regardless of whether the application itself succeeds or fails. Finally, we will confirm that the default behavior is unchanged.

Rejected Alternatives

There was a previous push for a similar feature in https://issues.apache.org/jira/browse/FLINK-25495, but the PR and the effort were dropped. Our approach is similar, but we will replace the execution.attached configuration with client.attached.after.submission to better reflect that it is a client-side configuration.

One alternative for achieving a similar result would be to poll the deployed application cluster to find out when the job has terminated and then retrieve its status from the cluster. However, for usability, it is preferable to have this as a feature within Flink itself, which allows better integration with flow orchestrators.

Another alternative proposed in that thread was to use attached mode when running in Application mode. However, attached mode is not reliable, because the Flink client could disconnect due to network issues or a JobManager failover. Adding a new configuration to handle this behavior in YarnClusterDescriptor, protected behind a configuration variable, is much simpler.