Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Flink provides two Time classes: org.apache.flink.streaming.api.windowing.time.Time and org.apache.flink.api.common.time.Time. The latter is already considered deprecated in its JavaDoc but has not been formally deprecated with the corresponding @Deprecated annotation. The intention is to replace it with the former Time class in Flink’s flink-streaming-java module.
A mailing list discussion [1] proposed eventually deprecating both classes and using Java’s Duration class instead. FLINK-14068 [2] was created to cover this effort.
Removing the Time classes means touching @Public and @PublicEvolving APIs, which is why the change was brought up as part of the 2.0 feature list [3].
This FLIP formalizes that effort.
[1] https://lists.apache.org/thread/76yywnwf3lk8qn4dby0vz7yoqx7f7pkj
[2] https://issues.apache.org/jira/browse/FLINK-14068
[3] https://cwiki.apache.org/confluence/display/FLINK/2.0+Release
Public Interfaces
Comparison of the features of both Time implementations with Duration
Feature | o.a.f.api.common.time.Time | o.a.f.streaming.api.windowing.time.Time | java.time.Duration |
Module | flink-core | flink-streaming-java | JDK |
Internal state | size: long, unit: TimeUnit | size: long, unit: TimeUnit | seconds: long, nanos: int |
Factory methods | of(long, TimeUnit), milliseconds(long), seconds(long), minutes(long), hours(long), days(long) | of(long, TimeUnit), milliseconds(long), seconds(long), minutes(long), hours(long), days(long), fromDuration(Duration) | of(long, TemporalUnit), ofMillis(long), ofSeconds(long), ofMinutes(long), ofHours(long), ofDays(long) |
Getter/transform | getUnit(): TimeUnit, getSize(): long, toMilliseconds(): long, toDuration(): Duration | getUnit(): TimeUnit, getSize(): long, toMilliseconds(): long | get(TemporalUnit): long, toMillis(): long |
hashCode()/equals() | Object default | Based on the milliseconds long representation | Based on the nanosecond representation |
toString() | Object default | “<millis> ms” | ISO-8601 representation [4] |
[4] https://en.wikipedia.org/wiki/ISO_8601#Durations
java.time.Duration provides the same functionality as both Time implementations and can, therefore, be used as a replacement.
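The correspondences in the comparison can be illustrated with JDK classes alone. The following is a minimal sketch assuming Java 9+ (for TimeUnit.toChronoUnit()); TimeToDurationMapping is a hypothetical helper name, not a Flink class. It shows how the (size, unit) state of the Time classes maps onto Duration, and how equals() and toString() behave differently.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of Flink): maps the (size, unit) state used by
// both Time classes onto java.time.Duration using JDK APIs only.
final class TimeToDurationMapping {

    private TimeToDurationMapping() {}

    // Counterpart of Time.of(size, unit). TimeUnit.toChronoUnit() (Java 9+)
    // converts the TimeUnit into the TemporalUnit expected by Duration.of.
    static Duration of(long size, TimeUnit unit) {
        return Duration.of(size, unit.toChronoUnit());
    }

    public static void main(String[] args) {
        Duration d = of(90, TimeUnit.SECONDS);

        // Time.seconds(90) corresponds to Duration.ofSeconds(90).
        System.out.println(d.equals(Duration.ofSeconds(90)));                      // true
        // Time.toMilliseconds() corresponds to Duration.toMillis().
        System.out.println(d.toMillis());                                           // 90000
        // equals() compares the normalized length, not the unit used at creation.
        System.out.println(Duration.ofSeconds(1).equals(Duration.ofMillis(1000))); // true
        // toString() yields an ISO-8601 duration instead of "<millis> ms".
        System.out.println(d);                                                      // PT1M30S
    }
}
```

Note that Duration.of also accepts ChronoUnit.DAYS (treated as exactly 24 hours), so TimeUnit.DAYS maps without special handling.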
Affected Public API
Usage of org.apache.flink.api.common.time.Time (flink-core)
The Time class is mostly used in the internal implementation, more specifically in the RpcEndpoints and the REST handlers for handling timeouts.
- Affected @Public API: none
- Affected @PublicEvolving API:
  - RestartStrategies
  - StateTtlConfig
  - TableConfig.setIdleStateRetentionTime [already deprecated]
- Other classes are the streaming examples:
  - WindowJoin
  - SideOutputExample
  - SocketWindowWordCount
  - GroupedProcessingTimeWindowExample
  - SessionWindowing
  - TopSpeedWindowing
All other occurrences are connected to internal timeout handling (where we could, theoretically, also use long):
- RPC-related classes
- REST Handlers
- Generally, JobManager and TaskManager internals
Usage of org.apache.flink.streaming.api.windowing.time.Time (flink-streaming-java)
- Affected @Public/@PublicEvolving API:
  - Several *Stream implementations that allow allowedLateness (AllWindowedStream, CoGroupedStreams, JoinedStreams, WindowedStream) or window functionality (DataStream, KeyedStream)
  - Window implementations that implement withGap(Time) (EventTimeSessionWindows, ProcessingTimeSessionWindows)
  - Factory methods in window implementations (SlidingEventTimeWindows, TumblingEventTimeWindows, TumblingProcessingTimeWindows, TimeEvictor, ContinuousEventTimeTrigger, ContinuousProcessingTimeTrigger)
- Other internal classes:
  - WindowOperatorBuilder (flink-streaming-java)
  - CommonExecMatch (flink-table-planner)
  - NFACompiler, NFA, Pattern (would require touching Scala code to keep the Java and Scala APIs on par), Quantifier (flink-cep)
  - PeriodicStreamingJob, DataStreamAllroundTestJobFactory (e2e tests)
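To illustrate what a migrated window factory method could look like, here is a heavily simplified, hypothetical tumbling-window assigner that takes its size as a Duration. TumblingWindowSketch and its methods are invented for this example and do not mirror the actual implementation of TumblingEventTimeWindows; the sketch simply converts the Duration to milliseconds once, in the factory method, so that the window arithmetic keeps operating on long values.

```java
import java.time.Duration;

// Hypothetical, simplified tumbling-window assigner (not Flink's implementation)
// whose factory method accepts a Duration instead of a Time.
final class TumblingWindowSketch {

    private final long sizeMillis;

    private TumblingWindowSketch(long sizeMillis) {
        this.sizeMillis = sizeMillis;
    }

    // Counterpart of a factory method such as of(Time size) after the migration.
    static TumblingWindowSketch of(Duration size) {
        long millis = size.toMillis(); // window arithmetic works on epoch milliseconds
        if (millis <= 0) {
            throw new IllegalArgumentException("Window size must be at least 1 ms: " + size);
        }
        return new TumblingWindowSketch(millis);
    }

    // Start (inclusive) of the window that contains the timestamp.
    // Math.floorMod keeps the result correct for negative timestamps, too.
    long windowStart(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // End (exclusive) of the window that contains the timestamp.
    long windowEnd(long timestampMillis) {
        return windowStart(timestampMillis) + sizeMillis;
    }
}
```

For example, TumblingWindowSketch.of(Duration.ofMinutes(1)).windowStart(90_000L) returns 60_000L.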
Scala API
The Scala API is planned to be removed in FLIP-265 [5] and is not considered in this proposal.
One minor Scala change is necessary, though: we need to touch the Scala implementation of the Pattern class (in flink-cep). Pattern requires a new method getWindowDuration that replaces getWindowTime and offers Duration as the return type. Flink ensures API parity between Java and Scala for the Pattern class through PatternScalaAPICompletenessTest, so we need to extend the Scala implementation of the Pattern class to make this check succeed in Flink 1.x. An alternative approach would have been to add an exclusion to PatternScalaAPICompletenessTest. Adding the method to the already deprecated Scala API seemed like the easier option.
[5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support
Proposed Changes
Deprecation Step
Both Time classes will get two additional methods:
- Static method toDuration(@Nullable Time), used in cases where it’s not clear whether null was properly handled in the past. Passing null returns null.
- Member method toDuration() that’s used in cases where we’re sure that null isn’t allowed.
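A minimal sketch of the two proposed methods, assuming a simplified stand-in class: LegacyTime is hypothetical and only mimics the (size, unit) state shared by both Time classes. The @Nullable annotation is replaced by a comment to keep the example free of external dependencies.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for Flink's Time classes; only the proposed
// toDuration methods are sketched.
final class LegacyTime {

    private final long size;
    private final TimeUnit unit;

    private LegacyTime(long size, TimeUnit unit) {
        this.size = size;
        this.unit = Objects.requireNonNull(unit, "unit");
    }

    static LegacyTime of(long size, TimeUnit unit) {
        return new LegacyTime(size, unit);
    }

    // Member variant: for call sites where the instance is known to be non-null.
    Duration toDuration() {
        return Duration.of(size, unit.toChronoUnit());
    }

    // Static variant: for call sites where null handling is unclear.
    // The parameter would be annotated with @Nullable; null in, null out.
    static Duration toDuration(LegacyTime time) {
        return time == null ? null : time.toDuration();
    }
}
```

The static variant lets existing call sites migrate without first auditing whether they can receive null.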
Method overloading
Methods with Time parameters will be overloaded with a new method that uses Duration instead of Time. The actual method logic will move into the new method. The old method (with Time parameter(s)) will be marked as deprecated, and its JavaDoc will be extended by a @deprecated tag pointing to the new method.
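The overloading pattern could look roughly as follows. SessionGapSketch, its withGap methods and the nested OldTime stand-in are hypothetical and only loosely modeled on withGap(Time) in the session window assigners. The point is that the deprecated overload merely converts and delegates, while all logic lives in the Duration variant.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

// Hypothetical API class illustrating the overloading pattern; not a Flink class.
final class SessionGapSketch {

    // Minimal stand-in for a Flink Time class (hypothetical).
    static final class OldTime {
        final long size;
        final TimeUnit unit;

        OldTime(long size, TimeUnit unit) {
            this.size = size;
            this.unit = unit;
        }

        Duration toDuration() {
            return Duration.of(size, unit.toChronoUnit());
        }
    }

    private Duration gap = Duration.ZERO;

    /**
     * @deprecated Use {@link #withGap(Duration)} instead.
     */
    @Deprecated
    SessionGapSketch withGap(OldTime gap) {
        // The deprecated overload only converts and delegates; it has no logic of its own.
        return withGap(gap.toDuration());
    }

    // New overload: the actual method logic lives here.
    SessionGapSketch withGap(Duration gap) {
        Objects.requireNonNull(gap, "gap");
        if (gap.isNegative() || gap.isZero()) {
            throw new IllegalArgumentException("Session gap must be positive: " + gap);
        }
        this.gap = gap;
        return this;
    }

    Duration getGap() {
        return gap;
    }
}
```

Because the old overload holds no logic, removing it in the API cleanup step does not change the behavior of the remaining Duration variant.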
Methods with Time return type
Java does not allow overloading a method by its return type alone, so we need to come up with a reasonable name for the new method (returning Duration) that’s as close as possible to the existing one. Otherwise, deprecation will be handled in the same way as in the Method overloading step.
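A sketch of the renaming approach, modeled on the getWindowTime/getWindowDuration pair mentioned for Pattern. WindowedPatternSketch and OldTime are hypothetical classes invented for this example; in this sketch the value is stored as a Duration and the deprecated getter is derived from it at millisecond precision.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

// Hypothetical class modeled on the getWindowTime()/getWindowDuration() pair;
// not Flink's Pattern implementation.
final class WindowedPatternSketch {

    // Minimal stand-in for a Flink Time class (hypothetical).
    static final class OldTime {
        final long size;
        final TimeUnit unit;

        OldTime(long size, TimeUnit unit) {
            this.size = size;
            this.unit = unit;
        }

        long toMilliseconds() {
            return unit.toMillis(size);
        }
    }

    private final Duration windowDuration; // null if no window was configured

    WindowedPatternSketch(Duration windowDuration) {
        this.windowDuration = windowDuration;
    }

    // New getter: the Duration return type requires a new method name.
    Duration getWindowDuration() {
        return windowDuration;
    }

    /**
     * @deprecated Use {@link #getWindowDuration()} instead.
     */
    @Deprecated
    OldTime getWindowTime() {
        // Derived from the new getter at millisecond precision.
        return windowDuration == null
                ? null
                : new OldTime(windowDuration.toMillis(), TimeUnit.MILLISECONDS);
    }
}
```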
Only artifacts unrelated to the internal timeout management will be marked as deprecated and refactored in the deprecation step.
API Cleanup Step
The Time classes will be removed in 2.0. Any affected code will be migrated to java.time.Duration. Deprecated methods will be removed.
Compatibility, Deprecation, and Migration Plan
The migration plan is covered in the Proposed Changes section.
Test Plan
- Visual verification of affected classes
- CI should succeed
Rejected Alternatives
Using long as an alternative for timeouts
We could switch to long for some of the use cases (e.g., the timeout handling). I don’t see any real advantage in doing this, though. Duration, in contrast, has the time unit encoded in the class itself, which improves the readability of the code.
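A small comparison of the two signature styles; TimeoutReadability and both of its methods are hypothetical and invented purely for illustration. With a long, the unit is conveyed only by the parameter name, whereas a Duration argument states it at the call site.

```java
import java.time.Duration;

// Hypothetical signatures contrasting a raw long timeout with a Duration timeout.
final class TimeoutReadability {

    // The unit is only conveyed by the parameter name: a call like
    // deadlineFromLong(now, 30) does not tell the reader whether 30 means ms or s.
    static long deadlineFromLong(long nowMillis, long timeoutMillis) {
        return nowMillis + timeoutMillis;
    }

    // The unit is part of the argument, e.g. deadlineFromDuration(now, Duration.ofSeconds(30)),
    // and the conversion to milliseconds happens in exactly one place.
    static long deadlineFromDuration(long nowMillis, Duration timeout) {
        return nowMillis + timeout.toMillis();
    }
}
```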