Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

[This FLIP proposal is joint work by Xuannan Su and Dong Lin]

Motivation

Currently, the default configuration for pipeline.object-reuse is set to false. This means that each record emitted by an operator undergoes deep-copying, where it is serialized and then deserialized before being passed to the next operator in the chain. However, in cases where the upstream and downstream operators do not store or access references to the input or output records, this deep-copy overhead becomes unnecessary and can result in suboptimal performance [1]. While users can manually set pipeline.object-reuse to true, they bear the responsibility of ensuring that no operator stores or accesses record references, which may introduce the risk of data corruption. Consequently, users are faced with the dilemma of choosing between suboptimal performance and the potential for data corruption.
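The corruption risk described above can be reproduced outside Flink. The following is a minimal sketch, not Flink code: the class ObjectReuseHazardSketch, the Record type, and the run method are hypothetical names introduced only for illustration. A producer overwrites a single mutable instance for every element, and a consumer that buffers references sees only the last value unless each record is copied before it is handed over.

```java
/** Illustration only: a reused mutable record corrupts a consumer that stores references. */
final class ObjectReuseHazardSketch {

    /** Hypothetical mutable record type. */
    static final class Record {
        int value;

        Record(int value) {
            this.value = value;
        }

        Record copy() {
            return new Record(value);
        }
    }

    /**
     * Emits the values 1..n through a single reused Record instance into a buffer that
     * stands in for a downstream operator storing its input. Returns the values the
     * buffer holds after all records have been emitted.
     */
    static java.util.List<Integer> run(int n, boolean copyBeforeHandOver) {
        java.util.List<Record> buffered = new java.util.ArrayList<>();
        Record reused = new Record(0);
        for (int i = 1; i <= n; i++) {
            reused.value = i; // the producer overwrites the same instance each time
            buffered.add(copyBeforeHandOver ? reused.copy() : reused);
        }
        java.util.List<Integer> seen = new java.util.ArrayList<>();
        for (Record r : buffered) {
            seen.add(r.value);
        }
        return seen;
    }
}
```

With copyBeforeHandOver set to false, every buffered entry points at the same instance, so all entries show the last emitted value. With it set to true, each value is preserved at the cost of one copy per record.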

In this FLIP, we propose adding APIs that operators can utilize to inform the Flink runtime whether it is safe to reuse the emitted records. This enhancement would enable Flink to maximize its performance using the default configuration, eliminating the need for users to provide additional information.


Public Interfaces

1) Add OperatorAttributesBuilder and OperatorAttributes for developers to specify operator attributes.

package org.apache.flink.streaming.api.operators;
 
/** The builder class for {@link OperatorAttributes}. */
@PublicEvolving
public class OperatorAttributesBuilder {

    @Nullable private Boolean outputStreamRecordValueStored = null;
    @Nullable private Boolean inputStreamRecordStored = null;

    public OperatorAttributesBuilder() {
        ...
    }

    /**
     * Set to false if it is guaranteed that the operator will not store or access a reference to
     * the StreamRecord#value that it has already emitted.
     */
    public OperatorAttributesBuilder setOutputStreamRecordValueStored(boolean outputStreamRecordValueStored) {...}

    /**
     * Set to false if it is guaranteed that the operator will not store or access a reference to
     * the input StreamRecord instance.
     */
    public OperatorAttributesBuilder setInputStreamRecordStored(boolean inputStreamRecordStored) {...}
    
    /**
     * If any operator attribute is null, we will log it at DEBUG level and use the following
     * default values.
     * - outputStreamRecordValueStored defaults to true
     * - inputStreamRecordStored defaults to true
     */
    public OperatorAttributes build() {...}
}

package org.apache.flink.streaming.api.operators;

/**
 * OperatorAttributes provides the Job Manager with information that can be
 * used to optimize job performance.
 */
@PublicEvolving
public class OperatorAttributes {
    /**
     * Returns false if it is guaranteed that the operator will not store or access a reference to
     * the StreamRecord#value that it has already emitted.
     */
    public boolean isOutputStreamRecordValueStored() {...}

    /**
     * Returns false if it is guaranteed that the operator will not store or access a reference to
     * the input StreamRecord instance.
     */
    public boolean isInputStreamRecordStored() {...}
}


2) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.

@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ...
 
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}
 
@PublicEvolving
public interface StreamOperatorFactory<OUT> extends Serializable {
    ...
 
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}
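
As an illustration of how an operator might use these APIs, here is a stand-alone sketch. Nothing in it is Flink code: the nested types only mimic the classes proposed above, the setters are assumed to return the builder for chaining, the DEBUG logging is omitted, and StatelessOperator and LegacyOperator are hypothetical names.

```java
/** Stand-alone sketch; the nested types only mimic the API proposed in this FLIP. */
final class OperatorAttributesSketch {

    /** Stand-in for the proposed OperatorAttributes. */
    static final class OperatorAttributes {
        private final boolean outputStreamRecordValueStored;
        private final boolean inputStreamRecordStored;

        OperatorAttributes(boolean outputStreamRecordValueStored, boolean inputStreamRecordStored) {
            this.outputStreamRecordValueStored = outputStreamRecordValueStored;
            this.inputStreamRecordStored = inputStreamRecordStored;
        }

        boolean isOutputStreamRecordValueStored() {
            return outputStreamRecordValueStored;
        }

        boolean isInputStreamRecordStored() {
            return inputStreamRecordStored;
        }
    }

    /** Stand-in for the proposed builder; setters are assumed to return this for chaining. */
    static final class OperatorAttributesBuilder {
        private Boolean outputStreamRecordValueStored = null;
        private Boolean inputStreamRecordStored = null;

        OperatorAttributesBuilder setOutputStreamRecordValueStored(boolean stored) {
            this.outputStreamRecordValueStored = stored;
            return this;
        }

        OperatorAttributesBuilder setInputStreamRecordStored(boolean stored) {
            this.inputStreamRecordStored = stored;
            return this;
        }

        OperatorAttributes build() {
            // Unset attributes fall back to the conservative default (true).
            return new OperatorAttributes(
                    outputStreamRecordValueStored == null || outputStreamRecordValueStored,
                    inputStreamRecordStored == null || inputStreamRecordStored);
        }
    }

    /** Stand-in for StreamOperator, reduced to the one proposed default method. */
    interface StreamOperator {
        default OperatorAttributes getOperatorAttributes() {
            return new OperatorAttributesBuilder().build();
        }
    }

    /** Hypothetical operator that keeps no reference to its input or output records. */
    static final class StatelessOperator implements StreamOperator {
        @Override
        public OperatorAttributes getOperatorAttributes() {
            return new OperatorAttributesBuilder()
                    .setOutputStreamRecordValueStored(false)
                    .setInputStreamRecordStored(false)
                    .build();
        }
    }

    /** Hypothetical existing operator that does not override the default. */
    static final class LegacyOperator implements StreamOperator {}
}
```

In this sketch an operator that does not override getOperatorAttributes() reports true for both attributes, so existing operators keep today's copying behavior unless they opt in.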


3) Add the isOutputValueStored() method, which returns false by default, to the Function interface. RichFunction overrides isOutputValueStored() to return true.

@Public
public interface Function extends java.io.Serializable {
    /**
     * Returns false if it is guaranteed that the function will not store or access a
     * reference to the output value.
     */
    default boolean isOutputValueStored() {
        return false;
    }
}

@Public
public interface RichFunction extends Function {

    /**
     * A RichFunction can put values into the state backend, so this method returns true by
     * default. A RichFunction that does not store its output values in the state backend can
     * override this method to return false.
     */
    @Override
    default boolean isOutputValueStored() {
        return true;
    }
}
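
The following stand-alone sketch shows how the two defaults are intended to interact, assuming the defaults described in item 3 (false for Function, true for RichFunction). The interfaces are stand-ins rather than Flink's own, and ToUpperCase, StatelessRichMapper, and DefaultRichMapper are hypothetical functions used only for illustration.

```java
/** Stand-alone sketch; Function and RichFunction here are stand-ins, not Flink's interfaces. */
final class FunctionAttributesSketch {

    interface Function {
        default boolean isOutputValueStored() {
            return false;
        }
    }

    interface RichFunction extends Function {
        @Override
        default boolean isOutputValueStored() {
            return true;
        }
    }

    /** Hypothetical plain function: keeps no reference to the value it returns. */
    static final class ToUpperCase implements Function {
        String map(String value) {
            return value.toUpperCase();
        }
    }

    /** Hypothetical rich function that never stores its output, so it opts out explicitly. */
    static final class StatelessRichMapper implements RichFunction {
        int map(String value) {
            return value.length();
        }

        @Override
        public boolean isOutputValueStored() {
            return false;
        }
    }

    /** Hypothetical rich function that keeps the conservative default. */
    static final class DefaultRichMapper implements RichFunction {}
}
```

Only the rich function that keeps the default reports true here. A rich function has to opt out explicitly, which matches the POC observation that operators wrapping a RichFunction need a small code change.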


4) Update the description of pipeline.object-reuse to mention that when it is false, Flink will decide whether to use object reuse based on the operator attributes.

Here is the updated description:

When it is true, objects that Flink internally uses for deserialization and passing data to user-code functions will be reused. When it is false, Flink will decide whether to use object reuse based on the operator attributes. Keep in mind that this can lead to bugs when the user-code function of an operation is not aware of this behavior.

Proposed Changes

1) Update OperatorChain to take advantage of the isOutputStreamRecordValueStored and isInputStreamRecordStored attributes.

Currently, the OperatorChain only considers the global execution configuration pipeline.object-reuse to determine whether to enable object-reuse for data transfer between chained operators.

We propose updating the OperatorChain as follows:

  • If pipeline.object-reuse is set to true, records emitted by this operator will be reused.

  • Otherwise, if this operator's isOutputStreamRecordValueStored() returns false:

    • If the downstream operator's isInputStreamRecordStored() returns false, records emitted by this operator will be reused.

    • Otherwise, the StreamRecord will be shallow-copied before being given to the next operator in the chain.

  • Otherwise, records emitted by this operator will be deep-copied before being given to the next operator in the chain.
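
The decision procedure above can be sketched as a small pure function. This is an illustration of the rules rather than the actual OperatorChain change: the class ObjectReuseDecisionSketch, the CopyMode enum, and the decide method are hypothetical names, and the three parameters stand for pipeline.object-reuse, the upstream operator's isOutputStreamRecordValueStored(), and the downstream operator's isInputStreamRecordStored().

```java
/** Hypothetical sketch of the proposed OperatorChain decision; not Flink code. */
final class ObjectReuseDecisionSketch {

    /** How a record is handed to the next chained operator (names are illustrative). */
    enum CopyMode {
        REUSE,
        SHALLOW_COPY,
        DEEP_COPY
    }

    static CopyMode decide(
            boolean objectReuseEnabled,
            boolean upstreamOutputValueStored,
            boolean downstreamInputRecordStored) {
        if (objectReuseEnabled) {
            // The global switch wins: records are always reused.
            return CopyMode.REUSE;
        }
        if (!upstreamOutputValueStored) {
            // The value is safe to share; copy only the StreamRecord wrapper
            // if the downstream operator may hold on to it.
            return downstreamInputRecordStored ? CopyMode.SHALLOW_COPY : CopyMode.REUSE;
        }
        // The upstream operator may still touch the emitted value.
        return CopyMode.DEEP_COPY;
    }

    public static void main(String[] args) {
        // Print the outcome for every combination with the global switch off.
        for (boolean up : new boolean[] {false, true}) {
            for (boolean down : new boolean[] {false, true}) {
                System.out.println(
                        "outputStored=" + up + ", inputStored=" + down
                                + " -> " + decide(false, up, down));
            }
        }
    }
}
```

Note that when the upstream operator reports that it stores its output value, the downstream attribute has no effect in this sketch: the record is deep-copied either way.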


2) Update operators/functions defined in the Flink codebase to override isOutputStreamRecordValueStored() as appropriate.

We will enumerate all operators/functions defined in Flink, identify those that do not store or access a reference to the value of the output StreamRecord, and override their isOutputStreamRecordValueStored() method to return false. Operators that do not store or access a reference to the input StreamRecord should override the isInputStreamRecordStored() method to return false.

Once the proposed change is implemented, operator/function developers will be able to avoid unnecessary deep-copy overhead by overriding the isOutputStreamRecordValueStored() method appropriately. This allows Flink to achieve optimal performance with the default configuration.

POC and Analysis

We implemented a POC and ran flink-benchmarks against it with global object-reuse disabled. We verified that, with the change proposed in this FLIP, many of the operators in the benchmark can enable object-reuse without any code change. Only custom operators and AbstractUdfStreamOperator instances that contain a RichFunction cannot enable object-reuse for their output. To enable object-reuse for those operators/functions, a couple of lines of code are needed to set the operator attributes accordingly. After that, all the operators in flink-benchmarks can enable object-reuse with global object-reuse disabled.

Compatibility, Deprecation, and Migration Plan

The changes made in this FLIP are backward compatible. No deprecation plan is needed.

Migration plan: users are encouraged to update their existing operator/function implementations to override the isOutputStreamRecordValueStored() API if their operator/function does not store or access a reference to the StreamRecord#value that it has already emitted, and to override the isInputStreamRecordStored() API if their operator does not store or access a reference to the input StreamRecord instance. We plan to write blog posts to promote the new API.

Test Plan

The change will be covered with unit and integration tests.

Rejected Alternatives

Not yet.


[1] https://www.ververica.com/blog/a-journey-to-beating-flinks-sql-performance