...
- When an operator tries to get the timestamp of a StreamRecord deserialized by the optimized StreamElementSerializer, an exception would be thrown stating that users should configure a proper watermark strategy in order to use timestamps.
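To illustrate the failure mode described above, here is a minimal sketch in plain Java with no Flink dependency. The class `TimestampFreeRecord` and its exception message are hypothetical; they only model the idea that a record deserialized without a timestamp should fail fast, with a hint about watermark strategies, rather than return a meaningless value.

```java
/**
 * Hypothetical stand-in for a StreamRecord deserialized by the optimized serializer:
 * the value survives, but no timestamp was written, so asking for one fails fast.
 */
final class TimestampFreeRecord<T> {
    private final T value;

    TimestampFreeRecord(T value) {
        this.value = value;
    }

    T getValue() {
        return value;
    }

    /** No timestamp was serialized, so this record never reports one. */
    boolean hasTimestamp() {
        return false;
    }

    long getTimestamp() {
        // Message wording is illustrative only, not copied from Flink.
        throw new IllegalStateException(
                "This record carries no timestamp. Configure a proper watermark strategy "
                        + "in order to use timestamps.");
    }
}
```

An operator that only reads `getValue()` is unaffected; only code that actually needs the timestamp hits the exception.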
Benchmark Result
Passing Boolean value
```java
env.getConfig().setAutoWatermarkInterval(0);
env.getConfig().enableObjectReuse();
DataStream<Boolean> stream = env.fromSequence(0, 1_000_000).map(x -> true);
for (int i = 0; i < 10; i++) {
    stream = stream.map(x -> x).disableChaining();
}
stream.addSink(new DiscardingSink<>());
```
...
Thus the proposed change increased the job's throughput by about 20%.
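One plausible reading of the large gain for Boolean records is that fixed per-record framing is big relative to a 1-byte payload. The sketch below counts serialized bytes per record with plain `java.io` streams under two assumptions: (1) the existing layout is a 1-byte tag, then an 8-byte timestamp only when the record has one, then the value, which is our understanding of Flink's StreamElementSerializer; (2) the optimized serializer writes the value only. `RecordSizeSketch`, its methods, and the tag constants are hypothetical names, not imported from Flink.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class RecordSizeSketch {
    // Assumed to mirror the record tags of Flink's StreamElementSerializer (not imported from Flink).
    static final int TAG_REC_WITH_TIMESTAMP = 0;
    static final int TAG_REC_WITHOUT_TIMESTAMP = 1;

    /** Writes fields to a DataOutputStream; may throw IOException like any java.io write. */
    @FunctionalInterface
    interface FieldWriter {
        void write(DataOutputStream out) throws IOException;
    }

    /** Serializes whatever the writer emits into an in-memory byte array. */
    static byte[] encode(FieldWriter writer) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writer.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Existing layout, record with timestamp: 1-byte tag + 8-byte timestamp + value. */
    static byte[] withTimestamp(long timestamp, boolean value) {
        return encode(out -> {
            out.writeByte(TAG_REC_WITH_TIMESTAMP);
            out.writeLong(timestamp);
            out.writeBoolean(value);
        });
    }

    /** Existing layout, record without timestamp: 1-byte tag + value. */
    static byte[] withoutTimestamp(boolean value) {
        return encode(out -> {
            out.writeByte(TAG_REC_WITHOUT_TIMESTAMP);
            out.writeBoolean(value);
        });
    }

    /** Assumed optimized layout: the value only, no tag and no timestamp. */
    static byte[] valueOnly(boolean value) {
        return encode(out -> out.writeBoolean(value));
    }
}
```

Under these assumptions a Boolean record shrinks from 2 bytes (10 with a timestamp) to 1, whereas a `Tuple2<String, Integer>` word-count record already spans several bytes, so removing the same fixed overhead matters much less. This is consistent with, though not proof of, the gap between the two benchmark results.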
WordCount example
```java
env.getConfig().setAutoWatermarkInterval(0);
DataStream<String> text = env.fromElements(WORDS).name("in-memory-input");
DataStream<Tuple2<String, Integer>> counts =
        text.flatMap(new Tokenizer())
                .disableChaining()
                .name("tokenizer")
                .keyBy(value -> value.f0)
                .sum(1)
                .disableChaining()
                .name("counter");
counts.addSink(new DiscardingSink<>());
env.execute("WordCount");
```
The benchmark above is derived from Flink's WordCount example. Compared with the original example code, the following modifications are made:
- The sink is changed from print() to DiscardingSink.
- Chaining is disabled between operators.
- WORDS is repeated so that the input contains 1,000,000 lines.
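The last modification can be done in several ways. A minimal sketch, assuming the original WORDS lines are simply cycled until exactly 1,000,000 lines exist, is shown below; `RepeatedWords` and `repeatToLines` are hypothetical names, not part of the Flink example.

```java
final class RepeatedWords {
    /** Cycles through {@code words} until exactly {@code targetLines} lines have been produced. */
    static String[] repeatToLines(String[] words, int targetLines) {
        if (words.length == 0) {
            throw new IllegalArgumentException("words must not be empty");
        }
        String[] lines = new String[targetLines];
        for (int i = 0; i < targetLines; i++) {
            // Wrap around to the first line once the original array is exhausted.
            lines[i] = words[i % words.length];
        }
        return lines;
    }
}
```

For example, `RepeatedWords.repeatToLines(originalWords, 1_000_000)` (with `originalWords` standing for the example's original array) yields the input passed to `env.fromElements(...)`.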
Running this benchmark produces the following results:
- Before optimization, throughput is 344.393 ± 10.615 ops/ms.
- After optimization, throughput is 351.022 ± 8.065 ops/ms.
Thus the proposed change increased the job's throughput by about 2%.
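The percentage above follows from the two reported means. The small helper below (hypothetical names) computes the relative gain and also checks whether the reported ± ranges overlap, which helps judge how the gain compares with run-to-run variance.

```java
final class ThroughputGain {
    /** Relative throughput change, e.g. 0.05 for a 5% increase. */
    static double relativeGain(double before, double after) {
        if (before <= 0) {
            throw new IllegalArgumentException("baseline throughput must be positive");
        }
        return (after - before) / before;
    }

    /** True when the ranges [mean - error, mean + error] of two measurements overlap. */
    static boolean errorRangesOverlap(double meanA, double errA, double meanB, double errB) {
        return meanA - errA <= meanB + errB && meanB - errB <= meanA + errA;
    }
}
```

With the WordCount numbers, `relativeGain(344.393, 351.022)` is about 0.019 (roughly 1.9%), and `errorRangesOverlap(344.393, 10.615, 351.022, 8.065)` returns true, so the gain is of the same order as the reported error margins.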
Reference
Below are the POC code and the benchmark code that generated these results.
...