...
This section details the proposed changes for option 3, chosen in the last section.
Changes to Public(Evolving) APIs
The API lacks a clear method for stopping record processing and flushing any buffered records. The StreamOperator#close method is supposed to flush all records, but it currently also closes all resources, including connections to external systems. We need separate methods for flushing and for closing resources, because the connections might still be needed when performing the final checkpoint, after all records have been flushed. Moreover, the logic for closing resources is duplicated in the StreamOperator#dispose method. Lastly, the semantics of RichFunction#close differ from those of StreamOperator#close: having no direct access to any output, RichFunction#close is responsible purely for releasing resources. We suggest using this opportunity to clean up the semi-public StreamOperator API and:
- remove the dispose method
- change the semantics of the close method
- introduce a new finish method
Effectively, this would change the termination phase of the operator lifecycle (https://ci.apache.org/projects/flink/flink-docs-master/docs/internals/task_lifecycle/#operator-lifecycle-in-a-nutshell) to:
Code Block |
---|
// termination phase
OPERATOR::endOfInput(1)
...
OPERATOR::endOfInput(n)
OPERATOR::finish()
UDF(if available)::finish()
OPERATOR::snapshotState()
OPERATOR::notifyCheckpointComplete()
OPERATOR::close() --> call UDF's close method
UDF::close() |
In case of failure, we will call the Operator#close method, which in turn calls UDF#close.
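The ordering above can be illustrated with a small, self-contained sketch. It does not use the real Flink classes: LifecycleOperator, RecordingOperator and TerminationSketch are hypothetical stand-ins introduced only for illustration. It also assumes that a failure in any step skips the remaining steps and goes straight to close, which is one reading of the failure rule above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the proposed operator contract; NOT the real Flink interface.
interface LifecycleOperator {
    void endOfInput(int inputId) throws Exception;
    void finish() throws Exception;
    void snapshotState() throws Exception;
    void notifyCheckpointComplete() throws Exception;
    void close() throws Exception;
}

// Records every lifecycle call so that the resulting order can be inspected.
class RecordingOperator implements LifecycleOperator {
    final List<String> calls = new ArrayList<>();
    private final boolean failOnFinish;

    RecordingOperator(boolean failOnFinish) {
        this.failOnFinish = failOnFinish;
    }

    public void endOfInput(int inputId) { calls.add("endOfInput(" + inputId + ")"); }

    public void finish() throws Exception {
        calls.add("finish");
        if (failOnFinish) {
            throw new Exception("flush failed");
        }
    }

    public void snapshotState() { calls.add("snapshotState"); }

    public void notifyCheckpointComplete() { calls.add("notifyCheckpointComplete"); }

    public void close() { calls.add("close"); }
}

class TerminationSketch {
    // Drives the termination phase. close() runs on both the success and the
    // failure path; the other steps are skipped once one of them throws (assumption).
    static void terminate(LifecycleOperator op, int numInputs) throws Exception {
        try {
            for (int i = 1; i <= numInputs; i++) {
                op.endOfInput(i);
            }
            op.finish();
            op.snapshotState();
            op.notifyCheckpointComplete();
        } finally {
            op.close();
        }
    }
}
```

With this structure, close is the only method guaranteed to run, which is why it should be limited to releasing resources.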
Code Block |
---|
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ....
    /**
     * This method is called at the end of data processing.
     *
     * <p>The method is expected to flush all remaining buffered data. Exceptions during this
     * flushing of buffered data should be propagated, in order to cause the operation to be
     * recognized as failed, because the last data items are not processed properly.
     *
     * <p><b>NOTE:</b> This method does not need to close any resources. You should release
     * external resources in the {@link #close()} method.
     *
     * @throws java.lang.Exception An exception in this method causes the operator to fail.
     */
    void finish() throws Exception;

    /**
     * This method is called at the very end of the operator's life, both in the case of a
     * successful completion of the operation, and in the case of a failure and canceling.
     *
     * <p>This method is expected to make a thorough effort to release all resources that the
     * operator has acquired.
     *
     * <p><b>NOTE:</b> It should not emit any records! If you need to emit records at the end of
     * processing, do so in the {@link #finish()} method.
     */
    void close() throws Exception;
    ...
} |
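To make the split between the two methods concrete, here is a minimal sketch of a buffering operator that follows the contract described in the Javadoc above. It is deliberately independent of Flink: BatchingOperatorSketch, FakeConnection, processElement and the Consumer-based output are hypothetical names chosen for this example, not part of the proposed API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a connection to an external system.
class FakeConnection {
    boolean open = true;

    void close() {
        open = false;
    }
}

// Hypothetical, simplified operator that batches records before emitting them.
class BatchingOperatorSketch {
    private final List<String> buffer = new ArrayList<>();
    private final Consumer<String> output; // stand-in for the operator's output
    private final FakeConnection connection;
    private final int batchSize;

    BatchingOperatorSketch(Consumer<String> output, FakeConnection connection, int batchSize) {
        this.output = output;
        this.connection = connection;
        this.batchSize = batchSize;
    }

    void processElement(String record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // End of data processing: emit whatever is still buffered, but keep the
    // connection open because the final checkpoint may still need it.
    void finish() {
        flush();
    }

    // Very end of the operator's life, on success and on failure alike:
    // release resources only and never emit records.
    void close() {
        buffer.clear();
        connection.close();
    }

    private void flush() {
        buffer.forEach(output);
        buffer.clear();
    }
}
```

On the failure path only close runs, so records that are still buffered are dropped rather than emitted, which matches the rule that close must not produce output.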
The UDF that most often buffers data, and thus requires a flushing/finishing phase, is the SinkFunction, which could, for example, create transactions in an external system that can be committed during the final snapshot. Therefore we suggest introducing a finish method in the SinkFunction:
Code Block |
---|
@Public
public interface SinkFunction<IN> extends Function, Serializable {
    default void finish() throws Exception {}
} |
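Because finish is proposed as a default method, existing sinks would keep compiling unchanged, and only buffering sinks would need to override it. The sketch below illustrates this under stated assumptions: SinkSketch is a hypothetical, reduced stand-in for SinkFunction (with a simplified single-argument invoke), and DirectSink, TransactionalSink and its commit hook are invented for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, reduced stand-in that mirrors the proposed shape of SinkFunction.
interface SinkSketch<IN> {
    void invoke(IN value) throws Exception;

    // No-op by default, so sinks without buffering need no changes.
    default void finish() throws Exception {}
}

// Unbuffered sink: writes immediately and relies on the default finish().
class DirectSink implements SinkSketch<String> {
    final List<String> written = new ArrayList<>();

    public void invoke(String value) {
        written.add(value);
    }
}

// Buffering sink: stages records in an open transaction. finish() prepares the
// transaction so that it can be committed together with the final snapshot.
class TransactionalSink implements SinkSketch<String> {
    private final List<String> open = new ArrayList<>();
    final List<String> prepared = new ArrayList<>();
    final List<String> committed = new ArrayList<>();

    public void invoke(String value) {
        open.add(value);
    }

    @Override
    public void finish() {
        prepared.addAll(open);
        open.clear();
    }

    // Hypothetical hook, assumed to be called once the final checkpoint completes.
    void commit() {
        committed.addAll(prepared);
        prepared.clear();
    }
}
```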
Checkpoint Format with Finished Tasks
...