...
```java
package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.PublicEvolving;

/**
 * OperatorAttributes element provides the JobManager with information that can be
 * used to optimize job performance.
 */
@PublicEvolving
public class OperatorAttributes {

    /**
     * Returns false if it is guaranteed that the operator will not store or access a reference to
     * the StreamRecord#value that it has already emitted.
     */
    public boolean isOutputStreamRecordValueStored() {...}

    /**
     * Returns false if it is guaranteed that the operator will not store or access a reference to
     * the input StreamRecord instance.
     */
    public boolean isInputStreamRecordStored() {...}
}
```
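The two attributes protect two different references: the StreamRecord wrapper and the value it carries. The sketch below uses hypothetical stand-in classes (`Record`, `Payload`), not Flink's `StreamRecord`. It shows why a shallow copy (a new wrapper around the same value) protects a downstream operator that only stores the input record, while a stored value needs a deep copy.

```java
// Hypothetical stand-ins for illustration only; these are NOT Flink classes.
public class RecordVsValueDemo {

    /** Mutable payload, like a POJO emitted by an operator. */
    static final class Payload {
        int count;

        Payload(int count) { this.count = count; }
    }

    /** Minimal wrapper standing in for StreamRecord: a value plus a timestamp. */
    static final class Record {
        Payload value;
        long timestamp;

        Record(Payload value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        /** Shallow copy: a new wrapper that shares the same value reference. */
        Record shallowCopy() { return new Record(value, timestamp); }

        /** Deep copy: a new wrapper and a new value. */
        Record deepCopy() { return new Record(new Payload(value.count), timestamp); }
    }

    public static void main(String[] args) {
        Record reused = new Record(new Payload(1), 100L);
        Record shallow = reused.shallowCopy();
        Record deep = reused.deepCopy();

        // The upstream side reuses its wrapper and mutates the value for the next element.
        reused.timestamp = 200L;
        reused.value.count = 2;

        System.out.println(shallow.timestamp);   // 100: the wrapper was copied
        System.out.println(shallow.value.count); // 2: the value is still shared
        System.out.println(deep.value.count);    // 1: the value was copied too
    }
}
```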
...
```java
import org.apache.flink.annotation.Public;

@Public
public interface Function extends java.io.Serializable {

    /**
     * Returns false if it is guaranteed that the function will not store or access a
     * reference to the output value.
     */
    default boolean isOutputValueStored() {
        // Conservative default: assume the output value is stored.
        return true;
    }
}
```
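The sketch below illustrates the hazard these methods describe. It is not Flink code: a hypothetical `RememberLast` function keeps a reference to the value it emitted, and a downstream step mutates that value in place. When no copy is made between the two, the mutation leaks into the upstream function's stored reference. A deep copy isolates it. A function like this must therefore report that its output value is stored.

```java
// Hypothetical sketch; none of these types are Flink APIs.
public class StoredOutputDemo {

    /** Mutable value passed between the two steps. */
    static final class Counter {
        long value;

        Counter(long value) { this.value = value; }
    }

    /** Keeps a reference to what it emitted, so it would report isOutputValueStored() == true. */
    static final class RememberLast {
        Counter lastEmitted;

        Counter map(long in) {
            Counter out = new Counter(in);
            lastEmitted = out; // stores a reference to the emitted value
            return out;
        }
    }

    /** Downstream step that mutates its input in place. */
    static void doubleInPlace(Counter c) {
        c.value *= 2;
    }

    /** Runs one element through the chain, optionally deep-copying between the two steps. */
    static long runChain(boolean deepCopyBetween) {
        RememberLast upstream = new RememberLast();
        Counter emitted = upstream.map(21);
        Counter handedDown = deepCopyBetween ? new Counter(emitted.value) : emitted;
        doubleInPlace(handedDown);
        return upstream.lastEmitted.value; // what the upstream function now sees
    }

    public static void main(String[] args) {
        System.out.println(runChain(false)); // 42: downstream mutation corrupted the stored value
        System.out.println(runChain(true));  // 21: the deep copy kept the stored value intact
    }
}
```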
Proposed Changes
...
- If pipeline.object-reuse is set to true, records emitted by this operator will be re-used.
- Otherwise, if isOutputStreamRecordValueStored() returns false:
  - If the downstream operator's isInputStreamRecordStored() returns false, records emitted by this operator will be re-used.
  - Otherwise, the StreamRecord will be shallow-copied before being given to the next operator in the chain.
- Otherwise, records emitted by this operator will be deep-copied before being given to the next operator in the chain.
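The rules above can be condensed into a small decision function. This is a hypothetical helper for illustration, not Flink's implementation. It assumes, following the Javadoc of the proposed methods, that `false` from `isOutputStreamRecordValueStored()` and `isInputStreamRecordStored()` means no reference is kept.

```java
// Hypothetical helper mirroring the copy rules; not Flink's actual code.
public class CopyDecision {

    enum Strategy { REUSE, SHALLOW_COPY, DEEP_COPY }

    /**
     * @param objectReuseEnabled          value of pipeline.object-reuse
     * @param outputValueStored           upstream isOutputStreamRecordValueStored()
     * @param downstreamInputRecordStored downstream isInputStreamRecordStored()
     */
    static Strategy decide(boolean objectReuseEnabled,
                           boolean outputValueStored,
                           boolean downstreamInputRecordStored) {
        if (objectReuseEnabled) {
            return Strategy.REUSE;
        }
        if (!outputValueStored) {
            // The value is safe to share; only the wrapper may need protecting.
            return downstreamInputRecordStored ? Strategy.SHALLOW_COPY : Strategy.REUSE;
        }
        return Strategy.DEEP_COPY;
    }

    public static void main(String[] args) {
        System.out.println(decide(true, true, true));    // REUSE
        System.out.println(decide(false, false, false)); // REUSE
        System.out.println(decide(false, false, true));  // SHALLOW_COPY
        System.out.println(decide(false, true, false));  // DEEP_COPY
    }
}
```

The downstream attribute matters only when the upstream value is not stored. If the upstream operator stores its emitted value, a deep copy is required regardless of what the downstream operator does.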
...
2) Update operators/functions defined in the Flink codebase to override isOutputStreamRecordValueStored() as appropriate.
We will enumerate all operators/functions defined in Flink and identify those that do not store or access a reference to the value of the output StreamRecord. We will then override their isOutputStreamRecordValueStored() method (isOutputValueStored() for functions) to return false. Operators that do not store or access a reference to the input StreamRecord should override the isInputStreamRecordStored() method to return false.
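Below is a sketch of how the audit in step 2 might look. It assumes both attributes default conservatively to `true` (references assumed stored), so that only operators known to be safe override them to `false`. The `ReuseAttributes` interface and the operator class names are hypothetical and are not Flink classes.

```java
import java.util.List;

// Hypothetical sketch of the step-2 audit; not Flink classes.
public class OperatorAudit {

    interface ReuseAttributes {
        /** Assumed conservative default: emitted values may be referenced later. */
        default boolean isOutputStreamRecordValueStored() { return true; }

        /** Assumed conservative default: input records may be referenced later. */
        default boolean isInputStreamRecordStored() { return true; }
    }

    /** Stateless map: keeps neither emitted values nor input records. */
    static final class StatelessMap implements ReuseAttributes {
        @Override public boolean isOutputStreamRecordValueStored() { return false; }
        @Override public boolean isInputStreamRecordStored() { return false; }
    }

    /** Buffers input records until a flush but emits freshly created values. */
    static final class BufferingOperator implements ReuseAttributes {
        @Override public boolean isOutputStreamRecordValueStored() { return false; }
        // isInputStreamRecordStored() keeps the conservative default.
    }

    /** Not yet audited: inherits both conservative defaults. */
    static final class UnauditedOperator implements ReuseAttributes {}

    public static void main(String[] args) {
        List<ReuseAttributes> ops =
                List.of(new StatelessMap(), new BufferingOperator(), new UnauditedOperator());
        for (ReuseAttributes op : ops) {
            System.out.printf("%s outputStored=%b inputStored=%b%n",
                    op.getClass().getSimpleName(),
                    op.isOutputStreamRecordValueStored(),
                    op.isInputStreamRecordStored());
        }
    }
}
```

Keeping `true` as the default means an operator that has not been audited is always copied. Forgetting an override costs performance but not correctness.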
Once the proposed change is implemented, operator/function developers can avoid unnecessary deep-copy overhead by overriding isOutputStreamRecordValueStored() (or isOutputValueStored() for functions) appropriately. Flink can then skip those copies and run faster even with the default configuration, where pipeline.object-reuse is false.
...