Status
Current state: "Under Discussion"
...
Released: Flink 1.4 or 1.5
Motivation
The default behavior of the streaming runtime is to copy every element between chained operators.
...
- Object copies are extremely costly. While some simple types are copied virtually for free (types reliably detected as immutable are not copied at all), many real pipelines use types like Avro, Thrift, JSON, etc., which are very expensive to copy.
- Keyed operations currently only occur after shuffles. These operations are hence the first in their chain and never receive a reused object anyway. That means that for the most critical operations, this precaution is unnecessary.
- The mode is inconsistent with the contract of the DataSet API, which does not copy at each step.
- To prevent these copies, users can call enableObjectReuse(), which is misleading: it does not actually reuse mutable objects, it only avoids the additional copies.
Public Interfaces
Interfaces changed
The ExecutionConfig interface adds the method setObjectReuseMode(ObjectReuseMode) and deprecates the methods enableObjectReuse() and disableObjectReuse().
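To make the interface change concrete, here is a minimal Java sketch. It is not Flink's actual ExecutionConfig: the class name ExecutionConfigSketch is hypothetical, the enum lists the mode names used on this page, and the mapping of the deprecated methods onto modes is an assumption inferred from the motivation (enableObjectReuse() only avoids copies, which matches DEFAULT; disableObjectReuse() keeps the per-operator copies).

```java
import java.util.Objects;

/** Mode names used in this proposal. */
enum ObjectReuseMode {
    DEFAULT,           // new object per deserialization, no copies between chained operators
    COPY_PER_OPERATOR, // copy every element between chained operators (current DataStream default)
    FULL_REUSE         // assumed: actually reuses mutable object instances
}

/** Hypothetical stand-in for the relevant part of ExecutionConfig; not the real Flink class. */
class ExecutionConfigSketch {
    // null means "not explicitly set by the user"
    private ObjectReuseMode objectReuseMode;

    public ExecutionConfigSketch setObjectReuseMode(ObjectReuseMode mode) {
        this.objectReuseMode = Objects.requireNonNull(mode, "mode");
        return this;
    }

    /** Returns the explicitly set mode, or null if the user never set one. */
    public ObjectReuseMode getObjectReuseMode() {
        return objectReuseMode;
    }

    /** Assumed mapping: the old method only avoided copies, which matches DEFAULT. */
    @Deprecated
    public ExecutionConfigSketch enableObjectReuse() {
        return setObjectReuseMode(ObjectReuseMode.DEFAULT);
    }

    /** Assumed mapping: the old method kept the per-operator copies. */
    @Deprecated
    public ExecutionConfigSketch disableObjectReuse() {
        return setObjectReuseMode(ObjectReuseMode.COPY_PER_OPERATOR);
    }
}
```

Keeping the field nullable lets the runtime distinguish "explicitly set" from "not set", which the compatibility plan relies on when picking a default.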
...
The default object-passing behavior changes, which can affect the correctness of existing DataStream programs that rely on the original COPY_PER_OPERATOR behavior (see below).
Proposed Changes
Summary
I propose to change the default behavior of the DataStream runtime to match the DataSet runtime: a new object is created on every deserialization, and no copies are made as objects are passed along the pipeline.
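The practical difference between the current and the proposed default can be illustrated with a simplified, self-contained Java model. This is not Flink runtime code; ChainingDemo and Event are hypothetical names. One upstream element fans out to two chained operators: the first mutates its input in place, and the second reports the value it sees.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ChainingDemo {
    /** A mutable element type, standing in for an expensive-to-copy record (e.g. Avro or Thrift). */
    static final class Event {
        int value;
        Event(int value) { this.value = value; }
        Event copy() { return new Event(value); }
    }

    /**
     * Passes one element to two chained downstream operators.
     * copyPerOperator == true models COPY_PER_OPERATOR (each operator gets its own copy);
     * false models the proposed DEFAULT (both operators get the same instance).
     * Returns the value observed by the second operator.
     */
    static int valueSeenBySecondOperator(boolean copyPerOperator) {
        List<Integer> observed = new ArrayList<>();
        // First operator mutates its input in place.
        Consumer<Event> mutatingOperator = e -> e.value *= 10;
        // Second operator only records what it receives.
        Consumer<Event> readingOperator = e -> observed.add(e.value);

        Event emitted = new Event(1); // e.g. freshly created by deserialization
        mutatingOperator.accept(copyPerOperator ? emitted.copy() : emitted);
        readingOperator.accept(copyPerOperator ? emitted.copy() : emitted);
        return observed.get(0);
    }

    public static void main(String[] args) {
        System.out.println("COPY_PER_OPERATOR: " + valueSeenBySecondOperator(true));  // 1
        System.out.println("DEFAULT:           " + valueSeenBySecondOperator(false)); // 10
    }
}
```

With per-operator copies the second operator sees 1; without them it sees 10, because both operators share one instance. Programs that mutate an input that is also consumed elsewhere in the chain are the ones whose behavior may change with the new default.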
...
COPY_PER_OPERATOR
FULL_REUSE
Compatibility, Deprecation, and Migration Plan
Interfaces
No interface migration path is needed because no interfaces are broken; some methods are merely deprecated.
...
When users explicitly set the mode, that mode is always used.
When the mode is not explicitly set, we follow this strategy:
- Change the CLI so that it detects when users upgrade existing jobs (i.e., when the savepoint to start from was taken with a version prior to 1.4).
- Use DEFAULT as the default for jobs that do not start from a savepoint, or that start from a savepoint of version 1.4 or later.
- Use COPY_PER_OPERATOR as the default for upgraded jobs.
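The default-selection rules can be expressed as a small decision function. This Java sketch assumes the CLI has already determined whether the job starts from a savepoint and which Flink version wrote it; ReuseModeResolver and SavepointVersion are hypothetical names, and null stands for "mode not set" or "no savepoint".

```java
class ReuseModeResolver {

    /** Mode names taken from this proposal. */
    enum ObjectReuseMode { DEFAULT, COPY_PER_OPERATOR, FULL_REUSE }

    /** Hypothetical representation of the Flink version a savepoint was taken with. */
    static final class SavepointVersion {
        final int major;
        final int minor;

        SavepointVersion(int major, int minor) {
            this.major = major;
            this.minor = minor;
        }

        /** True if this version is older than 1.4. */
        boolean isBefore1_4() {
            return major < 1 || (major == 1 && minor < 4);
        }
    }

    /**
     * Picks the object reuse mode for a job.
     *
     * @param explicitMode the mode set by the user, or null if none was set
     * @param savepoint    the version of the savepoint the job starts from, or null if it starts fresh
     */
    static ObjectReuseMode resolve(ObjectReuseMode explicitMode, SavepointVersion savepoint) {
        if (explicitMode != null) {
            return explicitMode;                      // an explicit setting always wins
        }
        if (savepoint != null && savepoint.isBefore1_4()) {
            return ObjectReuseMode.COPY_PER_OPERATOR; // upgraded job: keep the old semantics
        }
        return ObjectReuseMode.DEFAULT;               // new job, or savepoint from 1.4 or later
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, null));                                            // DEFAULT
        System.out.println(resolve(null, new SavepointVersion(1, 3)));                      // COPY_PER_OPERATOR
        System.out.println(resolve(null, new SavepointVersion(1, 4)));                      // DEFAULT
        System.out.println(resolve(ObjectReuseMode.FULL_REUSE, new SavepointVersion(1, 2))); // FULL_REUSE
    }
}
```

Checking the explicit setting first keeps the first rule unconditional; the savepoint version only matters when the user has not chosen a mode.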
Rejected Alternatives
None so far...