...

```java
package org.apache.flink.streaming.api.operators;

/**
 * OperatorAttributes element provides the JobManager with information that can be
 * used to optimize job performance.
 */
@PublicEvolving
public class OperatorAttributes {
    /**
     * Returns false if it is guaranteed that the operator will not store or access a reference to the
     * StreamRecord#value that it has already emitted.
     */
    public boolean isOutputStreamRecordValueStored() {...}

    /**
     * Returns false if it is guaranteed that the operator will not store or access a reference to the
     * input StreamRecord instance.
     */
    public boolean isInputStreamRecordStored() {...}
}
```
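The method bodies above are left open by the proposal. The following is a minimal sketch of how the two attributes could be held, assuming conservative defaults (both methods return `true`, i.e. "may store", unless an operator explicitly opts out). The class `OperatorAttributesSketch` and its `Builder` are hypothetical names for illustration only; they are not Flink API.

```java
/** Hypothetical stand-in for OperatorAttributes; not Flink's actual implementation. */
final class OperatorAttributesSketch {
    private final boolean outputStreamRecordValueStored;
    private final boolean inputStreamRecordStored;

    private OperatorAttributesSketch(boolean outputStored, boolean inputStored) {
        this.outputStreamRecordValueStored = outputStored;
        this.inputStreamRecordStored = inputStored;
    }

    boolean isOutputStreamRecordValueStored() {
        return outputStreamRecordValueStored;
    }

    boolean isInputStreamRecordStored() {
        return inputStreamRecordStored;
    }

    /** Hypothetical builder: operators must opt out of the conservative defaults explicitly. */
    static final class Builder {
        // Assumed defaults: the operator may keep references, which forces copying.
        private boolean outputStored = true;
        private boolean inputStored = true;

        Builder setOutputStreamRecordValueStored(boolean stored) {
            this.outputStored = stored;
            return this;
        }

        Builder setInputStreamRecordStored(boolean stored) {
            this.inputStored = stored;
            return this;
        }

        OperatorAttributesSketch build() {
            return new OperatorAttributesSketch(outputStored, inputStored);
        }
    }
}
```

With this design, an operator that forgets to declare anything is treated as unsafe for reuse, so the worst outcome of a missing override is extra copying rather than corrupted records.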

...

```java
@Public
public interface Function extends java.io.Serializable {
    /**
     * Returns false if it is guaranteed that the function will not store or access
     * a reference to the output value.
     */
    default boolean isOutputValueStored() {
        return true;
    }
}
```
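To illustrate when overriding is appropriate, here is a self-contained sketch that uses a hypothetical `SketchMapFunction` interface in place of Flink's `Function`/`MapFunction`, so it compiles without Flink on the classpath. It assumes, as a conservative choice, that `isOutputValueStored()` defaults to `true`. A stateless function can safely opt out. A function that keeps mutating an object it has already emitted must keep the default.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Hypothetical stand-in for Flink's Function/MapFunction pair. */
interface SketchMapFunction<IN, OUT> {
    OUT map(IN value);

    // Assumed conservative default: the function may keep a reference to its output.
    default boolean isOutputValueStored() {
        return true;
    }
}

/** Stateless: creates a fresh output per call and never touches it again. */
final class UpperCaseFunction implements SketchMapFunction<String, String> {
    @Override
    public String map(String value) {
        return value.toUpperCase(Locale.ROOT);
    }

    @Override
    public boolean isOutputValueStored() {
        return false; // safe to opt out: no reference to the output is retained
    }
}

/** Emits the same list instance every time and keeps appending to it, so it keeps the default. */
final class RunningListFunction implements SketchMapFunction<String, List<String>> {
    private final List<String> emitted = new ArrayList<>();

    @Override
    public List<String> map(String value) {
        emitted.add(value);
        return emitted; // previously emitted output is mutated on every later call
    }
}
```

If `RunningListFunction` opted out, a downstream consumer holding an earlier output would observe it changing after the fact, which is exactly the situation the deep copy guards against.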


Proposed Changes

...

  • If pipeline.object-reuse is set to true, records emitted by this operator will be re-used.

  • Otherwise, if isOutputStreamRecordValueStored() returns false:

    • If the downstream operator's isInputStreamRecordStored() returns false, records emitted by this operator will be re-used.

    • Otherwise, the StreamRecord will be shallow-copied before being passed to the next operator in the chain.
  • Otherwise, records emitted by this operator will be deep-copied before being passed to the next operator in the chain.
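The rules above amount to a small decision function. The sketch below encodes them with a hypothetical `CopyStrategy` enum and `CopyStrategySelector.select` method (neither is Flink API). It assumes that the second rule applies when `isOutputStreamRecordValueStored()` returns `false`, consistent with that method's documented meaning.

```java
/** Copy strategy applied when handing a record to the next chained operator (hypothetical). */
enum CopyStrategy { REUSE, SHALLOW_COPY, DEEP_COPY }

/** Sketch of the selection rules; parameter names are illustrative. */
final class CopyStrategySelector {
    private CopyStrategySelector() {}

    static CopyStrategy select(
            boolean objectReuseEnabled,           // value of pipeline.object-reuse
            boolean upstreamOutputValueStored,    // upstream isOutputStreamRecordValueStored()
            boolean downstreamInputRecordStored) { // downstream isInputStreamRecordStored()
        if (objectReuseEnabled) {
            // The user has explicitly accepted object reuse for the whole pipeline.
            return CopyStrategy.REUSE;
        }
        if (!upstreamOutputValueStored) {
            // The value is safe to share; only the StreamRecord wrapper may need a fresh instance.
            return downstreamInputRecordStored ? CopyStrategy.SHALLOW_COPY : CopyStrategy.REUSE;
        }
        // No guarantee from the upstream operator: fall back to the safe default.
        return CopyStrategy.DEEP_COPY;
    }
}
```

For example, `select(false, false, true)` yields `SHALLOW_COPY`: object reuse is disabled, but the upstream operator promises not to touch emitted values, so only the wrapper needs copying for a downstream operator that buffers its input records.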

...

2) Update operators/functions defined in the Flink codebase to override isOutputStreamRecordValueStored() as appropriate.

We will enumerate all operators/functions defined in Flink, identify those that do not store or access a reference to the value of the output StreamRecord, and override their isOutputStreamRecordValueStored() method to return false. Operators that do not store or access a reference to the input StreamRecord should override the isInputStreamRecordStored() method to return false.
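The distinction between the value and the StreamRecord instance matters because a chained upstream operator may re-point a single wrapper at successive values. The self-contained sketch below uses a hypothetical `MutableRecord` class, loosely modelled on `StreamRecord` (whose `replace` method updates a record in place), together with a hypothetical `BufferingOperator` that keeps every record it receives. It shows why such an operator must report `isInputStreamRecordStored()` as true, and why a shallow copy is enough when the upstream operator never mutates values it has emitted.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical mutable wrapper standing in for StreamRecord. */
final class MutableRecord<T> {
    private T value;

    MutableRecord(T value) { this.value = value; }

    T getValue() { return value; }

    /** Re-points this same wrapper at a new value, as a reusing upstream operator would. */
    MutableRecord<T> replace(T newValue) {
        this.value = newValue;
        return this;
    }

    /** Shallow copy: a new wrapper around the same value reference. */
    MutableRecord<T> shallowCopy() { return new MutableRecord<>(value); }
}

/** Downstream operator that stores its input records, i.e. isInputStreamRecordStored() is true. */
final class BufferingOperator {
    private final List<MutableRecord<String>> buffer = new ArrayList<>();

    void processElement(MutableRecord<String> record) { buffer.add(record); }

    List<String> values() {
        List<String> out = new ArrayList<>();
        for (MutableRecord<String> r : buffer) {
            out.add(r.getValue());
        }
        return out;
    }
}

final class ReuseDemo {
    /** Upstream emits "a", "b", "c" through one reused wrapper. */
    static List<String> run(boolean shallowCopyBeforeHandOff) {
        BufferingOperator downstream = new BufferingOperator();
        MutableRecord<String> reused = new MutableRecord<>(null);
        for (String v : new String[] {"a", "b", "c"}) {
            MutableRecord<String> r = reused.replace(v);
            downstream.processElement(shallowCopyBeforeHandOff ? r.shallowCopy() : r);
        }
        return downstream.values();
    }
}
```

Without the copy, the buffer holds three references to one wrapper and reports `[c, c, c]`; with a shallow copy it reports `[a, b, c]`. The strings themselves are never copied, which is safe only because the upstream side does not mutate emitted values.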

After the proposed change is implemented, operator/function developers can avoid unnecessary deep-copy overhead by overriding the isOutputStreamRecordValueStored() method appropriately. This allows Flink to achieve better performance with the default configuration, i.e. without requiring users to enable pipeline.object-reuse.

...