...

In essence, the “real-time” nature of streaming computations leads to a correlation between the responsiveness of the system, i.e., how long we have to wait until we get the results of our 1h windowing computation, and wall-clock/processing time. This correlation does not exist in the batch world, where the input dataset is static and known in advance.

TODO: Do we want to do it as proposed below, or fire all processing-time timers at the end?

Proposal: Given the above discussion, and the fact that in batch processing there is no fundamental reason to sacrifice completeness, correctness, and reproducibility of the result for performance, we argue that processing time can be ignored in batch.

...

  1. When trying to apply processing-time windowing to batch workloads, Flink could throw an exception warning the user.
  2. When the user tries to set processing-time timers, e.g. in a ProcessFunction, or specifies a processing-time-based policy, e.g. a rolling policy in the StreamingFileSink, Flink will ignore these timers when the job is executed in batch.
  3. Custom triggers, in both event and processing time, should be treated as optional and potentially be ignored in batch. The reason is that early triggers are simply a means to bypass the previously mentioned correlation between responsiveness and processing time, which does not exist in batch processing.

Future Work: In the future, we may consider adding the following capabilities as options:

  • firing all the registered processing-time timers at the end of a job (in close()).
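The end-of-job firing described above could be sketched as follows. This is a minimal, self-contained illustration of the idea, not Flink's actual timer service; the class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch: a timer service that, at end-of-input, fires all
// pending processing-time timers in timestamp order instead of waiting for
// wall-clock time to advance, then silently ignores new registrations.
final class QuiescingTimerService {
    private final PriorityQueue<Long> pendingTimers = new PriorityQueue<>();
    private boolean quiesced = false;

    void registerProcessingTimeTimer(long timestamp) {
        if (quiesced) {
            return; // after end-of-input, new registrations are silently dropped
        }
        pendingTimers.add(timestamp);
    }

    /** Called at end-of-input: fire everything still pending, in order. */
    List<Long> fireAndQuiesce() {
        quiesced = true;
        List<Long> fired = new ArrayList<>();
        while (!pendingTimers.isEmpty()) {
            fired.add(pendingTimers.poll());
        }
        return fired;
    }
}
```

The priority queue ensures that timers fire in timestamp order, which keeps the end-of-input behaviour deterministic regardless of registration order.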

We propose to introduce three new configuration options:

  • pipeline.processing-time.allow:
    • ALLOW: allow API calls to get the current processing time and register timers
    • IGNORE: silently ignore calls to register processing-time timers. TODO: What should getCurrentProcessingTime() return in this case?
    • FAIL: fail with an exception when API methods that deal with processing time are called
  • pipeline.processing-time.end-of-input:
    • FIRE_AND_QUIESCE: fire any pending processing-time timers at end-of-input but silently ignore attempts to register new timers
    • IGNORE: ignore pending processing-time timers at end-of-input and shut down posthaste
  • pipeline.windowing.triggers.allow:
    • ALLOW: allow and use window Triggers; this is the default DataStream API behaviour so far
    • IGNORE: completely ignore any Triggers that are set via the windowing API
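As an illustration, a user who prefers silent ignoring over hard failures in batch could set the proposed keys in flink-conf.yaml. This is a sketch using the key names proposed above; these options do not exist yet:

```yaml
# Sketch: proposed keys from this document, not existing Flink options.
# Example: run in BATCH mode but ignore processing-time usage instead of failing.
pipeline.processing-time.allow: IGNORE
pipeline.processing-time.end-of-input: IGNORE
pipeline.windowing.triggers.allow: IGNORE
```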

These options apply to BOTH batch and streaming and ensure that the same job written for streaming can also run in batch. The defaults for STREAMING and BATCH, respectively, should be:

  • STREAMING
    • pipeline.processing-time.allow: ALLOW
    • pipeline.processing-time.end-of-input: FIRE_AND_QUIESCE
    • pipeline.windowing.triggers.allow: ALLOW
  • BATCH
    • pipeline.processing-time.allow: FAIL
    • pipeline.processing-time.end-of-input: IGNORE
    • pipeline.windowing.triggers.allow: IGNORE

We think that we should fail hard in BATCH execution mode when the processing-time API is used, because silently ignoring those API calls could lead to surprising job results. If we accept this, we must also silently ignore custom Triggers in the Windowing API, because they would fail when trying to use the processing-time APIs.
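The ALLOW/IGNORE/FAIL semantics described above could be enforced by a guard at the point where user code registers a processing-time timer. The following is an illustrative sketch in plain Java, not Flink's actual implementation; the class and method names are hypothetical:

```java
// Hypothetical sketch of enforcing pipeline.processing-time.allow.
enum ProcessingTimeMode { ALLOW, IGNORE, FAIL }

final class ProcessingTimeGuard {
    private final ProcessingTimeMode mode;

    ProcessingTimeGuard(ProcessingTimeMode mode) {
        this.mode = mode;
    }

    /** Returns true if the timer registration should actually happen. */
    boolean allowTimerRegistration() {
        switch (mode) {
            case ALLOW:
                return true;   // streaming default: timers work as usual
            case IGNORE:
                return false;  // silently drop the registration
            case FAIL:
            default:
                // batch default: fail hard instead of producing surprising results
                throw new UnsupportedOperationException(
                        "Processing-time timers are not supported in BATCH execution mode");
        }
    }
}
```

With the proposed BATCH defaults, the FAIL branch surfaces the misuse immediately, while the IGNORE mode reproduces the "silently dropped" behaviour that this proposal argues against making the default.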

The proposed default of pipeline.processing-time.end-of-input: FIRE_AND_QUIESCE in STREAMING mode is a change from the current behaviour, but enough users have reported on the mailing list that the current behaviour surprised them to warrant this change. See also FLINK-18647.

Event Time Support in BATCH execution mode

...