Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


[This FLIP proposal is joint work between Yunfeng Zhou and Dong Lin]


Motivation

In Flink jobs, stream elements are serialized into bytes for network communication and for storage in buffers and snapshots. For stream records, the serialization format is as follows.

  1. a 1-byte tag containing the type of the stream element. For example, the tag shows whether the element is a StreamRecord, Watermark or LatencyMarker, and, when it is a StreamRecord, whether it carries a timestamp.

  2. an optional 8-byte array containing the timestamp of the StreamRecord, if any.

  3. an N-byte array containing the value of the StreamRecord, according to the serialization strategy of the type serializer.
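The layout above can be illustrated with a minimal sketch. It is not Flink's StreamElementSerializer: the class name RecordLayoutSketch and the tag values are hypothetical, and the value is assumed to have already been turned into bytes by its type serializer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Illustrative stand-in for the record layout; not Flink's StreamElementSerializer. */
final class RecordLayoutSketch {

    // Hypothetical tag values, chosen only for this sketch.
    static final byte TAG_RECORD_WITH_TIMESTAMP = 0;
    static final byte TAG_RECORD_WITHOUT_TIMESTAMP = 1;

    /** Writes the tag, the optional timestamp, then the already-serialized value bytes. */
    static byte[] serialize(byte[] valueBytes, Long timestamp) {
        int size = 1 + (timestamp != null ? 8 : 0) + valueBytes.length;
        ByteBuffer buffer = ByteBuffer.allocate(size);
        if (timestamp != null) {
            buffer.put(TAG_RECORD_WITH_TIMESTAMP); // 1-byte tag
            buffer.putLong(timestamp);             // 8-byte timestamp
        } else {
            buffer.put(TAG_RECORD_WITHOUT_TIMESTAMP); // 1-byte tag, no timestamp follows
        }
        buffer.put(valueBytes); // N-byte value
        return buffer.array();
    }

    public static void main(String[] args) {
        byte[] value = "flink".getBytes(StandardCharsets.UTF_8); // 5 bytes
        System.out.println(serialize(value, 42L).length);  // 1 + 8 + 5 bytes
        System.out.println(serialize(value, null).length); // 1 + 5 bytes
    }
}
```

In this sketch, a record without a timestamp still pays one byte for the tag in addition to its value.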

In scenarios where the Flink job does not emit data with time attributes and disables watermarks, the only meaningful data in the data flow is the value of each StreamRecord. In this scenario, the current serialization format unnecessarily serializes the tag, which hurts network communication performance and increases memory consumption.

To improve the performance of Flink jobs in the situations described above, we propose to add the following APIs, which tell the Flink infrastructure whether the time attributes of the data will be used at runtime, and to optimize the serialization process accordingly.

Public Interfaces

1. Add job-level configuration pipeline.force-timestamp-support

Here are the specifications of this config:

  • Name: pipeline.force-timestamp-support

  • Type: Boolean

  • Default value: true

  • Description: Whether to force the support for record timestamps. If this is false, automatic watermark (pipeline.auto-watermark-interval) is disabled, and the job only uses Flink's built-in watermark generators that are guaranteed to never emit watermarks, then Flink will optimize the runtime per-record overhead (e.g. serialization) that is otherwise needed to handle timestamps and watermarks.

Note that this configuration would be introduced with a @Deprecated annotation. See the migration plan section below for the reason and details.

Proposed Changes



1. Update StreamElementSerializer to take advantage of the introduced APIs.

When all of the following conditions are met:

  • pipeline.force-timestamp-support is set to false
  • The job does not have watermarks, which means that
    • pipeline.auto-watermark-interval is set to zero
    • the job is not assigned a watermark strategy, i.e., the job graph does not contain TimestampsAndWatermarksOperator or the operator's strategy is WatermarkStrategy.noWatermarks().

Then, apart from LatencyMarker, which is treated differently as described in the next section, the only subclass of StreamElement that needs to be serialized would be StreamRecord, and the records' timestamps are not needed. Thus the StreamElementSerializer would treat every received StreamElement as a StreamRecord and serialize only the value of each StreamRecord. Otherwise it would keep using the existing serialization format described in the motivation section.
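The conditions listed above, and their effect on the per-record size, can be sketched as follows. This is only an illustration: the class, method and parameter names are hypothetical, and hasEmittingWatermarkStrategy is assumed to mean that the job graph contains a TimestampsAndWatermarksOperator whose strategy is not WatermarkStrategy.noWatermarks().

```java
/** Illustrative decision logic; all names here are hypothetical and not part of Flink. */
final class TimestampOptimizationSketch {

    /** Returns true only when every condition for the value-only format holds. */
    static boolean canWriteValueOnly(
            boolean forceTimestampSupport,
            long autoWatermarkIntervalMs,
            boolean hasEmittingWatermarkStrategy) {
        return !forceTimestampSupport
                && autoWatermarkIntervalMs == 0
                && !hasEmittingWatermarkStrategy;
    }

    /** Serialized size in bytes of one record whose value takes valueSize bytes. */
    static int serializedSize(int valueSize, boolean hasTimestamp, boolean valueOnly) {
        if (valueOnly) {
            return valueSize; // optimized format: the value and nothing else
        }
        // existing format: 1-byte tag, optional 8-byte timestamp, then the value
        return 1 + (hasTimestamp ? 8 : 0) + valueSize;
    }

    public static void main(String[] args) {
        boolean valueOnly = canWriteValueOnly(false, 0L, false);
        System.out.println(valueOnly);
        System.out.println(serializedSize(1, false, false)); // existing format
        System.out.println(serializedSize(1, false, valueOnly)); // optimized format
    }
}
```

Under these assumptions, a value that serializes to a single byte and has no timestamp takes 2 bytes in the existing format and 1 byte in the value-only format.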


2. Transmit latency marker as RuntimeEvent instead of StreamElement through network

We propose to introduce a RuntimeEvent subclass that contains the same information as the existing LatencyMarker as follows.

/**
 * A {@link RuntimeEvent} used to approximate the time a record needs to travel through the
 * dataflow.
 */
public class LatencyMarkerEvent extends RuntimeEvent {

    public long getMarkedTime() {
        ...
    }

    public OperatorID getOperatorId() {
        ...
    }

    public int getSubtaskIndex() {
        ...
    }
}

The introduced RuntimeEvent would be used as follows.

  • Before a LatencyMarker is sent out over the network, it is converted into a LatencyMarkerEvent.
  • The LatencyMarkerEvent is serialized and transmitted over the network.
  • After the downstream task receives a LatencyMarkerEvent, it converts the event back into a LatencyMarker instance and sends it to the operators.

With the process described above, latency markers would be serialized by EventSerializer instead of StreamElementSerializer, which guarantees that the only subclass of StreamElement that needs to be serialized would be StreamRecord.
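The conversion steps described above might look roughly like the following sketch. MarkerSketch and MarkerEventSketch are hypothetical stand-ins for LatencyMarker and the proposed LatencyMarkerEvent, the operator ID is simplified to a String instead of Flink's OperatorID, and no network transfer or serialization is modeled.

```java
/** Stand-in for Flink's LatencyMarker; the operator ID is simplified to a String. */
final class MarkerSketch {
    final long markedTime;
    final String operatorId;
    final int subtaskIndex;

    MarkerSketch(long markedTime, String operatorId, int subtaskIndex) {
        this.markedTime = markedTime;
        this.operatorId = operatorId;
        this.subtaskIndex = subtaskIndex;
    }
}

/** Stand-in for the proposed LatencyMarkerEvent, exposing the same three getters. */
final class MarkerEventSketch {
    private final long markedTime;
    private final String operatorId;
    private final int subtaskIndex;

    MarkerEventSketch(long markedTime, String operatorId, int subtaskIndex) {
        this.markedTime = markedTime;
        this.operatorId = operatorId;
        this.subtaskIndex = subtaskIndex;
    }

    long getMarkedTime() { return markedTime; }
    String getOperatorId() { return operatorId; }
    int getSubtaskIndex() { return subtaskIndex; }
}

/** Hypothetical helper holding the two conversions. */
final class LatencyMarkerConversionSketch {

    /** Upstream side: wrap the marker's fields in an event before sending. */
    static MarkerEventSketch toEvent(MarkerSketch marker) {
        return new MarkerEventSketch(marker.markedTime, marker.operatorId, marker.subtaskIndex);
    }

    /** Downstream side: rebuild the marker before handing it to operators. */
    static MarkerSketch toMarker(MarkerEventSketch event) {
        return new MarkerSketch(
                event.getMarkedTime(), event.getOperatorId(), event.getSubtaskIndex());
    }
}
```

Because both types carry the same three fields, the round trip in this sketch loses no information.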

Given that the code paths for transmitting RuntimeEvent and StreamElement are not identical, and that conversions between LatencyMarker and LatencyMarkerEvent are introduced, this change might introduce a negligible performance regression. Since it is already documented that the latency tracking mechanism can affect a job's throughput, and that the measured result is only an approximate end-to-end latency, this regression does not break the existing guarantees and is thus acceptable.


3. Throw proper exceptions when the optimization's assumption is broken

Even when the conditions above are met, there are still rare cases in which users manually assign and use timestamps and watermarks in custom operators. In these scenarios, Flink would catch the invalid variables or invocations and throw proper exceptions as follows.

  • When the optimized StreamElementSerializer receives a StreamElement instance that is not a StreamRecord, a ClassCastException would be thrown during serialization, which would be caught and converted into an exception saying that users need to turn off watermark to use this optimization.
  • When an operator tries to get the timestamp of a StreamRecord deserialized by the optimized StreamElementSerializer, an exception would be thrown stating that users should configure a proper watermark strategy in order to use timestamps.
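The two checks above could be sketched as follows. Everything here is hypothetical: the nested classes are simplified stand-ins for Flink's StreamElement types, and the exception type (IllegalStateException) and messages are assumptions, since this FLIP does not specify them.

```java
/** Illustrative guards for the optimized path; names and exception types are assumptions. */
final class TimestampGuardSketch {

    static class Element {}

    static final class Watermark extends Element {}

    static final class Record extends Element {
        final Object value;
        final boolean hasTimestamp;
        final long timestamp;

        /** A record as the optimized serializer would produce it: value only. */
        Record(Object value) {
            this.value = value;
            this.hasTimestamp = false;
            this.timestamp = 0L;
        }

        Record(Object value, long timestamp) {
            this.value = value;
            this.hasTimestamp = true;
            this.timestamp = timestamp;
        }

        /** Fails loudly instead of returning a meaningless timestamp. */
        long getTimestamp() {
            if (!hasTimestamp) {
                throw new IllegalStateException(
                        "Record has no timestamp; configure a watermark strategy to use timestamps.");
            }
            return timestamp;
        }
    }

    /** Treats every element as a record and converts the cast failure into a clearer error. */
    static Object valueToSerialize(Element element) {
        try {
            return ((Record) element).value;
        } catch (ClassCastException e) {
            throw new IllegalStateException(
                    "Only records can be serialized in this mode; turn off watermarks to use this optimization.",
                    e);
        }
    }
}
```

In this sketch, passing a Watermark to valueToSerialize, or calling getTimestamp() on a value-only Record, fails with a message that points the user at the relevant configuration.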


Benchmark Result

Passing Boolean value

env.getConfig().setAutoWatermarkInterval(0);
env.getConfig().enableObjectReuse();

DataStream<Boolean> stream = env.fromSequence(0, 1_000_000).map(x -> true);
for (int i = 0; i < 10; i++) {
    stream = stream.map(x -> x).disableChaining();
}
stream.addSink(new DiscardingSink<>());

Running the above benchmark on a local machine, in Flink with a POC implementation of the StreamRecord serialization optimization described above, produces the following result.

  • Before optimization, throughput is 3954.932 ± 351.016 ops/ms.
  • After optimization, throughput is 4737.865 ± 272.070 ops/ms.

Thus the proposed change increased the job's throughput by about 20% (4737.865 / 3954.932 ≈ 1.198).

WordCount example

env.getConfig().setAutoWatermarkInterval(0);

DataStream<String> text = env.fromElements(WORDS).name("in-memory-input");

DataStream<Tuple2<String, Integer>> counts =
    text.flatMap(new Tokenizer())
            .disableChaining()
            .name("tokenizer")
            .keyBy(value -> value.f0)
            .sum(1)
            .disableChaining()
            .name("counter");

counts.addSink(new DiscardingSink<>());

env.execute("WordCount");

The benchmark above is derived from Flink's WordCount example. Compared with the original example code, the following modifications are made.

  • The sink is changed from print to a discarding sink.
  • Chaining is disabled between operators.
  • WORDS is repeated to have 1_000_000 lines.

Running this benchmark without object reuse produces the following result.

  • Before optimization, throughput is 362.180 ± 6.711 ops/ms.
  • After optimization, throughput is 373.595 ± 7.041 ops/ms.

Running this benchmark with object reuse produces the following result.

  • Before optimization, throughput is 365.720 ± 6.987 ops/ms.
  • After optimization, throughput is 375.033 ± 6.374 ops/ms.

Thus the proposed change increased the job's throughput by about 2% to 3%, regardless of object reuse.

Reference

Below are the POC code and the benchmark code that generated these results.

Note that the latest design of this FLIP has renamed part of the public API since the POC code was implemented, so there is some conceptual gap between the POC code and the FLIP. However, the main optimization remains the same, so the POC code can still be used to demonstrate the potential performance improvement this FLIP could bring to Flink.

Compatibility, Deprecation, and Migration Plan

The changes made in this FLIP are backward compatible. No deprecation plan is needed.


Migration plan:

In a future major release, we plan to remove pipeline.force-timestamp-support, and this optimization would take effect automatically when the job does not have a watermark strategy. This would make the changes backward incompatible with existing versions, but we believe this is acceptable as it is a major version change. Jobs migrated from existing versions should turn on auto watermark or set a watermark strategy if they rely on record timestamps.


Test Plan

The change will be covered with unit and integration tests.