Status

Current state: Draft

Discussion thread:

https://lists.apache.org/thread/dgkl0yc05c790wdqhp91x4np77wq3qmb

https://lists.apache.org/thread/wlgw5b9proknwkr2pdzqqdnszd4szm0d

JIRA: Unable to render Jira issues macro, execution error.

Released: TBD

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently Flink jobs will ignore all the pending processing timers on job termination. This might not be desired under some cases and might cause confusions [1][2][3]

To overcome this issue, we need to provide more flexible options for dealing with processing timers on job termination. For example, to ensure the processing timer window operator could emit correct results for bounded streams, it should trigger all the pending timers on end of data.

Therefore, this FLIP proposes to enhance the ability to deal with processing timers on job termination.

Scenarios

Currently the processing timer stack consists of three layers in the task side:

  1. Task-level SystemProcessingTimerService: Each task has a SystemProcessingTimerService that is actually responsible for registering the timers.
  2. Operator-level ProcessingTimerService:The operator-level ProcessingTimerService is a proxy of the SystemProcessingTimerService. Each operator has its own operator-level ProcessingTimerService. Some operators directly use this ProcessingTimerService in some cases, e.g. AsyncWaitOperator, AsyncSync, etc.
  3. InternalTimerService: InternalTimerService is built on top of the operator-level ProcessingTimerService to support a large number of per-keyed timers. Inside InternalTimerService, each timer is bound to a key (thus it could be only used in keyed operators). Besides, it also maintains a priority queue of timers, only the earliest timer is actually registered to the operator-level ProcessingTimerService. This reduces the cost of supporting a lot of keys. At last, the timers in the InternalTimerService will be snapshot on checkpoint and restored on restarting. The user-level timer services, like the ones used in WindowOperator and KeyedProcessOperator, are built on top of the InternalTimerService. 

The current scenarios that rely on the operator level's ProcessingTimerService is listed as follows

OperatorScenarioIs Current Behavior CorrectExpected Action on Termination
AsyncWaitOperatorTimeout / retry timeout for each inflight asynchronous action

Correct. The operator will wait till all the inflight requests finished, thus all the timers should be either triggered or canceled

IGNORE, note the timer could not be canceled

FileWriterTimeout for flushing in-progress fileCorrect. The operator will close the in-progress file on termination. CANCEL or IGNORE
KafkaWriterSynchronize the emit records metrics periodicallySeems not correct. There is no dedicated synchronization on termination, thus the last piece of records are not count

TRIGGER.

Or the Writer could manually synchronize the timers

Kafka / Kinesis Fetcher, StreamSource, TimestampAndWatermarksOpeator, ProcTimeMiniBatchAssignEmit watermarks periodically. Correct, the system will emit a MAX_LONG watermark on terminationCANCEL
Kafka ReducingUpsertWriterPeriodically flush buffered data. Correct, flush called explicitly on terminationCANCEL or IGNORE
Pulsar WriterUpdate the topic metadata. Seems correct, new metadata is not required any moreCANCEL
Sink V2 CommitOperatorRetry committing on failureCorrect, it is explicitly handled on endOfInput to wait till all the commits succeed or fail finallyCANCEL


The current scenarios that rely on the InternalTimerService is listed as follows:

OperatorScenarioIs Current Behavior CorrectExpected Action on Termination
CEP operator

within(), which contains two cases:

  1. There is no notFollowBy(). If the pattern is not fully matched, on timeout it will be notified as partial matched.
  2. There is notFollowBy(), then the pattern could only be notified on timeout. Besides, at then it will check the elapsed time. 


Very thanks Dian Fu for helping verifying the logic here. 

NoTRIGGER, and it requires the currentProcessingTime() points to MAX_LONG for the notFollowBy() to work correctly. 
DataStream Window operatorTumbling Window, Sliding Window and Session WindowNoTRIGGER
SQL Window operator-NoTRIGGER
SQL Interval Join Operator-NoTRIGGER
Customized usage in ProcessFunction 

-

NoDepends on users' logic. 


This FLIP seeks to provide a unified support for all the timers inside the operators. Therefore, we need to consider both the scenarios mentioned in point 2 & 3.  

