...

Code Block
languagejava
/** The builder class for {@link RecordAttributes}. */
@PublicEvolving
public class RecordAttributesBuilder {
    @Nullable private Boolean backlog = null;

    /**
     * This constructor takes a list of the last recordAttributes received from each
     * of the operator's inputs. When this list is not empty, it will be used to determine
     * the default values for those attributes that have not been explicitly set by the caller.
     */
    public RecordAttributesBuilder(List<RecordAttributes> lastRecordAttributesOfInputs) {...}

    public RecordAttributesBuilder setBacklog(boolean backlog) {...}

    /**
     * If any record attribute is null, we will log it at DEBUG level and determine a non-null
     * default value as described below.
     *
     * Default value for backlog:
     * - If any element in lastRecordAttributesOfInputs has backlog=true, use true.
     * - Otherwise, use false.
     */
    public RecordAttributes build() {...}
}
Code Block
languagejava
/**
 * A RecordAttributes element provides the stream task with information that can be used to
 * optimize the stream task's performance.
 */
@PublicEvolving
public class RecordAttributes extends RuntimeEvent {
    /**
     * If it returns true, then the records received after this element are stale
     * and an operator can optionally buffer records until isBacklog=false. This
     * allows an operator to optimize throughput at the cost of processing latency.
     */
    @Nullable
    public Boolean isBacklog() {...}
}
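The default rules described for `build()` can be illustrated with a minimal, self-contained sketch. `SketchRecordAttributes` and `SketchRecordAttributesBuilder` are hypothetical stand-ins for `RecordAttributes` and `RecordAttributesBuilder` (they are not Flink classes); only the backlog attribute is modeled, the DEBUG logging is omitted, and it is assumed that an input which has not yet received any RecordAttributes appears as `null` in the list.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for RecordAttributes; only the backlog attribute is modeled. */
final class SketchRecordAttributes {
    private final boolean backlog;

    SketchRecordAttributes(boolean backlog) {
        this.backlog = backlog;
    }

    boolean isBacklog() {
        return backlog;
    }
}

/** Hypothetical stand-in for RecordAttributesBuilder illustrating the default rules. */
final class SketchRecordAttributesBuilder {
    private final List<SketchRecordAttributes> lastRecordAttributesOfInputs;
    private Boolean backlog = null; // null means "not explicitly set by the caller"

    SketchRecordAttributesBuilder(List<SketchRecordAttributes> lastRecordAttributesOfInputs) {
        // Defensive copy; null entries (inputs with nothing received yet) are allowed.
        this.lastRecordAttributesOfInputs = new ArrayList<>(lastRecordAttributesOfInputs);
    }

    SketchRecordAttributesBuilder setBacklog(boolean backlog) {
        this.backlog = backlog;
        return this;
    }

    SketchRecordAttributes build() {
        if (backlog != null) {
            return new SketchRecordAttributes(backlog); // an explicitly set value always wins
        }
        // Default: true if any input last reported backlog=true, otherwise false.
        for (SketchRecordAttributes attributes : lastRecordAttributesOfInputs) {
            if (attributes != null && attributes.isBacklog()) {
                return new SketchRecordAttributes(true);
            }
        }
        return new SketchRecordAttributes(false);
    }
}
```

With no inputs (an empty list) the sketch falls back to `false`, which matches the rule "Otherwise, use false."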

...




2) Make RuntimeEvent PublicEvolving and add a method in Output to broadcast RuntimeEvent to downstream operators.

Code Block
languagejava
/** Subclasses of this event are recognized as events exchanged by the core runtime. */
@PublicEvolving
public abstract class RuntimeEvent extends AbstractEvent {}


Code Block
languagejava
@PublicEvolving
public interface Output<T> extends Collector<T> {
    ...

    /**
     * Emits a {@link RuntimeEvent} from an operator. This element is broadcast to all
     * downstream operators.
     */
    default void emitRuntimeEvent(RuntimeEvent runtimeEvent) {
        throw new UnsupportedOperationException();
    }
}


NOTE: It is up to the operator implementation to decide when (and how often) to emit RuntimeEvent. The overhead of emitting RuntimeEvent is similar to the overhead of emitting Watermark.
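Because `emitRuntimeEvent` is a default method that throws `UnsupportedOperationException`, existing `Output` implementations keep compiling and only implementations that support runtime events need to override it. The sketch below shows that pattern together with one possible way to keep the overhead low: emitting an event only when the backlog status actually changes. `SketchRuntimeEvent`, `BacklogStatusEvent`, `SketchOutput`, `RecordingOutput` and `ChangeOnlyEmitter` are hypothetical names, not Flink APIs, and the change-only policy is an assumption rather than part of this proposal.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for RuntimeEvent. */
abstract class SketchRuntimeEvent {}

/** Hypothetical event carrying only a backlog flag (a stand-in for RecordAttributes). */
final class BacklogStatusEvent extends SketchRuntimeEvent {
    final boolean backlog;

    BacklogStatusEvent(boolean backlog) {
        this.backlog = backlog;
    }
}

/** Hypothetical stand-in for Output: emitting runtime events is opt-in via a default method. */
interface SketchOutput<T> {
    void collect(T value);

    default void emitRuntimeEvent(SketchRuntimeEvent event) {
        // Outputs that cannot forward runtime events keep this default and fail loudly.
        throw new UnsupportedOperationException();
    }
}

/** An output that keeps records and events in arrival order, e.g. for tests. */
final class RecordingOutput<T> implements SketchOutput<T> {
    final List<Object> emitted = new ArrayList<>();

    @Override
    public void collect(T value) {
        emitted.add(value);
    }

    @Override
    public void emitRuntimeEvent(SketchRuntimeEvent event) {
        emitted.add(event);
    }
}

/** Hypothetical policy: emit a backlog event only when the status changes. */
final class ChangeOnlyEmitter {
    private final SketchOutput<?> output;
    private Boolean lastEmitted = null; // null until the first event is emitted

    ChangeOnlyEmitter(SketchOutput<?> output) {
        this.output = output;
    }

    void onBacklogStatus(boolean backlog) {
        if (lastEmitted == null || lastEmitted.booleanValue() != backlog) {
            output.emitRuntimeEvent(new BacklogStatusEvent(backlog));
            lastEmitted = backlog;
        }
    }
}
```

Calling `onBacklogStatus(true)` twice followed by `onBacklogStatus(false)` emits two events, not three, since the repeated `true` carries no new information.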


3) Add methods in Input and TwoInputStreamOperator to process RecordAttributes.

Code Block
languagejava
@PublicEvolving
public interface Input<IN> {
    ...

    /**
     * Processes a {@link RecordAttributes} that arrived on this input.
     * This method is guaranteed to not be called concurrently with other methods of the operator.
     */
    default void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {}
}


Code Block
languagejava
@PublicEvolving
public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {
    ...

    /**
     * Processes a {@link RecordAttributes} that arrived on the first input of this operator.
     * This method is guaranteed to not be called concurrently with other methods of the operator.
     */
    default void processRecordAttributes1(RecordAttributes recordAttributes) throws Exception {}

    /**
     * Processes a {@link RecordAttributes} that arrived on the second input of this operator.
     * This method is guaranteed to not be called concurrently with other methods of the operator.
     */
    default void processRecordAttributes2(RecordAttributes recordAttributes) throws Exception {}
}


NOTE: An operator should take care to enforce the processing latency requirement for the records it has received when isBacklog=false. In particular, the operator should flush/emit buffered records (inside `#processRecordAttributes`) if the backlog status of the input switches from true to false, so that we do not trap records that should be processed with low latency.
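A minimal sketch of this rule, assuming a single input and modeling RecordAttributes only by its backlog flag. `BufferingOperator`, `processRecord` and `processBacklogStatus` are hypothetical names standing in for an operator's `processElement` and `processRecordAttributes`; a real Flink operator would emit through its `Output` rather than a `Consumer`, and would likely bound the buffer's size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical single-input operator: buffers records while the input reports
 * backlog=true and flushes them as soon as backlog switches to false.
 */
final class BufferingOperator<T> {
    private final Consumer<T> downstream;
    private final List<T> buffer = new ArrayList<>();
    private boolean backlog = false; // until told otherwise, process with low latency

    BufferingOperator(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    /** Stand-in for processElement. */
    void processRecord(T value) {
        if (backlog) {
            buffer.add(value); // throughput mode: defer output
        } else {
            downstream.accept(value); // latency mode: emit immediately
        }
    }

    /** Stand-in for processRecordAttributes, modeling only the backlog flag. */
    void processBacklogStatus(boolean isBacklog) {
        if (backlog && !isBacklog) {
            flush(); // true -> false: do not trap records that need low latency
        }
        backlog = isBacklog;
    }

    private void flush() {
        for (T value : buffer) {
            downstream.accept(value);
        }
        buffer.clear();
    }

    int bufferedCount() {
        return buffer.size();
    }
}
```

The flush happens before `backlog` is updated, so every record buffered during the backlog phase is emitted before any record that arrives after the switch.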

...

Code Block
languagejava
@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ...

    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}

@PublicEvolving
public interface StreamOperatorFactory<OUT> extends Serializable {
    ...

    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}



Proposed Changes

1) Update AbstractStreamOperator and AbstractStreamOperatorV2 to override #processRecordAttributes, #processRecordAttributes1 and #processRecordAttributes2.

These methods should use the following code.

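Code Block
languagejava
public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
  // Assumed: lastRecordAttributesOfInputs is a per-input List field (index 0 for a
  // single-input operator) holding the last RecordAttributes received on each input.
  lastRecordAttributesOfInputs.set(0, recordAttributes);
  // Broadcast the combined attributes to all downstream operators.
  output.emitRuntimeEvent(new RecordAttributesBuilder(lastRecordAttributesOfInputs).build());
}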

This effectively allows an operator to buffer records iff any of its upstream source operators claims isProcessingBacklog=true. 
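To see how this propagates through a two-input operator, here is a minimal sketch that tracks the last backlog flag per input and forwards the combined value after each update. It mirrors the builder's default rule (true iff any input last reported backlog=true) but uses plain booleans instead of RecordAttributes; `TwoInputBacklogTracker` and its `Consumer<Boolean>` downstream are hypothetical and not part of the proposal.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical two-input operator that remembers the last backlog status seen on each
 * input and forwards the combined status downstream after every update.
 */
final class TwoInputBacklogTracker {
    // Index 0 is the first input, index 1 the second; null means "nothing received yet".
    private final Boolean[] lastBacklogOfInputs = new Boolean[2];
    private final Consumer<Boolean> downstream;

    TwoInputBacklogTracker(Consumer<Boolean> downstream) {
        this.downstream = downstream;
    }

    /** Stand-in for processRecordAttributes1. */
    void processRecordAttributes1(boolean backlog) {
        update(0, backlog);
    }

    /** Stand-in for processRecordAttributes2. */
    void processRecordAttributes2(boolean backlog) {
        update(1, backlog);
    }

    private void update(int input, boolean backlog) {
        lastBacklogOfInputs[input] = backlog;
        downstream.accept(combined());
    }

    /** Same default rule as the builder: true iff any input last reported backlog=true. */
    boolean combined() {
        List<Boolean> values = Arrays.asList(lastBacklogOfInputs);
        return values.contains(Boolean.TRUE);
    }
}
```

Forwarding after every update is the simplest choice; an implementation could instead forward only when the combined value changes.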

...


2) Add APIs to Transformation to get the corresponding operator attributes.

...