Reason

Decided to not work on that, at least at the moment.

Status

Current state: Abandoned

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-9-Trigger-DSL-td13065.html


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

As more people are using Apache Flink in production, more sophisticated use-cases appear and many of them ask for customizing the offered triggering policies.

This document builds on these discussions and tries to come up with a Trigger DSL that will be offered out-of-the-box in the upcoming releases and will be able to cover most of the common use-cases.

 

Background

The triggers currently offered by Apache Flink are:

  • CountTrigger(threshold) (non-purging): Fires after the number of elements in a window passes the given threshold.

  • EventTimeTrigger() (purging): Fires after the watermark passes the end of a window.

  • ProcessingTimeTrigger() (purging): Fires after processing time passes the end of a window.

  • ContinuousEventTimeTrigger(interval) (non-purging): Fires repeatedly with a given periodicity according to the watermark.

  • ContinuousProcessingTimeTrigger(interval) (non-purging): Fires repeatedly with a given periodicity according to processing time.

  • DeltaTrigger(delta-function) (non-purging): Fires once the delta between a start-value and the newly added value exceeds a given threshold.

  • PurgingTrigger(trigger) (purging): Turns any trigger into a purging trigger.

Limitations of the current triggers:

  1. There is no out-of-the-box way to combine two triggers. As an example, the user cannot specify a trigger that fires “after the watermark passes the end of the window and only if the number of elements is at least 5”. This could be useful for cases where data are anonymized using K-anonymity strategies.
  2. In a previous document we described the notion of “allowed lateness”. Allowed lateness is a setting of the WindowOperator: a window for which maxTimestamp + allowedLateness <= currentWatermark is considered late. Lateness affects two things: 1) elements that belong to late windows are dropped, and 2) the state of a window is dropped (garbage collected) as soon as the watermark passes the end of the window plus the allowed lateness. Although we allow elements to be late, the current triggers provide no way to customize how a pipeline fires for late but not dropped elements. These firings are called “late firings”. In addition, there is no way to specify “early firings”, i.e. firings that happen before the watermark (or the processing time) passes the end of the window. To illustrate, the user cannot specify a policy that says: “fire in daily windows with allowed lateness of 12 hours, but give me early results every hour, and after the watermark passes the end of the window, fire every 10 elements”.
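The lateness rule quoted in the second limitation can be restated as a small check. This is only a sketch: the class and method names are hypothetical and are not part of Flink's API, and timestamps are assumed to be milliseconds.

```java
// Hypothetical helper, not part of Flink: it only restates the rule
// "maxTimestamp + allowedLateness <= currentWatermark means the window is late".
final class LatenessSketch {

    /** Returns true once the window is late, i.e. its elements and state are dropped. */
    static boolean isWindowLate(long maxTimestamp, long allowedLateness, long currentWatermark) {
        return maxTimestamp + allowedLateness <= currentWatermark;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        long maxTimestamp = 24 * hour - 1;   // daily window covering [0h, 24h)
        long allowedLateness = 12 * hour;

        // Watermark at 30h: the window has ended but is still within its allowed lateness.
        System.out.println(isWindowLate(maxTimestamp, allowedLateness, 30 * hour)); // false
        // Watermark at 36h: end of window plus allowed lateness has been passed.
        System.out.println(isWindowLate(maxTimestamp, allowedLateness, 36 * hour)); // true
    }
}
```

Between the end of the window and the point where this check becomes true, elements are late but not dropped, which is the interval that late firings are meant to cover.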


Proposed Changes


In summary, we propose to offer the following triggers out-of-the-box:

  • CountTrigger.atLeast(threshold) 
  • All(trigger1, trigger2, …)
  • Any(trigger1, trigger2, …)
  • EventTimeTrigger.afterEndOfWindow()
    • withEarlyTrigger(earlyFiringTrigger)
    • withLateTrigger(lateFiringTrigger)
  • EventTimeTrigger.afterFirstElement(delay)
  • ProcessingTimeTrigger.afterEndOfWindow()
    • withEarlyTrigger(earlyFiringTrigger)
  • ProcessingTimeTrigger.afterFirstElement(delay)

In addition, we propose that all of these triggers be able to operate in one of two modes:

  • discarding()
  • accumulating() 

NOTE: Users can still implement their own triggers by extending the existing Trigger abstract class. 

General Notes

Before describing each of the proposed triggers individually, here we note that:

  1. All new triggers are by default non-purging.

  2. All new triggers operate in the accumulating mode by default.

  3. The mode (discarding or accumulating) of the parent trigger overrides that of its children. This means that in the case of

    All(CountTrigger.atLeast(4).accumulating(), CountTrigger.atLeast(5)).discarding()

the resulting trigger operates in the discarding mode.
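One way to read the parent-overrides-children rule is that setting a mode on a trigger pushes that mode down to every trigger nested inside it. The sketch below models this with a hypothetical Node class. It is not the proposed DSL itself, and the propagation strategy is an assumption made for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of a trigger tree; names and structure are illustrative only.
final class TriggerModeSketch {

    enum Mode { ACCUMULATING, DISCARDING }

    static final class Node {
        final String name;
        final List<Node> children;
        Mode mode;

        Node(String name, Node... children) {
            this.name = name;
            this.children = Arrays.asList(children);
            // New triggers default to accumulating; wrapping children in a
            // parent applies the parent's (default) mode to them as well.
            setMode(Mode.ACCUMULATING);
        }

        Node accumulating() { setMode(Mode.ACCUMULATING); return this; }

        Node discarding() { setMode(Mode.DISCARDING); return this; }

        // The parent's mode overrides whatever the children declared.
        private void setMode(Mode newMode) {
            this.mode = newMode;
            for (Node child : children) {
                child.setMode(newMode);
            }
        }
    }

    public static void main(String[] args) {
        Node atLeast4 = new Node("CountTrigger.atLeast(4)").accumulating();
        Node atLeast5 = new Node("CountTrigger.atLeast(5)");
        Node all = new Node("All", atLeast4, atLeast5).discarding();
        System.out.println(all.mode + " " + atLeast4.mode + " " + atLeast5.mode);
    }
}
```

With this reading, the explicit accumulating() call on the first child has no effect once the enclosing All is marked discarding().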

CountTrigger.atLeast():
 

As the name implies, the CountTrigger.atLeast(threshold) fires if there are at least threshold elements in the window.

All(), Any():

These two triggers target the first limitation mentioned above, i.e. forming composite triggers. Both take a variable-length list of triggers as argument. Any(trigger1, trigger2, …) fires if any of the child triggers fires, while All(trigger1, trigger2, …) fires once all of the child triggers fire. With these triggers, the user can specify a trigger that fires when the watermark passes the end of the window and only if there are at least 5 elements by writing:

  •  All(EventTimeTrigger.afterEndOfWindow(), CountTrigger.atLeast(5))
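The combination logic can be sketched by treating each trigger as a condition over a snapshot of the window. This is a deliberate simplification: real triggers are stateful and react to elements and timers, and every name below (WindowView, countAtLeast, afterEndOfWindow, all, any) is hypothetical rather than part of Flink.

```java
import java.util.Arrays;
import java.util.function.Predicate;

// Simplified, hypothetical model: a trigger is a predicate over a window snapshot.
final class CompositeSketch {

    /** The facts a trigger condition may inspect in this sketch. */
    static final class WindowView {
        final long elementCount;
        final long maxTimestamp;
        final long watermark;

        WindowView(long elementCount, long maxTimestamp, long watermark) {
            this.elementCount = elementCount;
            this.maxTimestamp = maxTimestamp;
            this.watermark = watermark;
        }
    }

    static Predicate<WindowView> countAtLeast(long threshold) {
        return w -> w.elementCount >= threshold;
    }

    static Predicate<WindowView> afterEndOfWindow() {
        return w -> w.watermark >= w.maxTimestamp;
    }

    /** Fires only if every child condition holds. */
    @SafeVarargs
    static Predicate<WindowView> all(Predicate<WindowView>... children) {
        return w -> Arrays.stream(children).allMatch(c -> c.test(w));
    }

    /** Fires if at least one child condition holds. */
    @SafeVarargs
    static Predicate<WindowView> any(Predicate<WindowView>... children) {
        return w -> Arrays.stream(children).anyMatch(c -> c.test(w));
    }

    public static void main(String[] args) {
        Predicate<WindowView> kAnonymous = all(afterEndOfWindow(), countAtLeast(5));
        System.out.println(kAnonymous.test(new WindowView(3, 999, 1000))); // false: only 3 elements
        System.out.println(kAnonymous.test(new WindowView(5, 999, 1000))); // true
    }
}
```

In this model the K-anonymity example stays silent for a window that has ended with fewer than 5 elements, whereas any(...) over the same two conditions would fire as soon as either one holds.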

EventTimeTrigger.*, ProcessingTimeTrigger.*:

These target the second limitation, i.e. early and late firings. The *.afterEndOfWindow() triggers fire when the watermark (or the processing time) passes the end of the window, while the *.afterFirstElement(delay) triggers fire after the watermark (or the processing time) passes the timestamp of the first element since the last firing of a window, plus the specified “delay”. The latter triggering policy becomes more interesting when specifying early and late firings, where a window may have multiple such firings. Focusing on *.afterEndOfWindow(), these triggers allow the specification of an early trigger and, in the case of the EventTimeTrigger, of a late one. Early firings happen before the watermark passes the end of the window, while late firings happen after the watermark has passed the end of the window but before it passes the end of the window plus the allowed lateness. In processing time there is no late trigger, as the notion of lateness does not exist. For more, see here.


An early or a late trigger can be any trigger, or a combination of triggers. So the user can specify an event time trigger with early firings every 10s in processing time (whenever there is a new element) and late firings when there are at least 5 new elements, in the following way:

  • EventTimeTrigger.afterEndOfWindow().withEarlyTrigger(ProcessingTimeTrigger.afterFirstElement(Time.seconds(10))).withLateTrigger(CountTrigger.atLeast(5))

NOTE: There are no early firings if no early trigger is specified. In contrast, if no late trigger is specified and the allowed lateness is greater than 0, then we fire on every late but not dropped element.
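The early/late distinction and the defaults from the note can be sketched as a phase classification followed by a dispatch to the matching child trigger. All names here are hypothetical, and the child triggers are reduced to boolean conditions for brevity. A null condition stands for "no trigger specified".

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of which child trigger decides whether an arriving element causes a firing.
final class FiringPhaseSketch {

    enum Phase { EARLY, LATE, EXPIRED }

    /** Classifies a window relative to the current watermark. */
    static Phase phaseOf(long maxTimestamp, long allowedLateness, long watermark) {
        if (watermark < maxTimestamp) {
            return Phase.EARLY;    // watermark has not yet passed the end of the window
        }
        if (maxTimestamp + allowedLateness <= watermark) {
            return Phase.EXPIRED;  // window is late: elements and state are dropped
        }
        return Phase.LATE;         // window has ended but is within the allowed lateness
    }

    /** Decides whether an arriving element causes a firing in the given phase. */
    static boolean firesOnElement(Phase phase, BooleanSupplier earlyTrigger, BooleanSupplier lateTrigger) {
        switch (phase) {
            case EARLY:
                // No early trigger specified means no early firings.
                return earlyTrigger != null && earlyTrigger.getAsBoolean();
            case LATE:
                // No late trigger specified means firing on every late element.
                return lateTrigger == null || lateTrigger.getAsBoolean();
            default:
                return false;      // element belongs to an expired window and is dropped
        }
    }

    public static void main(String[] args) {
        Phase phase = phaseOf(999, 500, 1200);
        System.out.println(phase);                              // LATE
        System.out.println(firesOnElement(phase, null, null));  // true
    }
}
```

The on-time firing itself is not modelled here. It would happen at the moment the window leaves the EARLY phase.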

Discarding/Accumulating

These modes indicate whether the window operator should clear the state of the underlying window after firing. Discarding means that a trigger purges the state of the underlying window after every firing, while accumulating means that the state is kept for the next firing. By default, all triggers operate in the accumulating mode.

The user should be able to specify the mode of a trigger in the following way:

  • EventTimeTrigger.afterEndOfWindow()
    .withLateTrigger(CountTrigger.atLeast(5))
    .discarding();

Or,

  • EventTimeTrigger.afterEndOfWindow()
    .withLateTrigger(CountTrigger.atLeast(5))
    .accumulating();
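The practical difference between the two modes shows up in what each firing emits. The sketch below assumes a window that sums its elements and fires once after each batch of arrivals. The class and method names are hypothetical and the firing points are fixed by hand rather than decided by a real trigger.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the results emitted under each mode.
final class ModeOutputSketch {

    /**
     * Sums the elements of a window and fires once after every batch.
     * In discarding mode the window state is cleared after each firing;
     * in accumulating mode it is kept for the next firing.
     */
    static List<Integer> emittedSums(int[][] batches, boolean discarding) {
        List<Integer> emitted = new ArrayList<>();
        int state = 0;
        for (int[] batch : batches) {
            for (int value : batch) {
                state += value;
            }
            emitted.add(state);   // the firing
            if (discarding) {
                state = 0;        // purge the window state
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        int[][] batches = { {1, 2}, {3}, {4, 5} };
        System.out.println(emittedSums(batches, false)); // [3, 6, 15]
        System.out.println(emittedSums(batches, true));  // [3, 3, 9]
    }
}
```

Accumulating firings refine earlier results, so a downstream consumer should replace the previous value. Discarding firings are disjoint deltas that a consumer would need to combine itself.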

Compatibility, Deprecation, and Migration Plan

Existing users should not be affected by the proposed changes. All existing triggers and custom triggers will continue to work without any problems.