Public Interfaces

Supported Actions on Termination

Based on the summary of the currently known scenarios, the supported actions for a timer on termination includes:

  1. CANCEL: if the timer has been registered, it will be tried to be canceled, otherwise it will be ignored. 
  2. TRIGGER: the timer will be triggered once immediately no matter if it is registered or not.
  3. WAIT: the task thread will be blocked until the timer gets triggered on the registered timepoint. 


The actions will be defined in an enum:

enum TimerTerminationAction {
    CANCEL,
    TRIGGER,
    WAIT
}


Except for the one-shot timers, the operator-level ProcessingTimerService also supports periodic timers. However, since it is only used rarely and currently it seems all the scenarios satisfy the CANCEL semantics, we'll not extending this part this time. 

APIs to Specify the Termination Action

Operator authors and users are able to specify the actions of the timers with the following ways:

  1. Both operators and UDFs could specify the action when registering timers.
  2. Users could also overwrite the actions for an operator globally when building DAGs. This is useful in cases like users wanting to wait for all the timers instead of triggering them immediately for some specific jobs.
//org.apache.flink.streaming.api.TimerService
@PublicEvolving
public interface TimerService {
    … 

    void registerProcessingTimeTimer(long time, TimeTerminationAction action);
    …

}



// Internal interface, but operator authors may need it
@Internal
public interface InternalTimerService<N> {
    …
    void registerProcessingTimeTimer(N namespace, long time, TimeTerminationAction action);
    …
}

// org.apache.flink.api.common.operators.ProcessingTimeService
@PublicEvolving
public interface ProcessingTimeService {
    …
    ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target, TimeTerminationAction action);
    … 
}


@Public
public class SingleOutputStreamOperator<T> extends DataStream<T> {
    …
    void setTimerTerminationAction(TimerTerminationAction action);
    …
}

Key Implementations

Executes the Actions on Termination

The termination action specifications take effect when the operator receives EndOfDataEvent. Based on the current process, the timers would be dealt with before we call the finish() method of the operators to avoid breaking the semantics that no records are allowed after the finish() method, we keep this order unchanged. 

We’ll first mark the operator-level ProcessingTimerService to be in a special state, e.g. FLUSING. Then we’ll deal with all the registered timers in order. For each timer,

  • CANCEL: Cancel the timer.
  • TRIGGER:Cancel the original timer and execute the callback manually. 
  • WAIT: Yield the task thread until the timer gets triggered. 


Except for the registered timers, there are still some pending timers managed in the InternalTimerService. These timers would be registered if the head timer in the InternalTimerService gets processed. However, if the head timer gets canceled, the following timers would have no chance to be registered. We’ll modify the interface between the operator-level ProcessingTimerService and InternalTimerService to make sure the latter will also get a notification when the head timer gets canceled to register the following timers.

Since except for the InternalTimerService, there are also other scenarios that will register new timers in the callback, we need to ensure we do not fall into an infinite loop. To overcome this issue, the operator-level ProcessingTimerService should be aware of the latest timer to be flushed, and the InternalTimerService would then need to report its latest timer on new timers registered. 

State Compatibility

The pending timers in the InternalTimerService will be snapshot during checkpoint. Since we need to store the action for each timer, it will affect the serialized representation in the snapshot. Fortunately InternalTimerServiceSerializationProxy is versioned, thus we could extend it to support a new version of serialization logic. 

Compatibility, Deprecation, and Migration Plan

The default timer termination actions in some scenarios would be changed from CANCEL to other options, but the behavior after modification should be the right one. 

Timers restored from existing snapshots would be first set with CANCEL. Though it might cause some inconsistency with the newly registered timers, it does not cause a more serious issue (all timers are canceled → parts of timers are canceled), thus we think it should be acceptable. 

Timers registered by users would still be CANCELED before users change their codes explicitly. 

Test Plan

  • Unit tests of each changed component.
  • Integrated tests to verify the actions behaves correctly on job termination. 

Rejected Alternatives

Currently there are no rejected alternatives. 


  • No labels