Discussion thread: https://lists.apache.org/thread/48ysrg1rrtl8s1twg9wmx35l201hnc2w
Vote thread: https://lists.apache.org/thread/notp8cc942jknpqcoq1cnsmgh5wz80h5
Result thread: https://lists.apache.org/thread/0pfl2qjrg77wrpz4sgzyr421fs5423zt
Release
  • 1.19 
  • 2.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Flink provides two Time classes: org.apache.flink.streaming.api.windowing.time.Time and org.apache.flink.api.common.time.Time. The latter is already considered deprecated according to its JavaDoc, but it has not been formally deprecated with the corresponding @Deprecated annotation. The intention is to replace it with the former Time class in Flink’s flink-streaming-java module.

A mailing list discussion [1] concluded that both classes should eventually be deprecated in favor of Java’s Duration class. FLINK-14068 [2] was created to cover this effort.

Removing the Time classes means touching @Public and @PublicEvolving APIs, which is why it was brought up as part of the 2.0 feature list [3].

This FLIP’s intention is to formalize the effort.

[1] https://lists.apache.org/thread/76yywnwf3lk8qn4dby0vz7yoqx7f7pkj
[2] https://issues.apache.org/jira/browse/FLINK-14068
[3] https://cwiki.apache.org/confluence/display/FLINK/2.0+Release

Public Interfaces

Comparison of the features of both Time implementations with Duration


|  | o.a.f.api.common.time.Time | o.a.f.streaming.api.windowing.time.Time | java.time.Duration |
|---|---|---|---|
| Module | flink-core | flink-streaming-java | JDK |
| Internal state | size: long | size: long | seconds: long |
|  | unit: TimeUnit | unit: TimeUnit | nanos: int |
| Factory methods | of(long, TimeUnit) | of(long, TimeUnit) | of(long, TemporalUnit) |
|  | milliseconds(long) | milliseconds(long) | ofMillis(long) |
|  | seconds(long) | seconds(long) | ofSeconds(long) |
|  | minutes(long) | minutes(long) | ofMinutes(long) |
|  | hours(long) | hours(long) | ofHours(long) |
|  | days(long) | days(long) | ofDays(long) |
|  | - | fromDuration(Duration) | - |
| Getter/transform | getUnit() : TimeUnit | getUnit() : TimeUnit | get(temporalUnit) : long |
|  | getSize() : long | getSize() : long | - |
|  | toMilliseconds() : long | toMilliseconds() : long | toMillis() : long |
|  | toDuration() : Duration | - | - |
| hashCode()/equals() | Object default | Based on milliseconds long representation | Based on nanosecond representation |
| toString() | Object default | “<millis> ms” | ISO-8601 representation [4] |

[4] https://en.wikipedia.org/wiki/ISO_8601#Durations

java.time.Duration provides the same functionality as both Time implementations and can therefore be used as a replacement.
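The Duration column of the comparison can be checked with plain JDK code. The following is a minimal sketch that needs no Flink dependency; the class name DurationAsTimeReplacement is made up for this illustration.

```java
import java.time.Duration;
import java.time.temporal.ChronoUnit;

// Illustrative only: exercises the java.time.Duration features listed in the comparison.
class DurationAsTimeReplacement {
    public static void main(String[] args) {
        // Factory methods take the role of Time.seconds(long), Time.of(long, TimeUnit), etc.
        Duration fiveSeconds = Duration.ofSeconds(5);
        Duration sameInMillis = Duration.of(5000, ChronoUnit.MILLIS);

        // toMillis() takes the role of Time#toMilliseconds().
        System.out.println(fiveSeconds.toMillis());

        // equals()/hashCode() compare the stored seconds and nanos,
        // so the unit used at creation time does not matter.
        System.out.println(fiveSeconds.equals(sameInMillis));

        // toString() uses the ISO-8601 duration format.
        System.out.println(fiveSeconds);
    }
}
```

Running the sketch prints 5000, true, and PT5S.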

Affected Public API

The Time class is mostly used in internal implementation, specifically in the RpcEndpoints and the REST handlers for handling timeouts.

  • Affected @Public API: none
  • Affected @PublicEvolving API
    • RestartStrategies
    • StateTtlConfig
    • [already deprecated] TableConfig.setIdleStateRetentionTime
  • Other classes are the Streaming Examples:
    • WindowJoin
    • SideOutputExample
    • SocketWindowWordCount
    • GroupedProcessingTimeWindowExample
    • SessionWindowing
    • TopSpeedWindowing

All other occurrences are connected to internal timeout handling (where we could, theoretically, also use long):

  • RPC-related classes
  • REST Handlers
  • Generally, JobManager and TaskManager internals
  • Affected @Public/@PublicEvolving API:
    • Several *Stream implementations that support allowedLateness (AllWindowedStream, CoGroupedStreams, JoinedStreams, WindowedStream) or window functionality (DataStream, KeyedStream)
    • Window implementations that implement withGap(Time) (EventTimeSessionWindows, ProcessingTimeSessionWindows)
    • Factory methods in window implementations (SlidingEventTimeWindows, TumblingEventTimeWindows, TumblingProcessingTimeWindows, TimeEvictor, ContinuousEventTimeTrigger, ContinuousProcessingTimeTrigger)
  • Other internal classes:
    • WindowOperatorBuilder (flink-streaming-java)
    • CommonExecMatch (flink-table-planner)
    • NFACompiler, NFA, Pattern (requires touching Scala code to keep the Java and Scala APIs on par), Quantifier (flink-cep)
    • PeriodicStreamingJob, DataStreamAllroundTestJobFactory (e2e tests)

Scala API

The Scala API is planned to be removed in FLIP-265 [5] and is not considered in this proposal.

One minor Scala change is necessary, though: the Scala implementation of the Pattern class (in flink-cep) has to be touched. Pattern requires a new method getWindowDuration that replaces getWindowTime and returns a Duration. Flink ensures API parity between Java and Scala for the Pattern class through PatternScalaAPICompletenessTest, so the Scala implementation of Pattern has to be extended for this check to succeed in Flink 1.x. An alternative approach would have been to add an exclusion to PatternScalaAPICompletenessTest, but adding the method to a deprecated API seemed the easier option.

[5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support

Proposed Changes

Deprecation Step

Both Time classes will get two additional methods:

  • Static method toDuration(@Nullable Time) that’s used in cases where it’s not clear whether null was properly handled in the past. Passing null results in null being returned.
  • Member method toDuration() that’s used in cases where we’re sure that null isn’t allowed.
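The two methods could look roughly like the following sketch. LegacyTime is a hypothetical, simplified stand-in for either Time class and not the actual Flink source. Converting through milliseconds is an assumption of this sketch, and the @Nullable annotation is left out so that the example compiles with the JDK alone.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified stand-in for either Flink Time class (not the real source). */
final class LegacyTime {
    private final long size;
    private final TimeUnit unit;

    private LegacyTime(long size, TimeUnit unit) {
        this.size = size;
        this.unit = unit;
    }

    static LegacyTime of(long size, TimeUnit unit) {
        return new LegacyTime(size, unit);
    }

    long toMilliseconds() {
        return unit.toMillis(size);
    }

    /** Member variant: for call sites where null is known to be disallowed. */
    Duration toDuration() {
        // Assumption: conversion goes through milliseconds, so finer units are truncated.
        return Duration.ofMillis(toMilliseconds());
    }

    /** Static variant: tolerates null (the proposal marks the parameter as @Nullable). */
    static Duration toDuration(LegacyTime time) {
        return time == null ? null : time.toDuration();
    }
}

class ToDurationDemo {
    public static void main(String[] args) {
        LegacyTime timeout = LegacyTime.of(2, TimeUnit.MINUTES);
        System.out.println(timeout.toDuration());           // member variant
        System.out.println(LegacyTime.toDuration(timeout)); // static variant
        System.out.println(LegacyTime.toDuration(null));    // null in, null out
    }
}
```

The static variant keeps the previous null behavior at call sites that were never audited, while the member variant fails fast with a NullPointerException where null is a bug.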

Method overloading

Methods with a Time parameter will be overloaded. The new method will take Duration instead of Time, and the actual method logic will be moved into it. The old method (with Time parameter(s)) will be marked as deprecated, and its JavaDoc will be extended by a @deprecated tag pointing to the new method.
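The overloading pattern can be sketched as follows. OldTime and SessionGapSettings are hypothetical names chosen for illustration and do not exist in Flink; only the relationship between the deprecated overload and the new one matters.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a Flink Time class. */
final class OldTime {
    private final long millis;

    private OldTime(long millis) {
        this.millis = millis;
    }

    static OldTime of(long size, TimeUnit unit) {
        return new OldTime(unit.toMillis(size));
    }

    Duration toDuration() {
        return Duration.ofMillis(millis);
    }
}

/** Hypothetical API class; not part of Flink. */
class SessionGapSettings {
    private Duration gap = Duration.ofMillis(1);

    /**
     * Sets the session gap.
     *
     * @deprecated Use {@link #withGap(Duration)} instead.
     */
    @Deprecated
    SessionGapSettings withGap(OldTime gap) {
        // The old overload only converts and delegates.
        return withGap(gap.toDuration());
    }

    /** New overload that holds the actual logic. */
    SessionGapSettings withGap(Duration gap) {
        if (gap.isZero() || gap.isNegative()) {
            throw new IllegalArgumentException("The gap must be positive.");
        }
        this.gap = gap;
        return this;
    }

    Duration getGap() {
        return gap;
    }
}

class OverloadDemo {
    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        SessionGapSettings viaOld = new SessionGapSettings().withGap(OldTime.of(10, TimeUnit.SECONDS));
        SessionGapSettings viaNew = new SessionGapSettings().withGap(Duration.ofSeconds(10));
        System.out.println(viaOld.getGap().equals(viaNew.getGap()));
    }
}
```

One side effect of this pattern is that a call passing a literal null no longer compiles because it is ambiguous between the two overloads; such call sites need a cast or a typed variable.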

Methods with Time return type

We need to come up with a reasonable method name for the new method (returning Duration) that’s as close as possible to the existing one. Otherwise, deprecation will be handled in the same way as in the Method overloading step.
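For getters, the same idea might look like the sketch below, which borrows the getWindowTime/getWindowDuration naming mentioned for the Pattern class. PatternSketch and WindowTime are hypothetical, heavily reduced stand-ins and do not reflect the real flink-cep implementation.

```java
import java.time.Duration;

/** Hypothetical stand-in for a Flink Time class. */
final class WindowTime {
    private final long millis;

    private WindowTime(long millis) {
        this.millis = millis;
    }

    static WindowTime milliseconds(long millis) {
        return new WindowTime(millis);
    }

    long toMilliseconds() {
        return millis;
    }
}

/** Hypothetical, reduced sketch; not the real flink-cep Pattern class. */
class PatternSketch {
    private Duration window; // stays null until within(...) is called

    PatternSketch within(Duration window) {
        this.window = window;
        return this;
    }

    /** New accessor; the name stays close to getWindowTime(). */
    Duration getWindowDuration() {
        return window;
    }

    /**
     * Old accessor, kept for backwards compatibility.
     *
     * @deprecated Use {@link #getWindowDuration()} instead.
     */
    @Deprecated
    WindowTime getWindowTime() {
        // Derived from the Duration-based state; null stays null.
        return window == null ? null : WindowTime.milliseconds(window.toMillis());
    }
}
```

Because Java cannot overload on the return type alone, the new getter needs a distinct name, which is why a naming decision is required here but not for parameter overloads.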

Only the artifacts unrelated to the internal timeout management will be marked as deprecated and refactored in the deprecation step.

API Cleanup Step

The Time classes will be removed in 2.0. Any affected code will be migrated to java.time.Duration. Deprecated methods will be removed.

Compatibility, Deprecation, and Migration Plan

The migration plan is covered in the Proposed Changes section.

Test Plan

  • Visual verification of affected classes
  • CI should succeed

Rejected Alternatives

Using long as an alternative for timeouts

We could switch to long for some of the use cases (e.g. the timeout handling). I don’t see any real advantage in doing this, though. Duration, in contrast, has the time unit encoded in the class itself, which improves the readability of the code.
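The readability argument can be illustrated with a small sketch. TimeoutReadability and deadlineMillis are hypothetical names and not part of Flink.

```java
import java.time.Duration;

// Illustrative only: the same computation with a long-based and a Duration-based timeout.
class TimeoutReadability {

    // With long, the unit is only visible in the parameter name or the JavaDoc.
    static long deadlineMillis(long nowMillis, long timeoutMillis) {
        return nowMillis + timeoutMillis;
    }

    // With Duration, the unit travels with the value.
    static long deadlineMillis(long nowMillis, Duration timeout) {
        return nowMillis + timeout.toMillis();
    }

    public static void main(String[] args) {
        // Is 30_000 meant as milliseconds or seconds? The call site does not say.
        System.out.println(deadlineMillis(1_000L, 30_000L));

        // The unit is explicit at the call site.
        System.out.println(deadlineMillis(1_000L, Duration.ofSeconds(30)));
    }
}
```

Both calls compute the same deadline, but only the second one documents the unit where the value is written.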