Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, Flink has a global execution configuration, pipeline.object-reuse, which defaults to false, to decide whether object-reuse is enabled. Disabling object-reuse by default for streaming jobs aims to prevent unexpected behavior. When object-reuse is disabled, data transfers between chained operators go through a serialization/copy/deserialization phase. However, object-reuse becomes problematic only in the following two cases:
- The upstream operator stores the output records and the downstream operator modifies the input records.
- The upstream operator stores the output records and the downstream operator stores the input records. Then, the upstream operator modifies stored output records.
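The first case can be reproduced without any Flink dependency. The following is a minimal sketch, not Flink code: Event, BufferingUpstream, MutatingDownstream and ObjectReuseHazardDemo are hypothetical names introduced only for illustration, and the copy() call stands in for the serialization/copy/deserialization step.

```java
import java.util.ArrayList;
import java.util.List;

/** Mutable record type standing in for a user POJO (hypothetical). */
final class Event {
    int value;

    Event(int value) {
        this.value = value;
    }

    Event copy() {
        return new Event(value);
    }
}

/** Upstream operator that keeps a reference to every record it emits. */
final class BufferingUpstream {
    final List<Event> stored = new ArrayList<>();

    Event emit(int value) {
        Event e = new Event(value);
        stored.add(e); // the emitted object stays reachable from the upstream operator
        return e;
    }
}

/** Downstream operator that modifies its input record in place. */
final class MutatingDownstream {
    void process(Event e) {
        e.value *= 10;
    }
}

final class ObjectReuseHazardDemo {
    /** Returns the value the upstream operator finds in its buffer after the downstream ran. */
    static int run(boolean objectReuse) {
        BufferingUpstream upstream = new BufferingUpstream();
        MutatingDownstream downstream = new MutatingDownstream();
        Event emitted = upstream.emit(1);
        // With object-reuse the same instance is handed over; otherwise a copy is passed on.
        downstream.process(objectReuse ? emitted : emitted.copy());
        return upstream.stored.get(0).value;
    }

    public static void main(String[] args) {
        System.out.println("copying:      " + run(false)); // prints 1
        System.out.println("object-reuse: " + run(true));  // prints 10
    }
}
```

With copying, the buffered record keeps its original value; with object-reuse, the downstream modification silently changes what the upstream operator has stored.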
Fortunately, many operators implemented by Flink do not store output records or they make a deep copy of the records to be stored, making it safe to enable object-reuse for data transfer between such operators (e.g., Flink SQL operators). Enabling object-reuse can significantly improve throughput. [1]
To address this, we propose adding public APIs allowing an operator to notify the Flink runtime about whether it is safe to enable object-reuse for the operator and leverage the associated performance benefits.
Public Interfaces
Note: The API is compatible with the API proposed in FLIP-327.
1) Add OperatorAttributesBuilder and OperatorAttributes for developers to specify operator attributes.
package org.apache.flink.streaming.api.operators;

/** The builder class for {@link OperatorAttributes}. */
@PublicEvolving
public class OperatorAttributesBuilder {
    ...

    public OperatorAttributesBuilder() {
        ...
        isObjectReuseCompliant = false;
    }

    public OperatorAttributesBuilder setIsObjectReuseCompliant(boolean isObjectReuseCompliant) {...}

    public OperatorAttributes build() {...}
}
package org.apache.flink.streaming.api.operators;

/**
 * OperatorAttributes element provides Job Manager with information that can be
 * used to optimize job performance.
 */
@PublicEvolving
public class OperatorAttributes {
    /**
     * Returns true when it is guaranteed that the operator will not store the objects to be emitted
     * in state backend or memory.
     */
    public boolean getIsObjectReuseCompliant() {...}
}
2) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.
@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ...

    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}

@PublicEvolving
public interface StreamOperatorFactory<OUT> extends Serializable {
    ...

    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}
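To illustrate how an operator might opt in, here is a minimal, self-contained sketch. It does not compile against Flink: OperatorAttributes and OperatorAttributesBuilder below are stand-ins whose internals are assumed (the proposal elides them), SketchStreamOperator is a reduced stand-in for StreamOperator that keeps only the new default method, and StatelessUppercaseOperator and LegacyOperator are hypothetical.

```java
/** Stand-in for the proposed OperatorAttributes; the field and constructor are assumptions. */
final class OperatorAttributes {
    private final boolean isObjectReuseCompliant;

    OperatorAttributes(boolean isObjectReuseCompliant) {
        this.isObjectReuseCompliant = isObjectReuseCompliant;
    }

    public boolean getIsObjectReuseCompliant() {
        return isObjectReuseCompliant;
    }
}

/** Stand-in for the proposed builder; the attribute defaults to false as in the proposal. */
final class OperatorAttributesBuilder {
    private boolean isObjectReuseCompliant = false;

    public OperatorAttributesBuilder setIsObjectReuseCompliant(boolean isObjectReuseCompliant) {
        this.isObjectReuseCompliant = isObjectReuseCompliant;
        return this;
    }

    public OperatorAttributes build() {
        return new OperatorAttributes(isObjectReuseCompliant);
    }
}

/** Reduced stand-in for StreamOperator: only the new default method is modeled. */
interface SketchStreamOperator {
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}

/** Hypothetical operator that never keeps a reference to the records it emits. */
final class StatelessUppercaseOperator implements SketchStreamOperator {
    String processElement(String input) {
        return input.toUpperCase();
    }

    @Override
    public OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().setIsObjectReuseCompliant(true).build();
    }
}

/** Hypothetical existing operator that does not override the method and keeps the default. */
final class LegacyOperator implements SketchStreamOperator {}
```

Because the method has a default implementation, existing operators such as LegacyOperator in this sketch need no changes and continue to report false.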
3) Add the isObjectReuseCompliant method, which returns false by default, to the Function interface.
@Public
public interface Function extends java.io.Serializable {
    /**
     * Returns true when it is guaranteed that the function will not store the objects to be emitted
     * in state backend or memory.
     */
    default boolean isObjectReuseCompliant() {
        return false;
    }
}
Proposed Changes
1) Update OperatorChain
Currently, the OperatorChain only considers the global execution configuration pipeline.object-reuse to determine whether to enable object-reuse for data transfer between chained operators. We propose updating the OperatorChain as follows:
- If pipeline.object-reuse is set to true, enable object-reuse for all data transfers.
- If pipeline.object-reuse is set to false, check the isObjectReuseCompliant attribute of the upstream operator to determine whether to enable object-reuse.
In scenarios where an operator has multiple chained outputs, the OperatorChain will handle object-reuse as follows:
- If object-reuse is enabled for a chained output, make a shallow copy of the StreamRecord.
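A shallow copy here means a new record wrapper that points at the same payload object. The following self-contained sketch shows one way this could look; SketchRecord stands in for StreamRecord, and BroadcastingOutputSketch is a hypothetical name, not the OperatorChain implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Stand-in for StreamRecord: a wrapper holding a payload and a timestamp. */
final class SketchRecord<T> {
    T value;
    long timestamp;

    SketchRecord(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    /** New wrapper, same payload reference: the payload itself is not copied. */
    SketchRecord<T> shallowCopy() {
        return new SketchRecord<>(value, timestamp);
    }
}

/** Hypothetical fan-out to several chained outputs that all have object-reuse enabled. */
final class BroadcastingOutputSketch<T> {
    private final List<Consumer<SketchRecord<T>>> outputs = new ArrayList<>();

    void addOutput(Consumer<SketchRecord<T>> output) {
        outputs.add(output);
    }

    void collect(SketchRecord<T> record) {
        for (Consumer<SketchRecord<T>> output : outputs) {
            // Assumption: each output gets its own wrapper so that changes one downstream
            // operator makes to the wrapper are not visible to the other outputs.
            output.accept(record.shallowCopy());
        }
    }
}
```

In this sketch every chained output receives a distinct wrapper while the payload is shared, so the cost per output is one small allocation rather than a deep copy.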
These updates will ensure that object-reuse is selectively enabled based on the attribute of the operators, enhancing performance and avoiding unwanted side effects.
2) Update internal operators and functions
To align with the proposed changes, we need to update internal operators and functions that guarantee not to store output records. Each internal operator should set the corresponding attributes accordingly. Specifically:
- For operators that guarantee not to store output records, set the isObjectReuseCompliant attribute to true.
Compatibility, Deprecation, and Migration Plan
The changes made in this FLIP are backward compatible. No deprecation or migration plan is needed.
Test Plan
The change will be covered with unit and integration tests.
Rejected Alternatives
Not yet.
[1] https://www.ververica.com/blog/a-journey-to-beating-flinks-sql-performance