...
Code Block |
---|
|
/** The builder class for {@link RecordAttributes}. */
@PublicEvolving
public class RecordAttributesBuilder {
@Nullable private Boolean backlog = null;
/**
* This constructor takes a list of the last recordAttributes received from each of the
* operator's inputs. When this list is not empty, it will be used to determine the default
* values for those attributes that have not been explicitly set by the caller.
*/
public RecordAttributesBuilder(List<RecordAttributes> lastRecordAttributesOfInputs) {...}
public RecordAttributesBuilder setBacklog(boolean backlog) {...}
/**
* If any record attribute is null, we will log it at DEBUG level and determine a non-null
* default value as described below.
*
* Default value for backlog:
* - if any element in lastRecordAttributesOfInputs has backlog=true, use true.
* - Otherwise, use false.
*/
public RecordAttributes build() {...}
} |
Code Block |
---|
|
/**
* A RecordAttributes element provides the stream task with information that can be used to
* optimize the stream task's performance.
*/
@PublicEvolving
public class RecordAttributes extends RuntimeEvent {
/**
* If it returns true, then the records received after this element are stale
* and an operator can optionally buffer records until isBacklog=false. This
* allows an operator to optimize throughput at the cost of processing latency.
*/
@Nullable
public Boolean isBacklog() {...}
} |
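The default-value rule documented on `build()` can be illustrated with a small standalone model. It is a sketch, not Flink's implementation: `SimpleRecordAttributes` and `SimpleRecordAttributesBuilder` are hypothetical stand-ins that keep only the backlog attribute and omit the DEBUG logging. An explicitly set value wins; otherwise backlog is true if any input last reported backlog=true, and false when none did (including when the list is empty).

```java
import java.util.List;

public class RecordAttributesDefaults {

    /** Hypothetical stand-in for RecordAttributes that keeps only the backlog flag. */
    static final class SimpleRecordAttributes {
        private final boolean backlog;

        SimpleRecordAttributes(boolean backlog) {
            this.backlog = backlog;
        }

        boolean isBacklog() {
            return backlog;
        }
    }

    /** Hypothetical stand-in for RecordAttributesBuilder. */
    static final class SimpleRecordAttributesBuilder {
        private final List<SimpleRecordAttributes> lastRecordAttributesOfInputs;
        private Boolean backlog = null; // null means "not explicitly set by the caller"

        SimpleRecordAttributesBuilder(List<SimpleRecordAttributes> lastRecordAttributesOfInputs) {
            this.lastRecordAttributesOfInputs = lastRecordAttributesOfInputs;
        }

        SimpleRecordAttributesBuilder setBacklog(boolean backlog) {
            this.backlog = backlog;
            return this;
        }

        SimpleRecordAttributes build() {
            boolean resolved;
            if (backlog != null) {
                // An explicitly set value always wins.
                resolved = backlog;
            } else {
                // Default: true if any input last reported backlog=true, otherwise false.
                resolved = lastRecordAttributesOfInputs.stream()
                        .anyMatch(SimpleRecordAttributes::isBacklog);
            }
            return new SimpleRecordAttributes(resolved);
        }
    }

    public static void main(String[] args) {
        List<SimpleRecordAttributes> inputs =
                List.of(new SimpleRecordAttributes(false), new SimpleRecordAttributes(true));
        System.out.println(new SimpleRecordAttributesBuilder(inputs).build().isBacklog()); // true
        System.out.println(new SimpleRecordAttributesBuilder(List.of()).build().isBacklog()); // false
        System.out.println(
                new SimpleRecordAttributesBuilder(inputs).setBacklog(false).build().isBacklog()); // false
    }
}
```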
...
2) Make RuntimeEvent PublicEvolving and add a method in Output to broadcast RuntimeEvent to downstream operators.
Code Block |
---|
|
/** Subclasses of this event are recognized as events exchanged by the core runtime. */
@PublicEvolving
public abstract class RuntimeEvent extends AbstractEvent {} |
Code Block |
---|
|
@PublicEvolving
public interface Output<T> extends Collector<T> {
...
/**
* Emits a {@link RuntimeEvent} from an operator. This element is broadcast to all
* downstream operators.
*/
default void emitRuntimeEvent(RuntimeEvent runtimeEvent) {
throw new UnsupportedOperationException();
}
} |
NOTE: It is up to the operator implementation to decide when (and how often) to emit RuntimeEvent. The overhead of emitting RuntimeEvent is similar to the overhead of emitting Watermark.
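Because `emitRuntimeEvent` throws by default, only outputs that know how to reach downstream operators need to override it. The sketch below uses hypothetical simplified types (`Event`, `SimpleOutput`, `ListOutput`, `BroadcastingOutput`) instead of Flink's real `RuntimeEvent` and `Output`; it shows a fan-out output forwarding each event to every downstream output, while an output that does not override the method still throws `UnsupportedOperationException`.

```java
import java.util.ArrayList;
import java.util.List;

public class RuntimeEventBroadcast {

    /** Hypothetical stand-in for RuntimeEvent. */
    static class Event {
        final String name;

        Event(String name) {
            this.name = name;
        }
    }

    /** Hypothetical stand-in for Output<T>: records via collect, events via emitRuntimeEvent. */
    interface SimpleOutput<T> {
        void collect(T record);

        // Mirrors the proposed default: an output must opt in to event emission.
        default void emitRuntimeEvent(Event event) {
            throw new UnsupportedOperationException();
        }
    }

    /** A downstream sink that remembers what it received. */
    static class ListOutput<T> implements SimpleOutput<T> {
        final List<T> records = new ArrayList<>();
        final List<Event> events = new ArrayList<>();

        @Override
        public void collect(T record) {
            records.add(record);
        }

        @Override
        public void emitRuntimeEvent(Event event) {
            events.add(event);
        }
    }

    /** Fans records out and broadcasts every runtime event to all downstream outputs. */
    static class BroadcastingOutput<T> implements SimpleOutput<T> {
        private final List<SimpleOutput<T>> downstream;

        BroadcastingOutput(List<SimpleOutput<T>> downstream) {
            this.downstream = downstream;
        }

        @Override
        public void collect(T record) {
            for (SimpleOutput<T> out : downstream) {
                out.collect(record);
            }
        }

        @Override
        public void emitRuntimeEvent(Event event) {
            for (SimpleOutput<T> out : downstream) {
                out.emitRuntimeEvent(event);
            }
        }
    }

    public static void main(String[] args) {
        ListOutput<String> a = new ListOutput<>();
        ListOutput<String> b = new ListOutput<>();
        List<SimpleOutput<String>> targets = List.of(a, b);
        new BroadcastingOutput<>(targets).emitRuntimeEvent(new Event("backlog=false"));
        System.out.println(a.events.size() + " " + b.events.size()); // 1 1
    }
}
```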
3) Add methods in Input and TwoInputStreamOperator to process RecordAttributes.
Code Block |
---|
|
@PublicEvolving
public interface Input<IN> {
...
/**
* Processes a {@link RecordAttributes} that arrived on this input.
* This method is guaranteed to not be called concurrently with other methods of the operator.
*/
default void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {}
} |
Code Block |
---|
|
@PublicEvolving
public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {
...
/**
* Processes a {@link RecordAttributes} that arrived on the first input of this operator.
* This method is guaranteed to not be called concurrently with other methods of the operator.
*/
default void processRecordAttributes1(RecordAttributes recordAttributes) throws Exception {}
/**
* Processes a {@link RecordAttributes} that arrived on the second input of this operator.
* This method is guaranteed to not be called concurrently with other methods of the operator.
*/
default void processRecordAttributes2(RecordAttributes recordAttributes) throws Exception {}
} |
NOTE: An operator should take care to enforce the processing latency requirement for the records it has received when isBacklog=false. In particular, the operator should flush/emit buffered records (inside `#processRecordAttributes`) if the backlog status of the input switches from false to true, so that we do not trap records that should be processed with low latency.
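One way a single-input operator could honor this note is sketched below. It uses a hypothetical `BufferingOperator` rather than Flink's operator classes, reduces `RecordAttributes` to a boolean, and assumes a hypothetical `lowLatencyBatchSize` that bounds buffering while backlog=false (a real operator would typically also need a time-based bound). The buffer is flushed whenever the backlog status changes: on false to true so that low-latency records are not trapped behind unbounded backlog buffering, and on true to false to release the records buffered during the backlog.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BacklogAwareBuffer {

    /** Hypothetical operator: buffers freely in backlog mode, in small batches otherwise. */
    static final class BufferingOperator<T> {
        private final int lowLatencyBatchSize; // hypothetical bound used while backlog=false
        private final Consumer<List<T>> downstream;
        private final List<T> buffer = new ArrayList<>();
        private boolean backlog = false;

        BufferingOperator(int lowLatencyBatchSize, Consumer<List<T>> downstream) {
            this.lowLatencyBatchSize = lowLatencyBatchSize;
            this.downstream = downstream;
        }

        void processElement(T record) {
            buffer.add(record);
            // In backlog mode keep buffering to favor throughput; otherwise emit small batches.
            if (!backlog && buffer.size() >= lowLatencyBatchSize) {
                flush();
            }
        }

        /** Counterpart of Input#processRecordAttributes, reduced to the backlog flag. */
        void processRecordAttributes(boolean newBacklog) {
            if (newBacklog != backlog) {
                // false -> true: release records that arrived under the low-latency requirement.
                // true -> false: release the stale records buffered during the backlog.
                flush();
            }
            backlog = newBacklog;
        }

        private void flush() {
            if (!buffer.isEmpty()) {
                downstream.accept(new ArrayList<>(buffer));
                buffer.clear();
            }
        }
    }

    public static void main(String[] args) {
        List<List<Integer>> emitted = new ArrayList<>();
        BufferingOperator<Integer> op = new BufferingOperator<>(3, emitted::add);
        op.processElement(1); // buffered, batch not full yet
        op.processRecordAttributes(true); // false -> true: flushes [1]
        op.processElement(2);
        op.processElement(3);
        op.processElement(4); // still buffered: backlog mode
        op.processRecordAttributes(false); // true -> false: flushes [2, 3, 4]
        System.out.println(emitted); // [[1], [2, 3, 4]]
    }
}
```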
...
Code Block |
---|
|
@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
...
default OperatorAttributes getOperatorAttributes() {
return new OperatorAttributesBuilder().build();
}
}
@PublicEvolving
public interface StreamOperatorFactory<OUT> extends Serializable {
...
default OperatorAttributes getOperatorAttributes() {
return new OperatorAttributesBuilder().build();
}
} |
Proposed Changes
1) Update AbstractStreamOperator and AbstractStreamOperatorV2 to override #processRecordAttributes, #processRecordAttributes1 and #processRecordAttributes2.
These methods should use the following code.
Code Block |
---|
|
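public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
// lastRecordAttributesOfInputs is assumed to already hold recordAttributes as the latest
// element received on this input. The builder derives this operator's attributes from it,
// and the result is broadcast to all downstream operators (RecordAttributes is a RuntimeEvent).
output.emitRuntimeEvent(new RecordAttributesBuilder(lastRecordAttributesOfInputs).build());
} |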
This effectively allows an operator to buffer records iff any of its upstream source operators claims isProcessingBacklog=true.
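For a two-input operator, the builder defaults mean the emitted backlog flag is the logical OR of the latest flags seen on both inputs, so backlog=true claimed by any upstream source keeps propagating until every input has reported backlog=false. A hypothetical sketch of that propagation, reduced to the backlog flag: `TwoInputOperator`, `onInput` and the `emitted` list are illustrative names, the list stands in for broadcasting via `output.emitRuntimeEvent`, and ignoring inputs that have not reported anything yet is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

public class TwoInputBacklogPropagation {

    /** Hypothetical two-input operator; index 0 is the first input, index 1 the second. */
    static final class TwoInputOperator {
        // Latest backlog flag seen on each input; null until that input reports anything.
        private final Boolean[] lastBacklogOfInputs = new Boolean[2];
        // Everything broadcast downstream (stand-in for output.emitRuntimeEvent).
        final List<Boolean> emitted = new ArrayList<>();

        void processRecordAttributes1(boolean backlog) {
            onInput(0, backlog);
        }

        void processRecordAttributes2(boolean backlog) {
            onInput(1, backlog);
        }

        private void onInput(int input, boolean backlog) {
            lastBacklogOfInputs[input] = backlog;
            // Same rule as the builder default: true if any input's last value is true.
            boolean combined = false;
            for (Boolean b : lastBacklogOfInputs) {
                if (Boolean.TRUE.equals(b)) {
                    combined = true;
                }
            }
            emitted.add(combined);
        }
    }

    public static void main(String[] args) {
        TwoInputOperator op = new TwoInputOperator();
        op.processRecordAttributes1(true); // first input is processing a backlog
        op.processRecordAttributes2(false); // second input is real-time
        op.processRecordAttributes1(false); // first input has caught up
        System.out.println(op.emitted); // [true, true, false]
    }
}
```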
...
2) Add the APIs on Transformation interface to get the corresponding operator attributes.
...