| Discussion thread | https://lists.apache.org/thread/6khpqqb4bydkqyxcv8dpo6smts5ojdqc |
|---|---|
| Vote thread | - |
| JIRA | - |
| Release | - |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
[This FLIP proposal is a joint work between Yunfeng Zhou and Dong Lin]
Motivation
In Flink jobs, stream elements are serialized into bytes for network communication and for persistence in buffers and snapshots. For stream records, the serialization format is as follows:
- a 1-byte tag encoding the type of the stream element. For example, the tag shows whether the element is a StreamRecord, Watermark or LatencyMarker, and whether it carries a timestamp when it is a StreamRecord.
- an optional 8-byte field containing the timestamp of the StreamRecord, if any.
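For illustration, here is a minimal sketch of this format. The tag constants and helper names below are simplified stand-ins for the actual internals of StreamElementSerializer, not its exact code.

// Simplified sketch of the existing serialization logic inside a
// StreamElementSerializer<T>-like class; tag values are illustrative.
public void serialize(StreamElement element, DataOutputView target) throws IOException {
    if (element.isRecord()) {
        StreamRecord<T> record = element.asRecord();
        if (record.hasTimestamp()) {
            target.write(TAG_REC_WITH_TIMESTAMP);    // 1-byte tag
            target.writeLong(record.getTimestamp()); // 8-byte timestamp
        } else {
            target.write(TAG_REC_WITHOUT_TIMESTAMP); // 1-byte tag, no timestamp
        }
        typeSerializer.serialize(record.getValue(), target); // the record's value
    } else if (element.isWatermark()) {
        target.write(TAG_WATERMARK);
        target.writeLong(element.asWatermark().getTimestamp());
    } else {
        // latency markers etc. are tagged and serialized similarly
    }
}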
In scenarios where the Flink job does not emit data with time attributes and disables watermarks and latency tracking, the only meaningful data is the value of each StreamRecord. In this scenario, the current serialization format unnecessarily serializes the tag, hurting network communication performance and increasing memory consumption.
In order to improve Flink jobs' performance in the situations described above, we propose to add the following configuration, which tells the Flink runtime whether the time attributes of the data will be used, so that the serialization process can be optimized accordingly.
Public Interfaces
Here is the specification of this config:
- Name: pipeline.force-timestamp-support
- Type: Boolean
- Default value: true
- Description: Whether to force the support for record timestamps. If this is set to false, automatic watermark generation (pipeline.auto-watermark-interval) and latency tracking (metrics.latency.interval) are disabled, and the job only uses Flink's built-in watermark generators that are guaranteed to never emit watermarks, then Flink will optimize away the runtime per-record overhead (e.g. serialization) that is otherwise needed to handle timestamps and watermarks.
Note that this configuration would be introduced with a @Deprecated annotation. See the migration plan section below for the reason and details.
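For example, a job could opt in to the optimization as follows. Note that pipeline.force-timestamp-support is the key proposed by this FLIP and does not exist in current Flink releases.

Configuration conf = new Configuration();
// Proposed key from this FLIP; not available in current Flink releases.
conf.setString("pipeline.force-timestamp-support", "false");
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(conf);
// Keep automatic watermarks and latency tracking disabled as well.
env.getConfig().setAutoWatermarkInterval(0);
env.getConfig().setLatencyTrackingInterval(0);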
Proposed Changes
1. Update StreamElementSerializer to take advantage of the introduced configuration
When the following conditions are met:

- pipeline.force-timestamp-support is set to false
- The job does not have watermarks, meaning that
  - pipeline.auto-watermark-interval is set to zero
  - the job is not assigned a watermark strategy, i.e., the job graph does not contain TimestampsAndWatermarksOperator, or the operator's strategy is WatermarkStrategy.noWatermarks()
- metrics.latency.interval is set to zero
The only subclass of StreamElement that needs to be serialized is StreamRecord, and the records' timestamps are not needed. Thus the StreamElementSerializer would treat every received StreamElement as a StreamRecord and serialize only the value of each StreamRecord, as sketched below. Otherwise, it would keep using the existing serialization format described in the motivation section.
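A minimal sketch of the optimized path, assuming a boolean flag derived from the three conditions above (the flag and helper names are illustrative):

// Sketch: skip the 1-byte tag and the optional 8-byte timestamp entirely
// when the conditions above guarantee they carry no information.
public void serialize(StreamElement element, DataOutputView target) throws IOException {
    if (timestampsDisabled) { // derived from the three conditions above
        // Every element is assumed to be a StreamRecord; write the value only.
        typeSerializer.serialize(((StreamRecord<T>) element).getValue(), target);
    } else {
        serializeWithTagAndTimestamp(element, target); // existing format
    }
}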
2. Throw proper exceptions when the optimization's assumption is broken
Even when the conditions above are met, there are still rare cases in which users manually assign and use timestamps and watermarks in custom operators. In these scenarios, Flink would detect the invalid elements or invocations and throw proper exceptions as follows (see the sketches after this list).
- When the optimized StreamElementSerializer receives a StreamElement instance that is not a StreamRecord, a ClassCastException would be thrown during serialization, which would be caught and converted into an exception saying that users need to turn off watermarks and latency tracking to use this optimization.
- When an operator tries to get the timestamp of a StreamRecord deserialized by the optimized StreamElementSerializer, an exception would be thrown stating that users should configure a proper watermark strategy in order to use timestamps.
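The following sketch illustrates both checks; the exception types and messages are placeholders, not the exact wording of the POC.

// Case 1: converting the ClassCastException during serialization.
try {
    StreamRecord<T> record = (StreamRecord<T>) element;
    typeSerializer.serialize(record.getValue(), target);
} catch (ClassCastException e) {
    throw new UnsupportedOperationException(
            "Watermarks and latency tracking must be disabled when "
                    + "pipeline.force-timestamp-support is false.", e);
}

// Case 2: guarding timestamp access on a deserialized record.
public long getTimestamp() {
    if (!hasTimestamp) {
        throw new IllegalStateException(
                "Record has no timestamp. Configure a proper watermark "
                        + "strategy to use record timestamps.");
    }
    return timestamp;
}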
Benchmark Result
env.getConfig().setAutoWatermarkInterval(0);
env.getConfig().enableObjectReuse();

DataStream<Boolean> stream = env.fromSequence(0, 1_000_000).map(x -> true);
for (int i = 0; i < 10; i++) {
    stream = stream.map(x -> x).disableChaining();
}
stream.addSink(new DiscardingSink<>());
Running the above benchmark on a local machine, with a POC implementation of the StreamRecord serialization optimization described above, produces the following results.
- Before optimization, throughput is 3954.932 ± 351.016 ops/ms.
- After optimization, throughput is 4737.865 ± 272.070 ops/ms.
Thus the proposed change increased the job's throughput by about 20%.
Below are links to the POC code and benchmark code that generated these results.
- POC code: https://github.com/yunfengzhou-hub/flink/tree/optimize-record-serialize-2
- Benchmark code: https://github.com/yunfengzhou-hub/flink-benchmarks/tree/optimize-record-serialize
Compatibility, Deprecation, and Migration Plan
The changes made in this FLIP are backward compatible. No deprecation plan is needed.
Migration plan:
In a future major release, we plan to remove pipeline.force-timestamp-support, and this optimization would take effect automatically when the job does not have a watermark strategy and latency tracking is not enabled. This would make the changes backward incompatible with existing versions, but we believe this is acceptable as it is a major version change. Jobs migrated from existing versions should make sure to turn on automatic watermark generation or set a watermark strategy if they rely on record timestamps, as illustrated below.
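For instance, a migrated job that relies on record timestamps could keep them available by assigning an explicit watermark strategy through the existing WatermarkStrategy API; the Event type and its getEventTime() accessor below are hypothetical.

// Assign timestamps explicitly so they survive the optimized serialization.
DataStream<Event> withTimestamps = events.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Event>forMonotonousTimestamps()
                .withTimestampAssigner((event, ts) -> event.getEventTime()));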