
Status

Current state: "Under Discussion"

Discussion thread: https://lists.apache.org/thread.html/d1cd94a97eaba49cb05d8891b133374cd59021d6c5f04e9a19f0a6df@%3Cdev.flink.apache.org%3E

JIRA:

Released: Flink 1.4 or 1.5

 

Motivation

The default behavior of the streaming runtime is to copy every element between chained operators.

That copying was introduced for “safety” reasons: to avoid cases where users create incorrect programs by reusing mutable objects (a discouraged, but possible, pattern). For example, when using state backends that keep state as objects on heap, reusing mutable objects can theoretically lead to the same object being referenced from multiple state mappings.
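
For illustration, a minimal sketch of the discouraged pattern (the Event POJO and the mapper are hypothetical, not taken from any real program):

    import org.apache.flink.api.common.functions.MapFunction;

    public class UnsafeReuseMapper
            implements MapFunction<UnsafeReuseMapper.Event, UnsafeReuseMapper.Event> {

        /** Hypothetical mutable POJO, for illustration only. */
        public static class Event implements java.io.Serializable {
            public long id;
            public String payload;
        }

        // One mutable instance, re-emitted on every invocation
        private final Event reused = new Event();

        @Override
        public Event map(Event value) {
            reused.id = value.id;
            reused.payload = value.payload.toUpperCase();
            // Every emitted element is the same object. Without copies between
            // chained operators, a downstream operator or a heap state backend
            // that stores this reference will see it mutated by the next record.
            return reused;
        }
    }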

The effect of that "safety" mechanism is that many people who try or use Flink get much lower performance than they otherwise could. From empirical evidence, almost all users that I (Stephan) have been in touch with eventually run into this issue.


There are multiple observations about that design:

  • Object copies are extremely costly. While some simple types copy virtually for free (types detected as immutable are not copied at all), many real pipelines use types like Avro, Thrift, or JSON, which are very expensive to copy.
  • Keyed operations currently occur only after shuffles. They are hence the first operation in a chain and will never receive a reused object anyway. For the most critical operations, this precaution is therefore unnecessary.
  • The mode is inconsistent with the contract of the DataSet API, which does not copy at each step.
  • To prevent these copies, users can call enableObjectReuse(), which is misleadingly named: it does not actually reuse mutable objects, but avoids the additional copies.

Public Interfaces

Interfaces changed

The ExecutionConfig interface gains the method setObjectReuseMode(ObjectReuseMode); the methods enableObjectReuse() and disableObjectReuse() are deprecated.
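
A rough sketch of what the changed ExecutionConfig could look like (method bodies are illustrative; in particular, what the deprecated methods translate to depends on the API, as described under "Proposed Changes" below):

    public class ExecutionConfig implements java.io.Serializable {

        /** Proposed enumeration of object passing behaviors (names from this FLIP). */
        public enum ObjectReuseMode {
            DEFAULT,            // new object per deserialization, no copies along the chain
            COPY_PER_OPERATOR,  // copy every element between chained operators
            FULL_REUSE          // reuse objects on deserialization as well
        }

        private ObjectReuseMode objectReuseMode = ObjectReuseMode.COPY_PER_OPERATOR;

        public void setObjectReuseMode(ObjectReuseMode mode) {
            this.objectReuseMode = mode;
        }

        public ObjectReuseMode getObjectReuseMode() {
            return objectReuseMode;
        }

        /** @deprecated Use {@link #setObjectReuseMode(ObjectReuseMode)} instead. */
        @Deprecated
        public void enableObjectReuse() {
            // Illustrative only: per this FLIP, the old flag corresponds to
            // FULL_REUSE for DataSet programs and to DEFAULT for DataStream programs.
            this.objectReuseMode = ObjectReuseMode.FULL_REUSE;
        }

        /** @deprecated Use {@link #setObjectReuseMode(ObjectReuseMode)} instead. */
        @Deprecated
        public void disableObjectReuse() {
            this.objectReuseMode = ObjectReuseMode.COPY_PER_OPERATOR;
        }
    }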


Behavior changed

The default object passing behavior changes, which can affect the correctness of existing DataStream programs that assume the original "COPY_PER_OPERATOR" behavior (see below).


Proposed Changes

Summary

I propose to change the default behavior of the DataStream runtime to match the DataSet runtime: new objects are created on every deserialization, and no copies are made as the objects are passed along the pipeline.


Details

I propose to drop the execution config flag objectReuse and instead introduce an ObjectReuseMode enumeration that gives finer control over what happens. There will be three modes (a usage sketch follows the list):

  • DEFAULT

    • This is the default in the DataSet API

    • This will become the default in the DataStream API

    • This happens in the DataStream API when enableObjectReuse() is activated.


  • COPY_PER_OPERATOR

    • The current default in the DataStream API


  • FULL_REUSE

    • This happens in the DataSet API when enableObjectReuse() is chosen.
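
Assuming the interface sketch above, selecting a mode explicitly would look like this (setObjectReuseMode is the proposed method, not an existing one):

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ObjectReuseModeExample {
        public static void main(String[] args) {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // Opt back into the old copying behavior for a program that relies on it.
            // setObjectReuseMode is proposed in this FLIP; it does not exist in current releases.
            env.getConfig().setObjectReuseMode(ExecutionConfig.ObjectReuseMode.COPY_PER_OPERATOR);
        }
    }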


An illustration of the modes is as follows:


DEFAULT

[figure: regular_mode.png]


COPY_PER_OPERATOR

[figure: copy_mode.png]


FULL_REUSE

[figure: reuse_mode.png]


 

Compatibility, Deprecation, and Migration Plan

Interfaces

No interface migration path is needed: the interfaces are not broken, only some methods are deprecated.


Behavior change

We have two variants:


Variant 1:

  • Change the behavior, and state explicitly in the release notes that we did so and which cases are affected.

  • This may actually be feasible, because the affected cases are quite pathological corner cases that only very bad implementations should encounter (see below).

Variant 2:

  • When users set the mode explicitly, that mode is always used.

  • When the mode is not explicitly set, we follow this strategy (sketched in code after this list):

    • Change the CLI such that we know when users upgrade existing jobs (the savepoint to start from has a version prior to 1.4).

    • Use DEFAULT as the default for jobs that do not start from a savepoint, or that start from a savepoint of version 1.4 or later.

    • Use COPY_PER_OPERATOR as the default for upgraded jobs.
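
A sketch of the Variant 2 decision logic (all names here are hypothetical; only the branching follows the strategy above, and the version check would live wherever the CLI resolves the savepoint to restore from):

    import org.apache.flink.api.common.ExecutionConfig;

    public final class ObjectReuseModeResolver {

        // ObjectReuseMode is the proposed enum from the interface sketch above.
        public static ExecutionConfig.ObjectReuseMode resolve(
                ExecutionConfig.ObjectReuseMode explicitMode, // null if the user set no mode
                boolean restoredFromSavepoint,
                String savepointVersion) {                    // e.g. "1.3"; ignored unless restoring

            // When users set the mode, always use that mode
            if (explicitMode != null) {
                return explicitMode;
            }
            // Upgraded jobs (savepoint from a version prior to 1.4) keep the old behavior
            if (restoredFromSavepoint && isPriorTo14(savepointVersion)) {
                return ExecutionConfig.ObjectReuseMode.COPY_PER_OPERATOR;
            }
            // Fresh jobs, and jobs restored from savepoints of version 1.4 or later
            return ExecutionConfig.ObjectReuseMode.DEFAULT;
        }

        private static boolean isPriorTo14(String version) {
            String[] parts = version.split("\\.");
            int major = Integer.parseInt(parts[0]);
            int minor = Integer.parseInt(parts[1]);
            return major < 1 || (major == 1 && minor < 4);
        }
    }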

 

Rejected Alternatives

None so far...
