...
    package org.apache.flink.streaming.api.operators;

    /** The builder class for {@link OperatorAttributes}. */
    @PublicEvolving
    public class OperatorAttributesBuilder {

        @Nullable private Boolean outputStreamRecordValueStored = null;
        @Nullable private Boolean inputStreamRecordStored = null;

        public OperatorAttributesBuilder() {...}

        /**
         * Set to false if it is guaranteed that the operator will not store and access reference
         * to the StreamRecord#value that it has already emitted.
         */
        public OperatorAttributesBuilder setOutputStreamRecordValueStored(
                boolean outputStreamRecordValueStored) {...}

        /**
         * Set to false if it is guaranteed that the operator will not store and access reference
         * to the input StreamRecord instance.
         */
        public OperatorAttributesBuilder setInputStreamRecordStored(
                boolean inputStreamRecordStored) {...}

        /**
         * If any operator attribute is null, we will log it at DEBUG level and use the following
         * default values:
         * - outputStreamRecordValueStored defaults to true
         * - inputStreamRecordStored defaults to true
         */
        public OperatorAttributes build() {...}
    }
...
    package org.apache.flink.streaming.api.operators;

    /**
     * OperatorAttributes element provides Job Manager with information that can be
     * used to optimize job performance.
     */
    @PublicEvolving
    public class OperatorAttributes {

        /**
         * Returns false if it is guaranteed that the operator will not store and access reference
         * to the StreamRecord#value that it has already emitted.
         */
        public boolean isOutputStreamRecordValueStored() {...}

        /**
         * Returns false if it is guaranteed that the operator will not store and access reference
         * to the input StreamRecord instance.
         */
        public boolean isInputStreamRecordStored() {...}
    }
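As a usage illustration (a sketch, not text from the proposal), the builder above would be exercised as shown below; the default values follow from the build() contract quoted in the code, and the class name OperatorAttributesExample is hypothetical.

    import org.apache.flink.streaming.api.operators.OperatorAttributes;
    import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder;

    public class OperatorAttributesExample {
        public static void main(String[] args) {
            // No attribute set: build() falls back to the documented defaults,
            // i.e. both attributes are true (the conservative "may store references" case).
            OperatorAttributes defaults = new OperatorAttributesBuilder().build();

            // Explicitly declare that neither the emitted StreamRecord#value nor the input
            // StreamRecord instance is stored by the operator.
            OperatorAttributes reuseFriendly =
                    new OperatorAttributesBuilder()
                            .setOutputStreamRecordValueStored(false)
                            .setInputStreamRecordStored(false)
                            .build();

            System.out.println(defaults.isOutputStreamRecordValueStored());      // true
            System.out.println(reuseFriendly.isOutputStreamRecordValueStored()); // false
        }
    }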
2) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.
    @PublicEvolving
    public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
        ...

        default OperatorAttributes getOperatorAttributes() {
            return new OperatorAttributesBuilder().build();
        }
    }

    @PublicEvolving
    public interface StreamOperatorFactory<OUT> extends Serializable {
        ...

        default OperatorAttributes getOperatorAttributes() {
            return new OperatorAttributesBuilder().build();
        }
    }
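For illustration, a hypothetical one-input operator could override the new default method as sketched below. The class name ToStringOperator and its logic are examples only, not part of the proposal; it builds on the existing AbstractStreamOperator and OneInputStreamOperator APIs.

    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.streaming.api.operators.OperatorAttributes;
    import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

    /** Hypothetical operator used only to illustrate the proposed getOperatorAttributes() override. */
    public class ToStringOperator<IN> extends AbstractStreamOperator<String>
            implements OneInputStreamOperator<IN, String> {

        @Override
        public void processElement(StreamRecord<IN> element) {
            // Emits a freshly created String and keeps no reference to the input record
            // or to the emitted value after this call returns.
            output.collect(element.replace(String.valueOf(element.getValue())));
        }

        @Override
        public OperatorAttributes getOperatorAttributes() {
            // Declare that neither emitted values nor input StreamRecord instances are stored.
            return new OperatorAttributesBuilder()
                    .setOutputStreamRecordValueStored(false)
                    .setInputStreamRecordStored(false)
                    .build();
        }
    }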
3) Add the isOutputValueStored() method, which returns false by default, to the Function interface, and override isOutputValueStored() in the RichFunction interface to return true by default.
    @Public
    public interface Function extends java.io.Serializable {

        /**
         * Returns false if it is guaranteed that the function will not store and access
         * reference to the output value.
         */
        default boolean isOutputValueStored() {
            return false;
        }
    }

    @Public
    public interface RichFunction extends Function {

        /**
         * RichFunction is able to put the values to the state backend so the method returns true
         * by default. For a RichFunction that doesn't store output values to the state backend,
         * it can return false.
         */
        @Override
        default boolean isOutputValueStored() {
            return true;
        }
    }
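As an example of the RichFunction escape hatch described in the Javadoc above, a hypothetical RichMapFunction that never keeps a reference to its output could opt back in to value reuse; the class UpperCaseMapper is illustrative only.

    import org.apache.flink.api.common.functions.RichMapFunction;

    /** Hypothetical RichFunction that never keeps a reference to the values it returns. */
    public class UpperCaseMapper extends RichMapFunction<String, String> {

        @Override
        public String map(String value) {
            // A new String is created for every input; nothing is kept in fields or in state.
            return value.toUpperCase();
        }

        @Override
        public boolean isOutputValueStored() {
            // Safe to let Flink reuse the emitted value, since it is never stored by this function.
            return false;
        }
    }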
4) Update the description of pipeline.object-reuse to mention that when it is false, Flink will decide whether to use object reuse based on the operator attributes.
Here is the updated description:
When it is true, objects that Flink internally uses for deserialization and passing data to user-code functions will be reused. When it is false, Flink will decide whether to use object reuse based on the operator attributes. Keep in mind that this can lead to bugs when the user-code function of an operation is not aware of this behavior.
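For context, here is a minimal sketch of how the option is set in application code today, assuming the existing PipelineOptions.OBJECT_REUSE key that backs pipeline.object-reuse; with the proposed change, leaving it at false lets Flink fall back to the operator attributes.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.PipelineOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ObjectReuseConfigExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // false (the default): with this FLIP, Flink decides per chained connection whether
            // reuse is safe, based on the operator attributes described above.
            conf.set(PipelineOptions.OBJECT_REUSE, false);

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
            // Pre-existing programmatic switch that forces reuse everywhere:
            // env.getConfig().enableObjectReuse();

            env.fromElements("a", "b", "c").print();
            env.execute("object-reuse-config-example");
        }
    }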
Proposed Changes
1) Update OperatorChain to take advantage of the isOutputStreamRecordValueStored and isInputStreamRecordStored attributes.
Currently, the OperatorChain only considers the global execution configuration pipeline.object-reuse to determine whether to enable object-reuse for data transfer between chained operators.
...
- If pipeline.object-reuse is set to true, records emitted by this operator will be re-used.
- Otherwise, if isOutputStreamRecordValueStored() returns false:
  - If the downstream operator's isInputStreamRecordStored() returns false, records emitted by this operator will be re-used.
  - Otherwise, the StreamRecord will be shallow-copied before being given to the next operator in the chain.
- Otherwise, records emitted by this operator will be deep-copied before being given to the next operator in the chain.
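To make the three outcomes above explicit, here is a minimal sketch of the decision as hypothetical helper code; the ChainedOutputMode enum and ObjectReuseDecision class are illustrative names and not part of the actual OperatorChain implementation.

    import org.apache.flink.streaming.api.operators.OperatorAttributes;

    /** Hypothetical enum used only to name the three copy behaviors described above. */
    enum ChainedOutputMode { REUSE, SHALLOW_COPY, DEEP_COPY }

    final class ObjectReuseDecision {

        /**
         * Decides how a record emitted by the upstream operator is handed to the next chained
         * operator, following the rules listed above.
         */
        static ChainedOutputMode decide(
                boolean globalObjectReuseEnabled,          // pipeline.object-reuse
                OperatorAttributes upstreamAttributes,     // attributes of the emitting operator
                OperatorAttributes downstreamAttributes) { // attributes of the next operator

            if (globalObjectReuseEnabled) {
                return ChainedOutputMode.REUSE;
            }
            if (!upstreamAttributes.isOutputStreamRecordValueStored()) {
                return downstreamAttributes.isInputStreamRecordStored()
                        ? ChainedOutputMode.SHALLOW_COPY
                        : ChainedOutputMode.REUSE;
            }
            return ChainedOutputMode.DEEP_COPY;
        }
    }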
...
2) Update operators/functions defined in the Flink codebase to override isOutputStreamRecordValueStored() as appropriate.
We will enumerate all operators/functions defined in Flink, identify those that do not store or access a reference to the value of the StreamRecord they emit, and override their isOutputStreamRecordValueStored() method to return false. Operators that do not store or access a reference to the input StreamRecord should override the isInputStreamRecordStored() method to return false.
After implementing the proposed change, in typical cases where jobs don't involve user-defined operators or functions, such as SQL jobs, Flink will only perform deep-copying of records when it is essential for ensuring correctness. For jobs that do incorporate user-defined operators or functions, the operator/function developer will have the flexibility to avoid unnecessary deep-copy overhead by appropriately overriding the isOutputStreamRecordValueStored()
method. This empowers Flink to achieve optimal performance with the default configuration.
POC and Analysis
We implemented a POC and ran flink-benchmarks against it with global object-reuse disabled. We verified that, with the change proposed in this FLIP, many of the operators in the benchmarks can enable object reuse without any code change. Only custom operators and AbstractUdfStreamOperator instances that contain a RichFunction
cannot enable object reuse for their output. To enable object reuse for those operators/functions, a couple of lines of code are needed to set the operator attributes accordingly. After that, all the operators in flink-benchmarks can enable object reuse with global object-reuse disabled.
Compatibility, Deprecation, and Migration Plan
...
Migration plan: users are recommended to update their existing operator/function implementations to override the isOutputStreamRecordValueStored() API if their operator/function does not store or access a reference to the StreamRecord#value that it has already emitted, and to override the isInputStreamRecordStored() API if their operator does not store or access the input StreamRecord instance. We plan to write blogs to promote the new API.
...