You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »

Motivation

The task lifecycle was changed in FLINK-22972: a new finish() phase was introduced (extracted the ‘finish’ part out of the ‘close’) and  removed the dispose().

This change was also done in table module (e.g., `AbstractMapBundleOperator` for mini-batch operation ) but not covered the UserDefinedFunction which only exposes open() and close() api for custom usage, those customers who rely on the legacy close() api may encounter wrong result or suffer runtime errors after upgrading to the new version. This is a bug caused by the breaking change, but due to the public api change, we propose this flip.

Public Interfaces

Add a new flinish() method to UserDefinedFunction and update the comment of the close() method.

UserDefinedFunction
/**
 * This method is called at the end of data processing. After this method is called, no more records can be produced for the downstream
 * operators.
 *
 * <p><b>NOTE:</b>This method does not need to close any resources. You should release external
 * resources in the {@link #close()} method. More details can see {@link StreamOperator#finish}.
 *
 */
public void finish() throws Exception {
  // do nothing
}

/**
* Tear-down method for user-defined function. It can be used for resource clean up work. By default,
* this method does nothing.
*
* <p><b>NOTE:</b>It can not emit any records! If you need to emit records at the end of
* processing, do so in the {@link #finish()} method.
*/
public void close() throws Exception {
  // do nothing
}

 

Proposed Changes

All operators related to UserDefinedFunction should add corresponding finish() method invocation, including but not limited to calc, correlate and aggregate operators.

Compatibility, Deprecation, and Migration Plan

The default implementation of the newly added finish() method is empty, so it’s compatible for current users. And those users who rely on finish logic in the legacy close() method (< 1.14) can migrate it to the new finish() method.

Test Plan

Rejected Alternatives

  • No labels