Discussion thread | https://lists.apache.org/thread/2h2r68m7bwsnvd8w1m50rktd7w6mr5n4 |
---|---|
Vote thread | - |
JIRA | - |
Release | - |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
[This FLIP proposal is joint work by Xuannan Su and Dong Lin.]
Motivation
Currently, pipeline.object-reuse defaults to false. This means that each record emitted by an operator is deep-copied (serialized and then deserialized) before being passed to the next operator in the chain. However, when the upstream operator does not store or access references to the records it has emitted, this deep copy is unnecessary and costs performance [1]. Users can manually set pipeline.object-reuse to true, but they are then responsible for ensuring that no operator stores or accesses references to emitted records, and a mistake here risks data corruption. Users therefore face a choice between suboptimal performance and the potential for data corruption.
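To make the hazard concrete, here is a minimal sketch in plain Java. It is not Flink code: Counter, ReusingUpstream, and ObjectReuseHazardDemo are hypothetical names invented for this illustration, and the copy() call merely stands in for the deep copy Flink performs between chained operators. The upstream keeps a reference to the record it emitted and mutates it later, while the downstream buffers whatever it receives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical mutable record type, used only for this illustration. */
final class Counter {
    int value;

    Counter(int value) {
        this.value = value;
    }

    Counter copy() {
        return new Counter(value);
    }
}

/** Hypothetical upstream that keeps a reference to the emitted record and mutates it later. */
final class ReusingUpstream {
    private final Counter reused = new Counter(0);

    void emit(int v, Consumer<Counter> downstream) {
        reused.value = v; // overwrites the instance that was emitted before
        downstream.accept(reused);
    }
}

final class ObjectReuseHazardDemo {
    /** Emits 1, 2, 3 and returns the values a buffering downstream holds afterwards. */
    static List<Integer> run(boolean deepCopy) {
        List<Counter> buffer = new ArrayList<>();
        Consumer<Counter> downstream = deepCopy
                ? r -> buffer.add(r.copy()) // downstream gets its own copy
                : buffer::add;              // downstream shares the upstream instance
        ReusingUpstream upstream = new ReusingUpstream();
        for (int v = 1; v <= 3; v++) {
            upstream.emit(v, downstream);
        }
        List<Integer> seen = new ArrayList<>();
        for (Counter c : buffer) {
            seen.add(c.value);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("deep copy:    " + run(true));  // [1, 2, 3]
        System.out.println("object reuse: " + run(false)); // [3, 3, 3]
    }
}
```

With the copy in place the buffer holds the three distinct values. Without it, all three buffered entries point at the same instance, so the earlier values are silently lost.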
In this FLIP, we propose adding APIs that operators can utilize to inform the Flink runtime whether it is safe to reuse the emitted records. This enhancement would enable Flink to maximize its performance using the default configuration, eliminating the need for users to provide additional information.
Public Interfaces
1) Add OperatorAttributesBuilder and OperatorAttributes for developers to specify operator attributes.
    package org.apache.flink.streaming.api.operators;

    /** The builder class for {@link OperatorAttributes}. */
    @PublicEvolving
    public class OperatorAttributesBuilder {
        ...

        public OperatorAttributesBuilder() {
            ...
            isObjectReuseCompliant = false;
        }

        public OperatorAttributesBuilder setIsObjectReuseCompliant(boolean isObjectReuseCompliant) {...}

        public OperatorAttributes build() {...}
    }
    package org.apache.flink.streaming.api.operators;

    /**
     * OperatorAttributes element provides Job Manager with information that can be
     * used to optimize job performance.
     */
    @PublicEvolving
    public class OperatorAttributes {
        /**
         * Returns true if it is guaranteed that the operator will not store or access
         * references to the records that it has already emitted.
         */
        public boolean getIsObjectReuseCompliant() {...}
    }
2) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.
    @PublicEvolving
    public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
        ...

        default OperatorAttributes getOperatorAttributes() {
            return new OperatorAttributesBuilder().build();
        }
    }

    @PublicEvolving
    public interface StreamOperatorFactory<OUT> extends Serializable {
        ...

        default OperatorAttributes getOperatorAttributes() {
            return new OperatorAttributesBuilder().build();
        }
    }
3) Add the isObjectReuseCompliant method that returns false by default to the Function interface.
    @Public
    public interface Function extends java.io.Serializable {
        /**
         * Returns true if it is guaranteed that the function will not store or access
         * references to the records that it has already emitted.
         */
        default boolean isObjectReuseCompliant() {
            return false;
        }
    }
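As an illustration of how a function or operator author might opt in, the sketch below uses simplified local stand-ins that mirror the proposed signatures so that it compiles on its own. These are not the real Flink types. UpperCaseFunction, WrappingOperator, and OperatorAttributesDemo are hypothetical, and forwarding the function's answer from the wrapping operator is an assumption of this sketch; the FLIP does not specify how the two are wired together.

```java
/** Simplified stand-in for the proposed OperatorAttributes; not the real Flink class. */
final class OperatorAttributes {
    private final boolean isObjectReuseCompliant;

    OperatorAttributes(boolean isObjectReuseCompliant) {
        this.isObjectReuseCompliant = isObjectReuseCompliant;
    }

    boolean getIsObjectReuseCompliant() {
        return isObjectReuseCompliant;
    }
}

/** Simplified stand-in for the proposed builder; the default (false) mirrors the FLIP. */
final class OperatorAttributesBuilder {
    private boolean isObjectReuseCompliant = false;

    OperatorAttributesBuilder setIsObjectReuseCompliant(boolean isObjectReuseCompliant) {
        this.isObjectReuseCompliant = isObjectReuseCompliant;
        return this;
    }

    OperatorAttributes build() {
        return new OperatorAttributes(isObjectReuseCompliant);
    }
}

/** Stand-in for Function with the proposed default method. */
interface Function {
    default boolean isObjectReuseCompliant() {
        return false;
    }
}

/** Stand-in for StreamOperator with the proposed default method. */
interface StreamOperator {
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}

/** Hypothetical stateless function: it returns a new String and keeps no references. */
final class UpperCaseFunction implements Function {
    String map(String value) {
        return value.toUpperCase();
    }

    @Override
    public boolean isObjectReuseCompliant() {
        return true;
    }
}

/** Hypothetical operator that forwards its function's answer (an assumption of this sketch). */
final class WrappingOperator implements StreamOperator {
    private final Function function;

    WrappingOperator(Function function) {
        this.function = function;
    }

    @Override
    public OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder()
                .setIsObjectReuseCompliant(function.isObjectReuseCompliant())
                .build();
    }
}

final class OperatorAttributesDemo {
    public static void main(String[] args) {
        StreamOperator optedIn = new WrappingOperator(new UpperCaseFunction());
        StreamOperator untouched = new WrappingOperator(new Function() {});
        System.out.println(optedIn.getOperatorAttributes().getIsObjectReuseCompliant());   // true
        System.out.println(untouched.getOperatorAttributes().getIsObjectReuseCompliant()); // false
    }
}
```

Because both defaults report false, existing functions and operators that do not override anything keep today's deep-copy behavior.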
Proposed Changes
1) Update OperatorChain to take advantage of the isObjectReuseCompliant attribute.
Currently, the OperatorChain only considers the global execution configuration pipeline.object-reuse to determine whether to enable object-reuse for data transfer between chained operators.
We propose updating the OperatorChain as follows:
If pipeline.object-reuse is set to true, records emitted by this operator will be re-used.
Otherwise, if getIsObjectReuseCompliant() returns true, records emitted by this operator will be re-used.
Otherwise, records emitted by this operator will be deep-copied before being given to the next operator in the chain.
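The three rules above can be sketched as a small decision function. This is only an illustration of the proposed precedence: ObjectReuseDecision, Handover, and decide are hypothetical names, and the real logic would live inside OperatorChain rather than in a standalone helper.

```java
/** Hypothetical helper illustrating the proposed per-operator decision. */
final class ObjectReuseDecision {
    /** How a record is handed to the next operator in the chain. */
    enum Handover { REUSE, DEEP_COPY }

    static Handover decide(boolean pipelineObjectReuse, boolean operatorIsObjectReuseCompliant) {
        if (pipelineObjectReuse) {
            return Handover.REUSE; // the global setting still enables reuse everywhere
        }
        if (operatorIsObjectReuseCompliant) {
            return Handover.REUSE; // the emitting operator declared itself safe
        }
        return Handover.DEEP_COPY; // fall back to copying, as today
    }

    public static void main(String[] args) {
        for (boolean global : new boolean[] {false, true}) {
            for (boolean compliant : new boolean[] {false, true}) {
                System.out.printf(
                        "pipeline.object-reuse=%b, compliant=%b -> %s%n",
                        global, compliant, decide(global, compliant));
            }
        }
    }
}
```

Only the combination in which both inputs are false results in a deep copy, so operators that do not opt in behave exactly as they do today.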
2) Update the operators/functions defined in the Flink codebase to override getOperatorAttributes() (for operators) or isObjectReuseCompliant() (for functions) as appropriate.
We will enumerate all operators/functions defined in Flink, identify those that do not store or access references to records they have already emitted, and override the corresponding method so that it reports them as object-reuse compliant.
After implementing the proposed change, in typical cases where jobs don't involve user-defined operators or functions, such as SQL jobs, Flink will only deep-copy records when this is essential for ensuring correctness. For jobs that do incorporate user-defined operators or functions, operator/function developers can avoid unnecessary deep-copy overhead by overriding getOperatorAttributes() or isObjectReuseCompliant() appropriately. This allows Flink to achieve optimal performance with the default configuration.
Compatibility, Deprecation, and Migration Plan
The changes made in this FLIP are backward compatible. No deprecation plan is needed.
Migration plan: users are encouraged to update their existing operator/function implementations to override getOperatorAttributes() or isObjectReuseCompliant(), respectively, if the operator/function does not store or access references to records it has already emitted. We plan to write blog posts to promote the new API.
Test Plan
The change will be covered with unit and integration tests.
Rejected Alternatives
None yet.
[1] https://www.ververica.com/blog/a-journey-to-beating-flinks-sql-performance