...
3) Add an isOutputValueStored() method to the Function interface that returns false by default, and have RichFunction override isOutputValueStored() to return true by default.
```java
// In Flink, these two interfaces live in separate files in org.apache.flink.api.common.functions.
import org.apache.flink.annotation.Public;

@Public
public interface Function extends java.io.Serializable {
    /**
     * Returns false if it is guaranteed that the function will not store or access
     * a reference to the output value.
     */
    default boolean isOutputValueStored() {
        return false;
    }
}

@Public
public interface RichFunction extends Function {
    // Existing RichFunction methods are omitted here.

    /**
     * A RichFunction is able to put values into the state backend, so this method returns true
     * by default. A RichFunction that doesn't store its output value in the state backend can
     * return false.
     */
    @Override
    default boolean isOutputValueStored() {
        return true;
    }
}
```
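As a minimal sketch, assuming the interfaces above, a RichFunction that never keeps its output (here a hypothetical UpperCaseMapper that builds a fresh String per record) could opt back in to object reuse by overriding isOutputValueStored(). The Function and RichFunction types below are simplified local stand-ins for the proposed interfaces so that the example compiles on its own; they are not the real Flink types.

```java
// Simplified stand-ins for the proposed interfaces (not the real Flink types).
interface Function extends java.io.Serializable {
    // Proposed default: plain functions are assumed not to keep references to their output.
    default boolean isOutputValueStored() {
        return false;
    }
}

interface RichFunction extends Function {
    // Proposed default: rich functions may put output values into state, so assume they do.
    @Override
    default boolean isOutputValueStored() {
        return true;
    }
}

// Hypothetical stateless rich function: it creates a new String per record and
// never keeps a reference to it, so it can safely report false.
class UpperCaseMapper implements RichFunction {
    String map(String value) {
        return value.toUpperCase();
    }

    @Override
    public boolean isOutputValueStored() {
        return false;
    }
}

class FunctionAttributeDemo {
    public static void main(String[] args) {
        Function plain = new Function() {};
        RichFunction rich = new RichFunction() {};
        UpperCaseMapper mapper = new UpperCaseMapper();
        System.out.println(plain.isOutputValueStored());  // false
        System.out.println(rich.isOutputValueStored());   // true
        System.out.println(mapper.isOutputValueStored()); // false
    }
}
```

Only functions that are known not to retain their output should override the default; leaving RichFunction conservative keeps stateful functions safe without any change.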
...
We will enumerate all operators and functions defined in Flink, identify those that do not store or access a reference to the value of the output StreamRecord, and override their isOutputStreamRecordValueStored() method (isOutputValueStored() for functions) to return false. Operators that do not store or access a reference to the input StreamRecord should override the isInputStreamRecordStored() method to return false.
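The operator-side counterpart can be sketched the same way. Assuming the two operator-level methods named above default to true (the conservative choice implied by overriding them to return false), an operator that neither keeps its input nor its output would override both. ChainableOperator and FilterNonEmptyOperator are hypothetical names used only for illustration, and plain values stand in for StreamRecords.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified operator interface carrying the two attributes named in the text.
interface ChainableOperator<IN, OUT> {
    void processElement(IN value, Consumer<OUT> out);

    // Assumed conservative default: the operator may keep a reference to its output value.
    default boolean isOutputStreamRecordValueStored() {
        return true;
    }

    // Assumed conservative default: the operator may keep a reference to its input record.
    default boolean isInputStreamRecordStored() {
        return true;
    }
}

// Hypothetical stateless operator: it forwards or drops each input immediately and
// holds no reference to either the input or the output afterwards.
class FilterNonEmptyOperator implements ChainableOperator<String, String> {
    @Override
    public void processElement(String value, Consumer<String> out) {
        if (!value.isEmpty()) {
            out.accept(value);
        }
    }

    @Override
    public boolean isOutputStreamRecordValueStored() {
        return false;
    }

    @Override
    public boolean isInputStreamRecordStored() {
        return false;
    }
}

class OperatorAttributeDemo {
    public static void main(String[] args) {
        FilterNonEmptyOperator op = new FilterNonEmptyOperator();
        List<String> collected = new ArrayList<>();
        for (String s : new String[] {"a", "", "b"}) {
            op.processElement(s, collected::add);
        }
        System.out.println(collected);                            // [a, b]
        System.out.println(op.isOutputStreamRecordValueStored()); // false
        System.out.println(op.isInputStreamRecordStored());       // false
    }
}
```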
Once the proposed change is implemented, operator and function developers can avoid unnecessary deep copies by overriding the isOutputStreamRecordValueStored() method (or isOutputValueStored() for functions) where appropriate. This lets Flink avoid the copy overhead with the default configuration, in which global object reuse is disabled.
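To show why these attributes let Flink skip copies, here is a minimal sketch of how a chained hand-off might decide whether to deep-copy a record before passing it to the next operator. The rule used here (copy only when object reuse is disabled and either the upstream operator keeps its output value or the downstream operator keeps its input record) is our reading of the proposal, not Flink's actual chaining-output code. CopyDecision is hypothetical, and the UnaryOperator copier stands in for a serializer-based copy.

```java
import java.util.function.UnaryOperator;

// Hypothetical helper deciding whether a chained hand-off needs a defensive copy.
final class CopyDecision {
    private CopyDecision() {}

    static boolean needsCopy(
            boolean objectReuseEnabled,
            boolean upstreamStoresOutputValue,
            boolean downstreamStoresInputRecord) {
        if (objectReuseEnabled) {
            // The user has accepted shared references globally, so never copy.
            return false;
        }
        // A copy is needed only if one side may still hold the same object after the hand-off.
        return upstreamStoresOutputValue || downstreamStoresInputRecord;
    }

    // Returns either the original object or a copy made by the supplied copier.
    static <T> T handOff(T value, boolean copy, UnaryOperator<T> copier) {
        return copy ? copier.apply(value) : value;
    }
}

class CopyDecisionDemo {
    public static void main(String[] args) {
        // Object reuse disabled, both operators stateless: no copy.
        System.out.println(CopyDecision.needsCopy(false, false, false)); // false
        // Downstream operator keeps its input: a copy is still required.
        System.out.println(CopyDecision.needsCopy(false, false, true));  // true

        StringBuilder record = new StringBuilder("event");
        StringBuilder passed = CopyDecision.handOff(record, false, sb -> new StringBuilder(sb));
        System.out.println(passed == record); // true: same object, no copy made
    }
}
```

The conservative defaults matter here: if either side has not declared itself safe, the copy is kept, so existing operators behave exactly as before.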
POC and Analysis
We implemented a POC and ran flink-benchmarks against it with global object reuse disabled. We verified that, with the change proposed in this FLIP, many of the operators in the benchmark can enable object reuse without any code change. Only the custom operators and the AbstractUdfStreamOperator instances that wrap a RichFunction cannot enable object reuse for their output. To enable object reuse for those operators and functions, a couple of lines of code are needed to set the operator attributes accordingly. After that, all the operators in flink-benchmarks can enable object reuse while global object reuse remains disabled.
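A minimal sketch of those few lines, assuming a UDF-wrapping operator derives its output attribute from the wrapped function's isOutputValueStored(). The types below are simplified, hypothetical stand-ins rather than Flink's AbstractUdfStreamOperator and RichFunction, and whether the real operator delegates exactly this way is an assumption.

```java
// Simplified stand-ins for the proposed function interfaces.
interface Function {
    default boolean isOutputValueStored() {
        return false;
    }
}

interface RichFunction extends Function {
    @Override
    default boolean isOutputValueStored() {
        return true;
    }
}

// Hypothetical stand-in for a UDF-wrapping operator such as AbstractUdfStreamOperator.
class UdfOperator<F extends Function> {
    protected final F userFunction;

    UdfOperator(F userFunction) {
        this.userFunction = userFunction;
    }

    // Assumed delegation: the operator stores its output value only if the wrapped function might.
    boolean isOutputStreamRecordValueStored() {
        return userFunction.isOutputValueStored();
    }
}

// The "couple of lines" for a RichFunction that never keeps its output:
// override the attribute so the wrapping operator no longer forces a copy.
class StatelessRichMapper implements RichFunction {
    @Override
    public boolean isOutputValueStored() {
        return false;
    }
}

class UdfOperatorDemo {
    public static void main(String[] args) {
        UdfOperator<RichFunction> defaultRich = new UdfOperator<>(new RichFunction() {});
        UdfOperator<StatelessRichMapper> tuned = new UdfOperator<>(new StatelessRichMapper());
        System.out.println(defaultRich.isOutputStreamRecordValueStored()); // true
        System.out.println(tuned.isOutputStreamRecordValueStored());       // false
    }
}
```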
Compatibility, Deprecation, and Migration Plan
...