Comment: Handle LatencyMarker as RuntimeEvent during serialization

...

In scenarios where the Flink job does not emit data with time attributes and disables watermarks and latency tracking, the only meaningful data in the data flow is the value of each StreamRecord. In this scenario, the current serialization format unnecessarily serializes the tag, which hurts network communication performance and memory consumption.

...

  • Name: pipeline.force-timestamp-support

  • Type: Boolean

  • Default value: true

  • Description: Whether to force support for record timestamps. If this option is false, automatic watermarks (pipeline.auto-watermark-interval) and latency tracking (metrics.latency.interval) are disabled, and the job uses only Flink's built-in watermark generators that are guaranteed never to emit watermarks, then Flink will reduce the per-record runtime overhead (e.g. serialization) that is otherwise needed to handle timestamps and watermarks.

...

  • pipeline.force-timestamp-support is set to false
  • The job does not have watermarks, that is:
    • pipeline.auto-watermark-interval is set to zero
    • the job is not assigned a watermark strategy, i.e., the job graph does not contain a TimestampsAndWatermarksOperator, or the operator's strategy is WatermarkStrategy.noWatermarks().
  • metrics.latency.interval is set to zero

Then, apart from LatencyMarker, which is treated differently as described in the next section, the only subclass of StreamElement that needs to be serialized is StreamRecord, and the records' timestamps are not needed. The StreamElementSerializer would therefore treat every received StreamElement as a StreamRecord and serialize only the value of each StreamRecord. Otherwise, it would keep using the existing serialization format described in the motivation section.
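
The decision and the two wire formats can be sketched with the JDK alone. This is a minimal illustration, not Flink code: the class ValueOnlySerializationSketch, its method names, the tag value, and the field widths (a one-byte tag, an eight-byte timestamp, an int value) are all assumptions made for the example.

```java
import java.nio.ByteBuffer;

/** Illustrative stand-in only; this is not Flink's StreamElementSerializer. */
final class ValueOnlySerializationSketch {

    // Assumed tag value, chosen for illustration.
    static final byte TAG_RECORD_WITH_TIMESTAMP = 0;

    /** Mirrors the conditions listed above; all parameter names are hypothetical. */
    static boolean canSkipTimestamps(
            boolean forceTimestampSupport,
            long autoWatermarkIntervalMillis,
            boolean hasWatermarkEmittingStrategy,
            long latencyIntervalMillis) {
        return !forceTimestampSupport
                && autoWatermarkIntervalMillis == 0
                && !hasWatermarkEmittingStrategy
                && latencyIntervalMillis == 0;
    }

    /** Tagged format: tag byte, then timestamp, then the value. */
    static byte[] serializeTagged(long timestamp, int value) {
        return ByteBuffer.allocate(Byte.BYTES + Long.BYTES + Integer.BYTES)
                .put(TAG_RECORD_WITH_TIMESTAMP)
                .putLong(timestamp)
                .putInt(value)
                .array();
    }

    /** Optimized format: the value only, with no tag and no timestamp. */
    static byte[] serializeValueOnly(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    public static void main(String[] args) {
        boolean optimized = canSkipTimestamps(false, 0L, false, 0L);
        byte[] payload = optimized ? serializeValueOnly(42) : serializeTagged(1_000L, 42);
        System.out.println("optimized=" + optimized + ", payload bytes=" + payload.length);
    }
}
```

Under these assumed widths, the value-only form saves nine bytes per record, which is the per-record overhead the proposal aims to avoid.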


2. Transmit latency markers as RuntimeEvent instead of StreamElement over the network

We propose to introduce a RuntimeEvent subclass that contains the same information as the existing LatencyMarker as follows.

Code Block
languagejava
/**
 * A {@link RuntimeEvent} used to approximate the time a record needs to travel through the
 * dataflow.
 */
public class LatencyMarkerEvent extends RuntimeEvent {

    public long getMarkedTime() {
        ...
    }

    public OperatorID getOperatorId() {
        ...
    }

    public int getSubtaskIndex() {
        ...
    }
}

The introduced RuntimeEvent would be used as follows.

  • Before a LatencyMarker is sent out over the network, it is converted into a LatencyMarkerEvent.
  • The LatencyMarkerEvent is serialized and transmitted over the network.
  • After the downstream task receives a LatencyMarkerEvent, it converts the event back to a LatencyMarker instance and sends it to operators.

With the process described above, latency markers would be serialized by EventSerializer instead of StreamElementSerializer, which guarantees that the only subclass of StreamElement that needs to be serialized would be StreamRecord.
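
The round trip described above can be sketched as follows. The classes here are simplified stand-ins rather than Flink's real types: OperatorID is replaced by a String, the RuntimeEvent base class is omitted, and the helper names toEvent and fromEvent are hypothetical.

```java
/** Hypothetical conversion helpers; the real conversion points in Flink may differ. */
final class LatencyMarkerConversionSketch {

    /** Sender side: wrap the marker's fields in an event before it goes over the network. */
    static LatencyMarkerEvent toEvent(LatencyMarker marker) {
        return new LatencyMarkerEvent(
                marker.markedTime, marker.operatorId, marker.subtaskIndex);
    }

    /** Receiver side: rebuild the marker so operators see the usual LatencyMarker. */
    static LatencyMarker fromEvent(LatencyMarkerEvent event) {
        return new LatencyMarker(
                event.getMarkedTime(), event.getOperatorId(), event.getSubtaskIndex());
    }

    public static void main(String[] args) {
        LatencyMarker original = new LatencyMarker(System.currentTimeMillis(), "source-op", 0);
        LatencyMarker restored = fromEvent(toEvent(original));
        System.out.println("markedTime preserved: "
                + (restored.markedTime == original.markedTime));
    }
}

/** Stand-in for LatencyMarker; the operator ID is simplified to a String. */
final class LatencyMarker {
    final long markedTime;
    final String operatorId;
    final int subtaskIndex;

    LatencyMarker(long markedTime, String operatorId, int subtaskIndex) {
        this.markedTime = markedTime;
        this.operatorId = operatorId;
        this.subtaskIndex = subtaskIndex;
    }
}

/** Stand-in for the proposed LatencyMarkerEvent; the RuntimeEvent base class is omitted. */
final class LatencyMarkerEvent {
    private final long markedTime;
    private final String operatorId;
    private final int subtaskIndex;

    LatencyMarkerEvent(long markedTime, String operatorId, int subtaskIndex) {
        this.markedTime = markedTime;
        this.operatorId = operatorId;
        this.subtaskIndex = subtaskIndex;
    }

    long getMarkedTime() { return markedTime; }

    String getOperatorId() { return operatorId; }

    int getSubtaskIndex() { return subtaskIndex; }
}
```

Because the event carries exactly the three fields of the marker, the conversion is lossless in this sketch, so downstream operators would observe the same marker they would have received before the change.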

Because the code paths for transmitting RuntimeEvent and StreamElement are not identical, and because conversions between LatencyMarker and LatencyMarkerEvent are introduced, this change might introduce a negligible performance regression. It is already documented that the latency tracking mechanism can affect a job's throughput and that its result is only an approximate end-to-end latency, so this regression does not break any existing guarantee and is therefore acceptable.


3. Throw proper exceptions when the optimization's assumption is broken

...

  • When the optimized StreamElementSerializer receives a StreamElement instance that is not a StreamRecord, a ClassCastException would be thrown during serialization, which would be caught and converted into an exception saying that users need to turn off watermarks and latency tracking to use this optimization.
  • When an operator tries to get the timestamp of a StreamRecord deserialized by the optimized StreamElementSerializer, an exception would be thrown stating that users should configure a proper watermark strategy in order to use timestamps.
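
Both failure modes can be sketched as below. The types are simplified stand-ins for Flink's classes, and the exception types (IllegalStateException, UnsupportedOperationException), the method name valueToSerialize, and the message wording are assumptions, since the proposal does not fix them.

```java
/** Illustrative stand-ins only; not Flink's StreamElement hierarchy or serializer. */
final class OptimizedSerializerGuardSketch {

    interface StreamElement {}

    static final class Watermark implements StreamElement {}

    /** A record as seen after value-only deserialization: it carries no timestamp. */
    static final class StreamRecord implements StreamElement {
        final Object value;

        StreamRecord(Object value) {
            this.value = value;
        }

        long getTimestamp() {
            // Assumed exception type and wording.
            throw new UnsupportedOperationException(
                    "Record timestamps are unavailable with this optimization; "
                            + "configure a proper watermark strategy to use timestamps.");
        }
    }

    /** Treats every element as a StreamRecord and converts a failed cast into a clear error. */
    static Object valueToSerialize(StreamElement element) {
        try {
            return ((StreamRecord) element).value;
        } catch (ClassCastException e) {
            throw new IllegalStateException(
                    "Received " + element.getClass().getSimpleName()
                            + " but only StreamRecord is supported; turn off watermarks "
                            + "and latency tracking to use this optimization.",
                    e);
        }
    }

    public static void main(String[] args) {
        System.out.println(valueToSerialize(new StreamRecord("hello")));
        try {
            valueToSerialize(new Watermark());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Keeping the original ClassCastException as the cause preserves the stack trace for debugging while the outer message tells users which settings to change.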

...

In a future major release, we plan to remove pipeline.force-timestamp-support, so that this optimization takes effect automatically when the job does not have a watermark strategy and latency tracking is not enabled. This would make the change backward incompatible with existing versions, but we believe this is acceptable for a major version change. Jobs migrated from existing versions should turn on auto watermark or set a watermark strategy if they rely on record timestamps.

...