...

  • When an operator tries to get the timestamp of a StreamRecord deserialized by the optimized StreamElementSerializer, an exception is thrown stating that users should configure a proper watermark strategy in order to use timestamps.
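The following minimal sketch makes this failure mode concrete: a record that carries a value but no timestamp, and fails fast when the timestamp is requested. It is not Flink's StreamRecord. The class name TimestamplessRecord, the choice of IllegalStateException and the message text are assumptions for illustration only.

```java
/**
 * Hypothetical stand-in for a record deserialized without a timestamp.
 * NOT Flink's StreamRecord: class name, exception type and message are assumptions.
 */
final class TimestamplessRecord<T> {
    private final T value;

    TimestamplessRecord(T value) {
        this.value = value;
    }

    T getValue() {
        return value;
    }

    /** No timestamp was serialized, so asking for one fails fast with a configuration hint. */
    long getTimestamp() {
        throw new IllegalStateException(
                "Record has no timestamp. Configure a proper watermark strategy "
                        + "in order to use timestamps.");
    }

    public static void main(String[] args) {
        TimestamplessRecord<Boolean> record = new TimestamplessRecord<>(true);
        System.out.println(record.getValue()); // true
        try {
            record.getTimestamp();
        } catch (IllegalStateException e) {
            // Prints the hint telling the user to configure a watermark strategy.
            System.out.println(e.getMessage());
        }
    }
}
```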

Benchmark Result

Passing Boolean value

Code Block
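// Disable periodic watermark generation.
env.getConfig().setAutoWatermarkInterval(0);
// Let Flink reuse objects when passing records to user functions.
env.getConfig().enableObjectReuse();

// Map each generated Long value (0 to 1_000_000) to true.
DataStream<Boolean> stream = env.fromSequence(0, 1_000_000).map(x -> true);
// Ten identity maps with chaining disabled, so records are serialized between every pair of operators.
for (int i = 0; i < 10; i++) {
    stream = stream.map(x -> x).disableChaining();
}
stream.addSink(new DiscardingSink<>());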

...

Thus the proposed change increased the job's throughput by about 20%.
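One plausible reason a tiny record type such as Boolean benefits this much is that per-record framing is large relative to its 1-byte payload. The sketch below is an illustration under stated assumptions, not the POC: it uses plain java.io.DataOutputStream, assumes the existing StreamElementSerializer layout of a 1-byte tag, plus an 8-byte timestamp when the record has one, in front of the value, and assumes the optimized serializer writes the value alone. It ignores any other per-record framing added by the network stack. RecordLayoutSketch and its methods are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical helper comparing per-record byte sizes; not Flink code. */
final class RecordLayoutSketch {
    // Illustrative tag values for "record with / without timestamp".
    static final int TAG_REC_WITH_TIMESTAMP = 0;
    static final int TAG_REC_WITHOUT_TIMESTAMP = 1;

    /** Assumed existing layout: tag byte, optional 8-byte timestamp, then the Boolean value. */
    static int taggedSize(boolean value, Long timestamp) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            if (timestamp != null) {
                out.writeByte(TAG_REC_WITH_TIMESTAMP);
                out.writeLong(timestamp);
            } else {
                out.writeByte(TAG_REC_WITHOUT_TIMESTAMP);
            }
            out.writeBoolean(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    /** Assumed optimized layout: the Boolean value only, no tag and no timestamp. */
    static int valueOnlySize(boolean value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeBoolean(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        System.out.println("tag + timestamp + value: " + taggedSize(true, 42L) + " bytes"); // 10
        System.out.println("tag + value:             " + taggedSize(true, null) + " bytes"); // 2
        System.out.println("value only:              " + valueOnlySize(true) + " bytes"); // 1
    }
}
```

Under these assumptions, a Boolean record that would otherwise be framed as 2 or 10 bytes shrinks to 1 byte, so the relative saving is largest for small payloads.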

WordCount example

Code Block
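// Disable periodic watermark generation.
env.getConfig().setAutoWatermarkInterval(0);

// WORDS: the example's input lines, repeated to 1_000_000 lines.
DataStream<String> text = env.fromElements(WORDS).name("in-memory-input");

DataStream<Tuple2<String, Integer>> counts =
    text.flatMap(new Tokenizer())      // split each line into (word, 1) pairs
            .disableChaining()
            .name("tokenizer")
            .keyBy(value -> value.f0)  // key by word
            .sum(1)                    // running count per word
            .disableChaining()
            .name("counter");

// Discard results instead of printing them.
counts.addSink(new DiscardingSink<>());

env.execute("WordCount");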

The benchmark above is derived from Flink's WordCount example. Compared with the original example code, the following modifications were made:

  • The sink is changed from print() to a DiscardingSink.
  • Chaining is disabled between operators.
  • WORDS is repeated so that the input has 1_000_000 lines.
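The text does not show how WORDS was extended. One possible way, assuming the original lines are simply cycled until the target count is reached, is a small helper whose result can be passed to env.fromElements(...). RepeatedInput and repeatToLength are hypothetical names, and the sample lines are placeholders rather than the example's actual WORDS.

```java
/** Hypothetical helper; not part of Flink's WordCount example. */
final class RepeatedInput {
    /** Cycles through base until exactly targetLines lines have been produced. */
    static String[] repeatToLength(String[] base, int targetLines) {
        if (base.length == 0) {
            throw new IllegalArgumentException("base must contain at least one line");
        }
        String[] result = new String[targetLines];
        for (int i = 0; i < targetLines; i++) {
            result[i] = base[i % base.length];
        }
        return result;
    }

    public static void main(String[] args) {
        // Placeholder lines standing in for the example's WORDS array.
        String[] words = {"to be or not to be", "that is the question"};
        String[] input = repeatToLength(words, 1_000_000);
        System.out.println(input.length); // 1000000
        System.out.println(input[2]);     // to be or not to be
    }
}
```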

Running this benchmark produced the following results:

  • Before the optimization, throughput is 344.393 ± 10.615 ops/ms.
  • After the optimization, throughput is 351.022 ± 8.065 ops/ms.

Thus the proposed change increased the job's throughput by about 2%.
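The 2% figure follows directly from the two mean throughputs; the ± ranges are expanded alongside for reference:

```latex
\begin{aligned}
\frac{351.022 - 344.393}{344.393} &= \frac{6.629}{344.393} \approx 0.0192 \approx 2\% \\
\text{before: } 344.393 \pm 10.615 &\Rightarrow [333.778,\ 355.008] \\
\text{after: } 351.022 \pm 8.065 &\Rightarrow [342.957,\ 359.087]
\end{aligned}
```

The two ranges overlap, so the WordCount gain is within the reported error margins.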

Reference

The POC code and the benchmark code that generated these results are listed below.

...