Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


[This FLIP proposal is a joint work between Yunfeng Zhou  and Dong Lin ]


Motivation

In Flink jobs, stream elements are serialized into bytes for network communication and for storage in buffers and snapshots. For stream records, the serialization format is as follows.

  1. a 1-byte tag containing the type of the stream element. For example, the tag shows whether the element is a StreamRecord, Watermark or LatencyMarker, and whether it has timestamps when it is a StreamRecord.

  2. an optional 8-byte array containing the timestamp of StreamRecord, if any.

  3. an N-byte array containing the value of the StreamRecord, according to the serialization strategy of the type serializer.
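The layout above can be illustrated with a minimal sketch. This is not Flink's actual StreamElementSerializer: the class name, method names and tag constants below are hypothetical and chosen only for illustration, and the value is assumed to be already serialized into a byte array by the type serializer.

```java
import java.nio.ByteBuffer;

class RecordFormatSketch {
    // Hypothetical tag values, used only for this illustration.
    static final byte TAG_REC_WITH_TIMESTAMP = 0;
    static final byte TAG_REC_WITHOUT_TIMESTAMP = 1;

    /** Tagged format: 1-byte tag, optional 8-byte timestamp, then the N value bytes. */
    static byte[] serializeWithTag(byte[] value, Long timestamp) {
        int timestampBytes = (timestamp != null) ? Long.BYTES : 0;
        ByteBuffer buffer = ByteBuffer.allocate(1 + timestampBytes + value.length);
        if (timestamp != null) {
            buffer.put(TAG_REC_WITH_TIMESTAMP);
            buffer.putLong(timestamp);
        } else {
            buffer.put(TAG_REC_WITHOUT_TIMESTAMP);
        }
        buffer.put(value);
        return buffer.array();
    }

    /** Value-only format: nothing but the bytes produced by the type serializer. */
    static byte[] serializeValueOnly(byte[] value) {
        return value.clone();
    }
}
```

For a value that serializes to a single byte, the tagged format takes 2 bytes without a timestamp and 10 bytes with one, while the value-only format takes 1 byte.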

In scenarios where the Flink job does not emit data with time attributes and disables watermarks and latency tracking, the only meaningful data is the value of each StreamRecord. In this scenario, the current serialization format unnecessarily serializes the tag, which hurts network communication performance and increases memory consumption.

To improve the performance of Flink jobs in the situations described above, we propose to add the following APIs. They tell the Flink infrastructure whether the time attributes of the data will be used at runtime, so that it can optimize the serialization process accordingly.

Public Interfaces

1. Add job-level configuration pipeline.force-timestamp-support

Here are the specifications of this config:

  • Name: pipeline.force-timestamp-support

  • Type: Boolean

  • Default value: true

  • Description: Whether to force support for record timestamps. If this is set to false, automatic watermarks (pipeline.auto-watermark-interval) and latency tracking (metrics.latency.interval) are disabled, and the job only uses Flink's built-in watermark generators that are guaranteed never to emit watermarks, then Flink will optimize the per-record runtime overhead (e.g. serialization) that is otherwise needed to handle timestamps and watermarks.

Note that this configuration would be introduced with a @Deprecated annotation. See the migration plan section below for the reason and details.

Proposed Changes

1. Update StreamElementSerializer to take advantage of the introduced APIs.

When all of the following conditions are met:

  • pipeline.force-timestamp-support is set to false
  • The job does not have watermarks, including
    • pipeline.auto-watermark-interval is set to zero
    • the job is not assigned with a watermark strategy, i.e., the job graph does not contain TimestampsAndWatermarksOperator or the operator's strategy is WatermarkStrategy.noWatermarks().
  • metrics.latency.interval is set to zero

Under these conditions, the only subclass of StreamElement that needs to be serialized is StreamRecord, and the records' timestamps are not needed. The StreamElementSerializer would therefore treat every received StreamElement as a StreamRecord and serialize only the value of each StreamRecord. If any condition is not met, it would keep using the existing serialization format described in the motivation section.
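The conditions above can be read as a single predicate. The following is a minimal sketch of that check, not the FLIP's implementation: the class, method and parameter names are hypothetical, the intervals are assumed to be given in milliseconds, and hasWatermarkStrategy is assumed to be true only when the job graph contains a TimestampsAndWatermarksOperator whose strategy is not WatermarkStrategy.noWatermarks().

```java
class SerializationOptimizationCheck {
    /**
     * Returns true only when every condition for the value-only format holds.
     * All parameter names are hypothetical and used for illustration.
     */
    static boolean canSerializeValueOnly(
            boolean forceTimestampSupport,  // pipeline.force-timestamp-support
            long autoWatermarkIntervalMs,   // pipeline.auto-watermark-interval
            boolean hasWatermarkStrategy,   // a strategy other than noWatermarks() is assigned
            long latencyIntervalMs) {       // metrics.latency.interval
        boolean jobHasNoWatermarks = autoWatermarkIntervalMs == 0 && !hasWatermarkStrategy;
        boolean latencyTrackingDisabled = latencyIntervalMs == 0;
        return !forceTimestampSupport && jobHasNoWatermarks && latencyTrackingDisabled;
    }
}
```

For example, canSerializeValueOnly(false, 0, false, 0) returns true, while leaving pipeline.force-timestamp-support at its default of true makes the check return false regardless of the other settings.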


2. Throw proper exceptions when the optimization's assumption is broken

Even when the conditions above are met, there are still rare cases in which users manually assign and use timestamps and watermarks in custom operators. In these scenarios, Flink would detect the invalid elements or invocations and throw proper exceptions as follows.

  • When the optimized StreamElementSerializer receives a StreamElement instance that is not a StreamRecord, a ClassCastException would be thrown during serialization. It would be caught and converted into an exception telling users that they need to turn off watermarks and latency tracking to use this optimization.
  • When an operator tries to get the timestamp of a StreamRecord deserialized by the optimized StreamElementSerializer, an exception would be thrown stating that users should configure a proper watermark strategy in order to use timestamps.
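The two failure modes above can be sketched as follows. This is a simplified stand-in, not Flink's code: the nested classes only mimic the names of Flink's StreamElement types, and the concrete exception types (IllegalStateException, UnsupportedOperationException) and messages are assumptions, since the FLIP does not specify them.

```java
class OptimizedSerializerSketch {
    static class StreamElement {}

    static final class Watermark extends StreamElement {}

    static final class StreamRecord extends StreamElement {
        final byte[] value;
        final boolean timestampSupported;
        final long timestamp;

        StreamRecord(byte[] value, boolean timestampSupported, long timestamp) {
            this.value = value;
            this.timestampSupported = timestampSupported;
            this.timestamp = timestamp;
        }

        long getTimestamp() {
            if (!timestampSupported) {
                // Assumed exception type and message.
                throw new UnsupportedOperationException(
                        "Configure a proper watermark strategy in order to use timestamps.");
            }
            return timestamp;
        }
    }

    /** Value-only serialization: every element is assumed to be a StreamRecord. */
    static byte[] serialize(StreamElement element) {
        try {
            return ((StreamRecord) element).value.clone();
        } catch (ClassCastException e) {
            // Convert the low-level failure into an actionable message.
            throw new IllegalStateException(
                    "Turn off watermarks and latency tracking to use this optimization.", e);
        }
    }

    /** Records read back from the value-only format carry no timestamp. */
    static StreamRecord deserialize(byte[] bytes) {
        return new StreamRecord(bytes.clone(), false, Long.MIN_VALUE);
    }
}
```

In this sketch, serializing a Watermark fails with the converted exception, and calling getTimestamp() on a record returned by deserialize fails with the watermark-strategy hint.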

Benchmark Result

Passing Boolean value

env.getConfig().setAutoWatermarkInterval(0);
env.getConfig().enableObjectReuse();

DataStream<Boolean> stream = env.fromSequence(0, 1_000_000).map(x -> true);
for (int i = 0; i < 10; i++) {
    stream = stream.map(x -> x).disableChaining();
}
stream.addSink(new DiscardingSink<>());

Running the above benchmark on a local machine, with a POC implementation of the StreamRecord serialization optimization described above, produces the following results.

  • Before optimization, throughput is 3954.932 ± 351.016 ops/ms.
  • After optimization, throughput is 4737.865 ± 272.070 ops/ms.

Thus the proposed change increased the job's throughput by about 20% (4737.865 / 3954.932 ≈ 1.198).

WordCount example

env.getConfig().setAutoWatermarkInterval(0);

DataStream<String> text = env.fromElements(WORDS).name("in-memory-input");

DataStream<Tuple2<String, Integer>> counts =
    text.flatMap(new Tokenizer())
            .disableChaining()
            .name("tokenizer")
            .keyBy(value -> value.f0)
            .sum(1)
            .disableChaining()
            .name("counter");

counts.addSink(new DiscardingSink<>());

env.execute("WordCount");

The benchmark above is derived from Flink's word count example. Compared with the original example code, the following modifications are made.

  • sink is changed from print to discarding sink.
  • chaining is disabled between operators.
  • WORDS is repeated to have 1_000_000 lines.

Running this benchmark without object reuse produces the following results.

  • Before optimization, throughput is 362.180 ± 6.711 ops/ms.
  • After optimization, throughput is 373.595 ± 7.041 ops/ms.

Running this benchmark with object reuse produces the following results.

  • Before optimization, throughput is 365.720 ± 6.987 ops/ms.
  • After optimization, throughput is 375.033 ± 6.374 ops/ms.

Thus the proposed change increased the job's throughput by roughly 2.5% to 3.2%, regardless of object reuse.

Reference

Below are the POC code and the benchmark code that generated these results.

Note that the latest design of this FLIP has renamed part of the public API since the POC code was implemented, so there is a conceptual gap between the POC code and the FLIP. The main optimization remains the same, however, so the POC code can still be used to demonstrate the potential performance improvement this FLIP could bring to Flink.

Compatibility, Deprecation, and Migration Plan

The changes made in this FLIP are backward compatible. No deprecation plan is needed.


Migration plan:

In a future major release, we plan to remove pipeline.force-timestamp-support, so that this optimization takes effect automatically when the job does not have a watermark strategy and latency tracking is not enabled. This would make the change backward incompatible with existing versions, but we believe this is acceptable for a major version change. Jobs migrated from existing versions should turn on auto watermark or set a watermark strategy if they rely on record timestamps.


Test Plan

The change will be covered with unit and integration tests.




