...

2) Add a method in Output to broadcast RecordAttributes to all downstream operators.

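```java
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.Collector;

@PublicEvolving
public interface Output<T> extends Collector<T> {
    // ...

    /**
     * Emits a {@link RecordAttributes} from an operator. This element is broadcast to all
     * downstream operators.
     */
    @Experimental
    default void emitRecordAttributes(RecordAttributes recordAttributes) {
        // Outputs that do not support RecordAttributes reject the call by default.
        throw new UnsupportedOperationException();
    }
}
```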
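To illustrate the broadcast semantics, here is a minimal, self-contained sketch that does not depend on Flink. `BroadcastSketch`, `SimpleOutput`, `BroadcastingOutput`, and the single `isBacklog` field on the simplified `RecordAttributes` are hypothetical stand-ins. The sketch shows the two behaviors implied by the interface above: an output that does not override `emitRecordAttributes` throws `UnsupportedOperationException`, while one that does override it delivers the element to every downstream channel. An ordinary record, by contrast, goes to a single channel.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for the Flink types named above; not Flink's real classes.
final class BroadcastSketch {

    /** Simplified RecordAttributes carrying only an isBacklog flag (assumed field). */
    static final class RecordAttributes {
        final boolean isBacklog;

        RecordAttributes(boolean isBacklog) {
            this.isBacklog = isBacklog;
        }
    }

    /** Mirrors the proposed default: outputs that do not override it reject the call. */
    interface SimpleOutput<T> {
        void collect(T record);

        default void emitRecordAttributes(RecordAttributes recordAttributes) {
            throw new UnsupportedOperationException();
        }
    }

    /** Hypothetical output that fans out to a fixed number of downstream channels. */
    static final class BroadcastingOutput<T> implements SimpleOutput<T> {
        private final List<List<Object>> channels = new ArrayList<>();
        private int nextChannel = 0;

        BroadcastingOutput(int numChannels) {
            for (int i = 0; i < numChannels; i++) {
                channels.add(new ArrayList<>());
            }
        }

        @Override
        public void collect(T record) {
            // Ordinary records go to exactly one channel (round-robin in this sketch).
            channels.get(nextChannel).add(record);
            nextChannel = (nextChannel + 1) % channels.size();
        }

        @Override
        public void emitRecordAttributes(RecordAttributes recordAttributes) {
            // Like a watermark, RecordAttributes is broadcast to every downstream channel.
            for (List<Object> channel : channels) {
                channel.add(recordAttributes);
            }
        }

        List<Object> channel(int index) {
            return channels.get(index);
        }
    }
}
```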

...

NOTE: It is up to the operator implementation to decide when (and how often) to emit RecordAttributes. The overhead of emitting RecordAttributes is similar to the overhead of emitting a Watermark.
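Because emitting RecordAttributes costs about as much as emitting a Watermark, an operator may prefer not to emit it on every record. The sketch below assumes one possible policy: emit only when the operator's backlog status actually changes. `BacklogChangeEmitter` and `onBacklogStatus` are hypothetical names, and the `Consumer<Boolean>` stands in for a call to `Output#emitRecordAttributes`.

```java
import java.util.function.Consumer;

// Hypothetical sketch: all names are illustrative, not Flink APIs.
final class BacklogChangeEmitter {
    // Stands in for Output#emitRecordAttributes; receives the new isBacklog value.
    private final Consumer<Boolean> emitRecordAttributes;
    // Last value emitted downstream; null until the first emission.
    private Boolean lastEmittedBacklog = null;

    BacklogChangeEmitter(Consumer<Boolean> emitRecordAttributes) {
        this.emitRecordAttributes = emitRecordAttributes;
    }

    /** Called whenever the operator re-evaluates its backlog status. */
    void onBacklogStatus(boolean isBacklog) {
        // Emit only on a change, so the watermark-like per-emission cost is paid rarely.
        if (!Boolean.valueOf(isBacklog).equals(lastEmittedBacklog)) {
            lastEmittedBacklog = isBacklog;
            emitRecordAttributes.accept(isBacklog);
        }
    }
}
```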

...

4) Add OperatorAttributesBuilder and OperatorAttributes so that operator developers can specify operator attributes, which the Flink JobManager (JM) can use to compile the graph correctly (e.g. to decide whether to add an external sorter that sorts input records by key).
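A minimal sketch of the builder pattern these two classes suggest. Only the internal-sorter attribute is grounded in this proposal (it corresponds to isInternalSorterSupported in the translator rules); the `outputOnlyAtEndOfInput` field, the setter names, and the defaults are assumptions for illustration, not the final API.

```java
// Hypothetical sketch; field and method names other than the class names are assumptions.
final class OperatorAttributes {
    private final boolean internalSorterSupported;
    private final boolean outputOnlyAtEndOfInput;

    OperatorAttributes(boolean internalSorterSupported, boolean outputOnlyAtEndOfInput) {
        this.internalSorterSupported = internalSorterSupported;
        this.outputOnlyAtEndOfInput = outputOnlyAtEndOfInput;
    }

    boolean isInternalSorterSupported() {
        return internalSorterSupported;
    }

    boolean isOutputOnlyAtEndOfInput() {
        return outputOnlyAtEndOfInput;
    }
}

final class OperatorAttributesBuilder {
    // Assumed defaults describe a plain streaming operator: no internal sorter, continuous output.
    private boolean internalSorterSupported = false;
    private boolean outputOnlyAtEndOfInput = false;

    OperatorAttributesBuilder setInternalSorterSupported(boolean internalSorterSupported) {
        this.internalSorterSupported = internalSorterSupported;
        return this;
    }

    OperatorAttributesBuilder setOutputOnlyAtEndOfInput(boolean outputOnlyAtEndOfInput) {
        this.outputOnlyAtEndOfInput = outputOnlyAtEndOfInput;
        return this;
    }

    OperatorAttributes build() {
        return new OperatorAttributes(internalSorterSupported, outputOnlyAtEndOfInput);
    }
}
```

An operator would return the built, immutable attributes object to the JM, which only reads it; keeping the builder separate lets new attributes be added later with backward-compatible defaults.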

...

Note that an operator having an internal sorter does not necessarily mean that it only emits data at the end of input. For example, an operator might sort data while it is still reading from an input with isBacklog=true. Once all the inputs it is still reading from have isBacklog=false, the operator can stop sorting and start to emit records continuously in a streaming fashion. The operator can also support checkpointing during this period.
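The switch described above can be sketched as follows, under simplifying assumptions: records are bare `int` keys sorted in natural order, every input is assumed to start with isBacklog=true, and the sorter is an in-heap list rather than a managed-memory sorter. `BacklogAwareSortingOperator`, `processRecordAttributes`, and `endInput` are hypothetical names. Records are buffered while any unfinished input is still in backlog; once none is, the buffer is flushed in sorted order and later records are emitted as they arrive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of an operator with an internal sorter; not a Flink class.
final class BacklogAwareSortingOperator {
    private final boolean[] backlog;   // latest isBacklog value per input
    private final boolean[] finished;  // inputs that have reached end of input
    private final List<Integer> buffer = new ArrayList<>();
    private final List<Integer> output = new ArrayList<>();
    private boolean sorting = true;

    BacklogAwareSortingOperator(int numInputs) {
        backlog = new boolean[numInputs];
        finished = new boolean[numInputs];
        Arrays.fill(backlog, true); // assumption: every input starts in backlog
    }

    void processElement(int input, int key) {
        // The input index is unused here because all inputs share one buffer.
        if (sorting) {
            buffer.add(key);  // backlog phase: buffer for sorting
        } else {
            output.add(key);  // streaming phase: emit immediately
        }
    }

    void processRecordAttributes(int input, boolean isBacklog) {
        backlog[input] = isBacklog;
        maybeSwitchToStreaming();
    }

    void endInput(int input) {
        finished[input] = true;
        maybeSwitchToStreaming();
    }

    private void maybeSwitchToStreaming() {
        if (!sorting) {
            return;
        }
        for (int i = 0; i < backlog.length; i++) {
            if (!finished[i] && backlog[i]) {
                return; // an input we are still reading from is in backlog
            }
        }
        // Flush the sorted backlog, then switch to continuous emission.
        buffer.sort(Comparator.naturalOrder());
        output.addAll(buffer);
        buffer.clear();
        sorting = false;
    }

    List<Integer> output() {
        return output;
    }
}
```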

...

3) Update Transformation subclasses (e.g. OneInputTransformation and TwoInputTransformation) to override the newly added methods using the OperatorAttributes obtained from the underlying Operator.
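A minimal sketch of this delegation, assuming a single hypothetical query method, `isInternalSorterSupported()`, on a simplified `Transformation` hierarchy. The real class shapes and the full set of newly added methods are not shown here; every name inside `TransformationSketch` is illustrative.

```java
// Hypothetical sketch; simplified stand-ins for the Transformation hierarchy, not Flink's signatures.
final class TransformationSketch {

    static final class OperatorAttributes {
        final boolean internalSorterSupported;

        OperatorAttributes(boolean internalSorterSupported) {
            this.internalSorterSupported = internalSorterSupported;
        }
    }

    /** Stand-in for an operator that exposes its declared attributes. */
    interface AttributedOperator {
        OperatorAttributes getOperatorAttributes();
    }

    abstract static class Transformation {
        // Conservative default for transformations that do not wrap an operator.
        boolean isInternalSorterSupported() {
            return false;
        }
    }

    static final class OneInputTransformation extends Transformation {
        private final AttributedOperator operator;

        OneInputTransformation(AttributedOperator operator) {
            this.operator = operator;
        }

        @Override
        boolean isInternalSorterSupported() {
            // Delegate to the attributes declared by the underlying operator.
            return operator.getOperatorAttributes().internalSorterSupported;
        }
    }
}
```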


4) Update the Transformation translator to use the following operator attributes when compiling the Transformation graph into the JobGraph.

...

  • If a OneInputTransformation has isInternalSorterSupported == false, keyed input, and execution.checkpointing.interval-during-backlog == 0:
    • The Flink runtime will add an external sorter for the input to sort the keyed input during backlog processing
    • The sorter's managed memory should be set according to execution.sorted-inputs.memory
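The external-sorter rule above reduces to a simple predicate. In this sketch, `ExternalSorterRule` and its methods are hypothetical; the configuration keys are the ones named above, and a plain `Map<String, String>` stands in for Flink's configuration object. Reading an interval of 0 as "no checkpoints during backlog" is an assumption of this sketch, and parsing the memory size string is omitted.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper mirroring the translator conditions listed above.
final class ExternalSorterRule {
    static final String SORTED_INPUTS_MEMORY = "execution.sorted-inputs.memory";

    /**
     * True when the translator should insert an external sorter: the operator cannot sort
     * internally, its input is keyed, and execution.checkpointing.interval-during-backlog
     * is 0 (assumed here to mean no checkpoints are taken during backlog processing).
     */
    static boolean needsExternalSorter(
            boolean isInternalSorterSupported, boolean keyedInput, long intervalDuringBacklogMs) {
        return !isInternalSorterSupported && keyedInput && intervalDuringBacklogMs == 0;
    }

    /** Returns the configured sorter memory as an unparsed string, if present. */
    static Optional<String> sorterMemory(Map<String, String> config) {
        return Optional.ofNullable(config.get(SORTED_INPUTS_MEMORY));
    }
}
```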

...