
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This proposed change improves the usability of Flink on YARN for batch jobs by introducing an option to wait until the Flink application has completed before returning to the Flink client. This allows the client to report the final result and status of the application rather than only confirming that job submission was successful.

Currently, the Flink client exits and reports success as soon as the Flink application reaches the RUNNING state on YARN, which indicates successful job submission but not application completion. A configurable option for batch jobs to wait for application completion provides richer information about the underlying application status. This matters when using workflow orchestrators such as Airflow, Oozie, or Azkaban, which rely on the exit status of the launcher process to report correct results to users.

Public Interfaces

To support this feature while keeping it under user control, a new parameter will be introduced that lets users decide whether the Flink client should wait for the underlying application to complete before exiting.

The new configuration will be introduced in the flink-conf.yaml:

  • yarn.application.wait-for-completion: when set to true, the client waits until the underlying Flink application completes before returning, retrieves the final Application Master status, and reports it in the logs. When set to false, the client behaves as before: it terminates upon successful submission of the Flink application. When the parameter is not specified, it defaults to false, so the existing behavior is preserved.
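For example, a flink-conf.yaml entry enabling the proposed behavior might look like the following (the key is the one proposed above; the value shown is illustrative):

```yaml
# Proposed option; defaults to false (current behavior) when omitted.
yarn.application.wait-for-completion: true
```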

Proposed Changes

A new parameter will be added to flink-conf.yaml:

Code Block
yarn.application.wait-for-completion: false

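The intended client-side behavior can be sketched as a simple wait loop: poll YARN for the application's final status until it reaches a terminal value, then map that status to a process exit code that orchestrators can act on. This is an illustrative Python sketch only; the real change would live in Flink's YarnClusterDescriptor (Java), and the helper names below are hypothetical.

```python
import time

# Terminal final-application statuses reported by YARN; "UNDEFINED"
# is returned while the application is still running.
FINAL_STATUSES = {"SUCCEEDED", "FAILED", "KILLED"}


def wait_for_final_status(get_final_status, poll_interval_s=5.0, timeout_s=None):
    """Poll YARN until the application reports a final status.

    get_final_status: callable returning the application's current
    final status string ("UNDEFINED" while still running).
    """
    start = time.monotonic()
    while True:
        status = get_final_status()
        if status in FINAL_STATUSES:
            return status
        if timeout_s is not None and time.monotonic() - start > timeout_s:
            raise TimeoutError("application did not finish in time")
        time.sleep(poll_interval_s)


def exit_code_for(status):
    # Orchestrators such as Airflow or Oozie key off the launcher's
    # exit code, so only SUCCEEDED maps to 0.
    return 0 if status == "SUCCEEDED" else 1
```

The status fetcher is injected so the loop itself stays independent of any particular YARN client library.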

Compatibility, Deprecation, and Migration Plan

  • No impact on existing users.
  • The proposed change adds a new configuration parameter and does not alter existing behavior, so no older behavior needs to be phased out.
  • No special migration tools are required.
  • Since existing behavior is unchanged, nothing needs to be removed.

Test Plan

We will test the changes manually by adding the new parameter to flink-conf.yaml and running a Flink batch job. With yarn.application.wait-for-completion set to true, we will verify that the client terminates only when the application has completed and that the reported YARN application status accurately reflects the result of the Flink application. With the parameter set to false, we will confirm that once the application is successfully submitted, the client exits as soon as the application reaches the RUNNING state, regardless of whether the application itself subsequently succeeds or fails. We will also confirm that the default behavior is unchanged.

Rejected Alternatives

There was a previous push for a similar feature in https://issues.apache.org/jira/browse/FLINK-25495. One potential approach would be to poll the deployed application cluster to find out when the job has terminated and to retrieve its status from the cluster. However, for usability it is preferable to have this as a feature within Flink itself, allowing better integration with workflow orchestrators.
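For contrast, the rejected polling approach would place a loop like the following on the orchestrator side rather than inside Flink. The terminal job states match Flink's REST API (GET /jobs/&lt;jobid&gt; returns a "state" field), but the helper itself is a hypothetical sketch:

```python
import time

# Terminal job states as exposed by Flink's REST API.
TERMINAL_STATES = {"FINISHED", "FAILED", "CANCELED"}


def poll_job_state(fetch_state, poll_interval_s=5.0):
    """Repeatedly query the cluster's REST endpoint until the job ends.

    fetch_state: callable returning the job's current state string,
    e.g. by issuing GET /jobs/<jobid> against the JobManager.
    """
    while True:
        state = fetch_state()
        if state in TERMINAL_STATES:
            return state
        time.sleep(poll_interval_s)
```

Every orchestrator integration would have to carry this loop (and locate the JobManager's REST endpoint itself), which is why building the wait into the Flink client is preferred.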

Another alternative proposed in that thread was to use attached mode when running in application mode. However, attached mode is not reliable, since the Flink client could disconnect due to network issues or a JobManager failover. Adding the new behavior in YarnClusterDescriptor, protected behind a configuration variable, is much simpler.