...

Code Block
languagejava
package org.apache.flink.streaming.api.operators;
 
/** The builder class for {@link OperatorAttributes}. */
@Experimental
public class OperatorAttributesBuilder {
    @Nullable private Boolean outputOnEOF = null;
    @Nullable private Boolean outputOnCheckpoint = null;
 
    public OperatorAttributesBuilder() {...}
 
    public OperatorAttributesBuilder setOutputOnEOF(boolean outputOnEOF) {...}
 
    public OperatorAttributesBuilder setOutputOnCheckpoint(boolean outputOnCheckpoint) {...}
  
    /**
     * If any operator attribute is null, we will log it at DEBUG level and use the following
     * default values.
     * - outputOnEOF defaults to false
     * - outputOnCheckpoint defaults to false
     */
    public OperatorAttributes build() {...}
}
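
For illustration, here is a minimal sketch of how the builder might be used to construct attributes for an operator that only emits records at EOF (assuming the API proposed above):

Code Block
languagejava
OperatorAttributes attributes =
        new OperatorAttributesBuilder()
                .setOutputOnEOF(true)
                .build();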

...

Code Block
languagejava
package org.apache.flink.streaming.api.operators;
 
/**
 * OperatorAttributes provides the JobManager with information about an operator
 * that can be used to optimize the job performance.
 */
@Experimental
public class OperatorAttributes {
    /**
     * Returns true iff the operator can only emit records after inputs have reached EOF.
     *
     * <p>Here are the implications when it is true:
     *
     * <ul>
     *   <li> The results of this operator as well as its upstream operators have blocking partition type.
     *   <li> This operator as well as its upstream operators will be executed in batch mode.
     * </ul>
     */
    public boolean isOutputOnEOF() {...}
 
    /**
     * Returns true iff the operator can only emit records when checkpoint is triggered.
     */
    public boolean isOutputOnCheckpoint() {...}
}

...

3) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.

Code Block
languagejava
@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ...
 
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}
 
@PublicEvolving
public interface StreamOperatorFactory<OUT> extends Serializable {
    ...
 
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}
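
For example, a custom operator that buffers its input and only emits results at EOF might declare the attribute as follows (a minimal sketch; the operator name and logic are illustrative, not part of this FLIP):

Code Block
languagejava
public class MyEOFOperator extends AbstractStreamOperator<Integer>
        implements OneInputStreamOperator<Integer, Integer>, BoundedOneInput {

    private final List<Integer> buffer = new ArrayList<>();

    @Override
    public void processElement(StreamRecord<Integer> element) {
        // Buffer records instead of emitting them immediately.
        buffer.add(element.getValue());
    }

    @Override
    public void endInput() {
        // Emit all buffered records once the input has ended.
        for (Integer value : buffer) {
            output.collect(new StreamRecord<>(value));
        }
    }

    @Override
    public OperatorAttributes getOperatorAttributes() {
        // Tell the JobManager that this operator only emits records at EOF.
        return new OperatorAttributesBuilder().setOutputOnEOF(true).build();
    }
}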

...

In addition, after FLIP-327 is accepted, this operator should also have isInternalSorterSupported = true. The operator will sort the input before aggregating and avoid invoking window actions, similar to 5) above.
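
A sketch of what this could look like, assuming FLIP-327 adds a corresponding setter to the builder (the setter below is hypothetical and not defined in this FLIP):

Code Block
languagejava
@Override
public OperatorAttributes getOperatorAttributes() {
    return new OperatorAttributesBuilder()
            .setOutputOnEOF(true)
            .setInternalSorterSupported(true) // hypothetical setter proposed in FLIP-327
            .build();
}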

Benchmark results

To demonstrate our optimization improvements, we run each benchmark 5 times in different execution modes and configurations. Below are the benchmark results, which include the average throughput (records/ms), the throughput range, and the magnification factor relative to each job's throughput in streaming mode before our work.

Environment Configuration

We run benchmarks on a MacBook with the latest Flink 1.18-snapshot and parallelism=1. The default standalone cluster configuration is used except for the following:


Code Block
languageyml
jobmanager.memory.process.size: 6400m
taskmanager.memory.process.size: 6912m

Benchmark: CoGroup

Use DataStream#coGroup to process records from two bounded streams, each with Data Count records, and emit results after both inputs have ended. With this PR, when DataStream#coGroup is invoked with EndOfStreamWindows as the window assigner, the EOFCoGroupOperator is applied, which first sorts the two inputs and then emits the coGroup results.

Code Block
languagejava
data1.coGroup(data2)
    .where(tuple -> tuple.f0)
    .equalTo(tuple -> tuple.f0)
    .window(EndOfStreamWindows.get())
    .apply(new CustomCoGroupFunction())
    .addSink(...);
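
The CustomCoGroupFunction is not shown in this excerpt; a minimal placeholder consistent with the snippet might look like the following (the element types are illustrative assumptions):

Code Block
languagejava
private static class CustomCoGroupFunction
        implements CoGroupFunction<Tuple2<Integer, Double>, Tuple2<Integer, Double>, Integer> {
    @Override
    public void coGroup(
            Iterable<Tuple2<Integer, Double>> first,
            Iterable<Tuple2<Integer, Double>> second,
            Collector<Integer> out) {
        // Emit one record per key so the benchmark measures framework overhead
        // rather than user logic.
        out.collect(1);
    }
}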


In this benchmark, we run each job in CoGroupDataStream 5 times in 4 configurations: streaming mode, batch mode, optimized streaming mode after this PR, and optimized streaming mode with hybrid shuffle after this PR. Here are the throughput benchmark results:

Data Count | STREAMING     | BATCH            | Optimized STREAMING | With hybrid shuffle
2e6        | 61 ± 1 (100%) | 502 ± 75 (823%)  | 868 ± 74 (1423%)    | 899 ± 82 (1474%)
5e6        | 60 ± 1 (100%) | 528 ± 28 (880%)  | 1176 ± 39 (1960%)   | 1262 ± 50 (2103%)
2e7        | 56 ± 0 (100%) | 564 ± 34 (1007%) | 1492 ± 28 (2664%)   | 1692 ± 14 (3021%)
5e7        | 53 ± 1 (100%) | 602 ± 16 (1135%) | 1508 ± 18 (2845%)   | 1712 ± 77 (3230%)

This shows that, with the changes proposed above, DataStream#coGroup can be 14~30X faster than its current throughput in streaming mode and up to 2.5X faster than its current throughput in batch mode. Using hybrid shuffle in the outputEOF part can further improve the performance.

Benchmark: Aggregate

Use DataStream#aggregate to process records from a bounded stream with Data Count records, where every 100 records share the same key, and emit results after the input has ended. With this PR, when DataStream#aggregate is invoked with EndOfStreamWindows as the window assigner, the EOFAggregationOperator is applied, which first sorts the input, then aggregates the records of each key and emits the results.

Code Block
languagejava
data
      .keyBy(value -> value.f0)
      .window(EndOfStreamWindows.get())
      .aggregate(new Aggregator())
      .addSink(new CountingAndDiscardingSink());
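
The Aggregator is likewise not shown; a minimal placeholder consistent with the snippet might be (the element types are illustrative assumptions):

Code Block
languagejava
private static class Aggregator
        implements AggregateFunction<Tuple2<Integer, Double>, Double, Double> {
    @Override
    public Double createAccumulator() {
        return 0.0;
    }

    @Override
    public Double add(Tuple2<Integer, Double> value, Double accumulator) {
        // Sum the second field of all records sharing the same key.
        return accumulator + value.f1;
    }

    @Override
    public Double getResult(Double accumulator) {
        return accumulator;
    }

    @Override
    public Double merge(Double a, Double b) {
        return a + b;
    }
}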


In this benchmark, we run each job in AggregationBenchmark 5 times in 4 configurations: streaming mode, batch mode, optimized streaming mode after this PR, and optimized streaming mode with hybrid shuffle after this PR. Here are the throughput benchmark results:

Data Count | STREAMING      | BATCH             | Optimized STREAMING | With hybrid shuffle
5e6        | 201 ± 3 (100%) | 1454 ± 153 (723%) | 1514 ± 43 (753%)    | 1670 ± 29 (831%)
1e7        | 207 ± 2 (100%) | 1522 ± 71 (735%)  | 1620 ± 44 (782%)    | 1866 ± 187 (901%)
2e7        | 201 ± 3 (100%) | 1607 ± 9 (800%)   | 1746 ± 31 (867%)    | 2038 ± 155 (1014%)
5e7        | 176 ± 1 (100%) | 1559 ± 6 (885%)   | 1723 ± 22 (978%)    | 1956 ± 56 (1111%)

This shows that, with the changes proposed above, DataStream#aggregate can be 7~10X faster than its current throughput in streaming mode and slightly faster than its current throughput in batch mode. Using hybrid shuffle in the outputEOF part can further improve the performance.

Benchmark: OperatorDelayedDeploy

We have a job that pre-processes records from a bounded source (Source1) using an operator (Process1) which only emits results after its input has ended. Another operator (Process2) then processes the records emitted by Process1 together with records from an unbounded source (Source2), and emits results with low processing latency in real time.

Code Block
languagejava
source1.keyBy(value -> value.f0)
    .window(EndOfStreamWindows.get())
    .aggregate(new MyAggregator()).name("Process1")
    .connect(source2.keyBy(value -> value.f0))
    .transform("Process2", Types.INT, new MyProcessOperator())
    .addSink(new DiscardingSink<>());

Resource Utilization

Suppose we set an extra standalone cluster configuration:

Code Block
languageyml
taskmanager.numberOfTaskSlots: 2

When we put Source1, Process1, Source2 and Process2 into 4 different SlotSharingGroups as above and run this job, we can see that, before this PR, none of the tasks could be deployed: they were all blocked in the CREATED state because the TaskManager provides only 2 slots. With our work, Source2 and Process2 can be deployed after Source1 and Process1 finish and release their slots.
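
A sketch of how the four operators might be placed into separate slot sharing groups (the group names are illustrative, not from the original benchmark code):

Code Block
languagejava
source1.slotSharingGroup("source1")
    .keyBy(value -> value.f0)
    .window(EndOfStreamWindows.get())
    .aggregate(new MyAggregator()).name("Process1").slotSharingGroup("process1")
    .connect(source2.slotSharingGroup("source2").keyBy(value -> value.f0))
    .transform("Process2", Types.INT, new MyProcessOperator()).slotSharingGroup("process2")
    .addSink(new DiscardingSink<>());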


Efficiency Improvement

Without setting the slot sharing groups, we run each benchmark in OperatorDelayedDeployDataStreamInOneSlot 5 times. Here are the throughput benchmark results:

Data Count | STREAMING       | Optimized STREAMING | With hybrid shuffle
5e6        | 205 ± 4 (100%)  | 1201 ± 59 (586%)    | 1381 ± 14 (674%)
1e7        | 186 ± 19 (100%) | 1270 ± 64 (683%)    | 1423 ± 41 (765%)
5e7        | OOM             | 1463 ± 7            | 1666 ± 35

This shows that, as the number of records grows, the job in streaming mode fails with an OOM exception, while it works well with the changes proposed above.

Compatibility, Deprecation, and Migration Plan

...