
Discussion thread: -
Vote thread: -
JIRA: -
Release: -

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

Currently, Flink has a global execution configuration option, pipeline.object-reuse, which defaults to false and decides whether object-reuse is used. Object-reuse is disabled by default for streaming jobs in order to prevent unexpected behavior. When object-reuse is disabled, data transfers between chained operators go through a serialization/copy/deserialization phase. However, object-reuse becomes problematic only in the following two cases:

  1. The upstream operator stores the output records and the downstream operator modifies the input records.
  2. The upstream operator stores the output records and the downstream operator stores the input records. Then, the upstream operator modifies stored output records.
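Case 1 can be illustrated without any Flink classes. The following is a minimal sketch, assuming a hypothetical mutable Event record and a simulated hand-over between two chained operators; it is not Flink code, and the names Event and ObjectReuseHazardDemo are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical mutable record type, used only for this illustration. */
final class Event {
    int value;

    Event(int value) {
        this.value = value;
    }

    Event copy() {
        return new Event(value);
    }
}

final class ObjectReuseHazardDemo {

    /**
     * Simulates case 1: the upstream keeps a reference to every record it emits,
     * and the downstream mutates its input. Returns the values found in the
     * upstream buffer after all records have been processed.
     */
    static List<Integer> run(boolean objectReuse) {
        List<Event> upstreamBuffer = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            Event emitted = new Event(i);
            upstreamBuffer.add(emitted); // upstream stores its output record

            // Hand-over between chained operators: same instance or a copy.
            Event input = objectReuse ? emitted : emitted.copy();

            input.value *= 10; // downstream modifies its input record
        }
        List<Integer> seen = new ArrayList<>();
        for (Event e : upstreamBuffer) {
            seen.add(e.value);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("copy:  " + run(false)); // copy:  [1, 2, 3]
        System.out.println("reuse: " + run(true));  // reuse: [10, 20, 30]
    }
}
```

With copying, the upstream buffer keeps the values it emitted; with object-reuse, the downstream mutation is visible in the upstream buffer. If the upstream did not store its output, both runs would be indistinguishable to it.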

Fortunately, many operators implemented by Flink do not store output records or they make a deep copy of the records to be stored, making it safe to enable object-reuse for data transfer between such operators (e.g., Flink SQL operators). Enabling object-reuse can significantly improve throughput. [1] 

To address this, we propose adding public APIs allowing an operator to notify the Flink runtime about whether it is safe to enable object-reuse for the operator and leverage the associated performance benefits.


Public Interfaces

Note: The API is compatible with the API proposed in FLIP-327.


1) Add OperatorAttributesBuilder and OperatorAttributes for developers to specify operator attributes.

package org.apache.flink.streaming.api.operators;
 
/** The builder class for {@link OperatorAttributes}. */
@PublicEvolving
public class OperatorAttributesBuilder {
    ...
 
    public OperatorAttributesBuilder() {
      ...
      isObjectReuseCompliant = false;
    }

    public OperatorAttributesBuilder setIsObjectReuseCompliant(boolean isObjectReuseCompliant) {...}
 
    public OperatorAttributes build() {...}
}

package org.apache.flink.streaming.api.operators;
 
/**
 * OperatorAttributes element provides Job Manager with information that can be
 * used to optimize job performance.
 */
@PublicEvolving
public class OperatorAttributes {

    /**
     * Returns true when it is guaranteed that the operator will not store the objects to be emitted
     * in state backend or memory.
     */
    public boolean getIsObjectReuseCompliant() {...}
}


2) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.

@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ...
 
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}
 
@PublicEvolving
public interface StreamOperatorFactory<OUT> extends Serializable {
    ...
 
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}
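As a rough sketch of how an operator author might use the APIs in 1) and 2), the example below declares one operator as object-reuse compliant and leaves another at the default. The nested OperatorAttributes, OperatorAttributesBuilder, and StreamOperator types are simplified stand-ins written for this example, not the actual Flink classes; the bodies that the proposal elides with "..." are filled in with an assumed minimal implementation, and UppercaseOperator and BufferingOperator are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class OperatorAttributesSketch {

    /** Stand-in for the proposed OperatorAttributes (assumed implementation). */
    static final class OperatorAttributes {
        private final boolean isObjectReuseCompliant;

        private OperatorAttributes(boolean isObjectReuseCompliant) {
            this.isObjectReuseCompliant = isObjectReuseCompliant;
        }

        boolean getIsObjectReuseCompliant() {
            return isObjectReuseCompliant;
        }
    }

    /** Stand-in for the proposed builder; the attribute defaults to false. */
    static final class OperatorAttributesBuilder {
        private boolean isObjectReuseCompliant = false;

        OperatorAttributesBuilder setIsObjectReuseCompliant(boolean isObjectReuseCompliant) {
            this.isObjectReuseCompliant = isObjectReuseCompliant;
            return this;
        }

        OperatorAttributes build() {
            return new OperatorAttributes(isObjectReuseCompliant);
        }
    }

    /** Stand-in for StreamOperator, reduced to the proposed default method. */
    interface StreamOperator {
        default OperatorAttributes getOperatorAttributes() {
            return new OperatorAttributesBuilder().build();
        }
    }

    /** Hypothetical operator that never keeps a reference to what it emits. */
    static final class UppercaseOperator implements StreamOperator {
        String processElement(String value) {
            return value.toUpperCase();
        }

        @Override
        public OperatorAttributes getOperatorAttributes() {
            return new OperatorAttributesBuilder().setIsObjectReuseCompliant(true).build();
        }
    }

    /** Hypothetical operator that retains emitted records, so it keeps the default. */
    static final class BufferingOperator implements StreamOperator {
        private final List<String> buffer = new ArrayList<>();

        String processElement(String value) {
            buffer.add(value); // stores the emitted object: not compliant
            return value;
        }
    }
}
```

Because the default is false, operators that do not override getOperatorAttributes() keep today's behavior; only operators that opt in are treated as compliant.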


3) Add an isObjectReuseCompliant method to the Function interface. It returns false by default.

@Public
public interface Function extends java.io.Serializable {

    /**
     * Returns true when it is guaranteed that the function will not store the objects to be emitted
     * in state backend or memory.
     */
    default boolean isObjectReuseCompliant() {
        return false;
    }
}
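The sketch below shows one way a user function might satisfy this contract while still remembering what it emitted: it keeps a deep copy rather than the emitted instance, as the Motivation section describes. The nested Function interface is a stand-in for the proposed one, and Reading, RememberLastFunction, and RetainingFunction are hypothetical names introduced only for this example.

```java
final class FunctionComplianceSketch {

    /** Stand-in for the Function interface with the proposed default method. */
    interface Function {
        default boolean isObjectReuseCompliant() {
            return false;
        }
    }

    /** Hypothetical mutable record type. */
    static final class Reading {
        double celsius;

        Reading(double celsius) {
            this.celsius = celsius;
        }

        Reading copy() {
            return new Reading(celsius);
        }
    }

    /** Remembers the last emitted reading, but only as a deep copy. */
    static final class RememberLastFunction implements Function {
        private Reading last;

        Reading emit(double celsius) {
            Reading out = new Reading(celsius);
            last = out.copy(); // store a copy, never the emitted instance
            return out;
        }

        Reading lastEmitted() {
            return last;
        }

        @Override
        public boolean isObjectReuseCompliant() {
            return true;
        }
    }

    /** Keeps the emitted instance itself, so it must stay at the default (false). */
    static final class RetainingFunction implements Function {
        private Reading last;

        Reading emit(double celsius) {
            last = new Reading(celsius);
            return last;
        }

        Reading lastEmitted() {
            return last;
        }
    }
}
```

If a downstream consumer mutates the record returned by RememberLastFunction.emit, the stored copy is unaffected, whereas the same mutation would leak into RetainingFunction.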


Proposed Changes

1) Update OperatorChain

Currently, the OperatorChain only considers the global execution configuration pipeline.object-reuse to determine whether to enable object-reuse for data transfer between chained operators. We propose updating the OperatorChain as follows:

  • If pipeline.object-reuse is set to true, enable object-reuse for all data transfers.

  • If pipeline.object-reuse is set to false, check the isObjectReuseCompliant attribute of the upstream operator to determine whether to enable object-reuse.

In scenarios where an operator has multiple chained outputs, the OperatorChain will handle object-reuse as follows:

  • If object-reuse is enabled for a chained output, make a shallow copy of the StreamRecord.
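The rules above can be sketched as follows. This is an illustration under stated assumptions, not the OperatorChain implementation: Record is a stand-in for StreamRecord, and useObjectReuse, fanOut, and the deepCopier parameter are hypothetical names. The behavior when object-reuse is disabled (a deep copy per output) is an assumption based on the copy phase described in the Motivation section.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

final class ChainedOutputSketch {

    /** Stand-in for StreamRecord: a wrapper around the user value plus a timestamp. */
    static final class Record<T> {
        final T value;
        final long timestamp;

        Record(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** The global option wins; otherwise the upstream operator's attribute decides. */
    static boolean useObjectReuse(boolean pipelineObjectReuse, boolean upstreamIsObjectReuseCompliant) {
        return pipelineObjectReuse || upstreamIsObjectReuseCompliant;
    }

    /**
     * Produces one record per chained output. With object-reuse, each output gets a
     * shallow copy: a new wrapper that shares the same value instance. Without it,
     * each output gets its own deep copy of the value (assumed behavior).
     */
    static <T> List<Record<T>> fanOut(
            Record<T> record, int numOutputs, boolean objectReuse, UnaryOperator<T> deepCopier) {
        List<Record<T>> result = new ArrayList<>();
        for (int i = 0; i < numOutputs; i++) {
            T value = objectReuse ? record.value : deepCopier.apply(record.value);
            result.add(new Record<>(value, record.timestamp));
        }
        return result;
    }
}
```

A shallow copy gives every chained output its own wrapper, so changes to one wrapper do not affect sibling outputs, while the user value itself is still shared and never deep-copied.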

These updates ensure that object-reuse is enabled selectively, based on the attributes of the operators involved, which improves performance while avoiding unwanted side effects.


2) Update internal operators and functions

To align with the proposed changes, we need to update internal operators and functions that guarantee not to store output records. Each internal operator should set the corresponding attributes accordingly. Specifically:

  • For operators that guarantee not to store output records, set the isObjectReuseCompliant attribute to true.


Compatibility, Deprecation, and Migration Plan

The changes made in this FLIP are backward compatible. No deprecation or migration plan is needed.

Test Plan

The change will be covered with unit and integration tests.

Rejected Alternatives

None yet.


[1] https://www.ververica.com/blog/a-journey-to-beating-flinks-sql-performance


