Status

...

Page properties


Discussion thread

...

...

Vote thread: https://

...

...

thread.html/r6ffdbe90e98542f61e610e5f5abdba0f7a5acb5f98cd01baa1fc17d0%40%3Cdev.flink.apache.org%3E
JIRA: FLINK-19316

Release: 1.12


Table of Contents

Motivation

As described in FLIP-131, we are aiming at deprecating the DataSet API in favour of the DataStream API and the Table API. Users should be able to write a program using the DataStream API that will execute efficiently on both bounded and unbounded input data. To understand what we mean by this we have to elaborate a bit.

...

We also want to make the shuffle mode configurable because there might be cases where a user wants BATCH execution mode but still wants the data to be exchanged in a pipelined fashion. For this we propose a new configuration option execution.shuffle-mode. We will not expose a dedicated setter in StreamExecutionEnvironment or ExecutionConfig.
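If the option is added as proposed, it could be set like any other Flink configuration value, e.g. in flink-conf.yaml. The following fragment is hypothetical; the option name and value are the ones proposed in this FLIP, not a released configuration:

```yaml
# Hypothetical flink-conf.yaml fragment (name/value as proposed in this FLIP):
# keep all exchanges pipelined even when the other BATCH settings are in effect.
execution.shuffle-mode: ALL_EDGES_PIPELINED
```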

...

In essence, the “real-time” nature of streaming computations leads to a correlation between the responsiveness of the system, i.e. how long we have to wait until we get the results of our 1h windowing computation, and wall-clock/processing time. This correlation does not exist in the batch world, where the input dataset is static and known in advance.

Given the above discussion, and the fact that with batch processing completeness, correctness, and reproducibility of the result have no fundamental reason to be sacrificed for performance, we argue that processing time can be ignored in batch.

Practically, this means that in BATCH execution mode:

  1. When a user tries to apply processing-time windowing to a batch workload, Flink could throw an exception warning the user.
  2. When a user tries to set processing-time timers, e.g. in a ProcessFunction, or specifies a processing-time-based policy, e.g. a rolling policy in the StreamingFileSink, Flink will ignore these timers when executing in batch.
  3. Custom triggers, both in event and processing time, should be treated as optional and potentially be ignored in batch. The reason is that early triggers are simply a means to bypass the previously mentioned correlation between responsiveness and processing time, which does not exist in batch processing.

We propose to introduce two new configuration options:

  • pipeline.processing-time.allow:
    • ALLOW: allow API calls to get the current processing time and register timers
    • IGNORE: silently ignore calls to register processing-time timers (TODO: what to return from getCurrentProcessingTime())
    • FAIL: fail with an exception when API methods that deal with processing time are called
  • pipeline.processing-time.end-of-input:
    • FIRE_AND_QUIESCE: fire any pending processing-time timers at end-of-input but silently ignore attempts to register new timers
    • IGNORE: ignore pending processing-time timers at end-of-input and shut down posthaste
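To make the proposed semantics concrete, here is a small, Flink-free Python sketch of how pipeline.processing-time.allow could gate timer registration. ToyTimerService and its method are invented for illustration and are not Flink APIs:

```python
from enum import Enum

class ProcessingTimeAllow(Enum):
    ALLOW = "ALLOW"
    IGNORE = "IGNORE"
    FAIL = "FAIL"

class ToyTimerService:
    """Toy stand-in for a processing-time timer service, gated by the option."""
    def __init__(self, mode):
        self.mode = mode
        self.timers = []

    def register_processing_time_timer(self, timestamp):
        if self.mode is ProcessingTimeAllow.ALLOW:
            self.timers.append(timestamp)  # normal STREAMING behaviour
        elif self.mode is ProcessingTimeAllow.IGNORE:
            pass  # silently dropped
        else:
            raise RuntimeError("processing-time timers are not allowed")

allow_svc = ToyTimerService(ProcessingTimeAllow.ALLOW)
allow_svc.register_processing_time_timer(1000)

ignore_svc = ToyTimerService(ProcessingTimeAllow.IGNORE)
ignore_svc.register_processing_time_timer(1000)

try:
    ToyTimerService(ProcessingTimeAllow.FAIL).register_processing_time_timer(1000)
    raised = False
except RuntimeError:
    raised = True
```

Under ALLOW the timer is recorded, under IGNORE the call is a silent no-op, and under FAIL it raises an exception at the call site.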

These options refer to BOTH batch and streaming and they will make sure that the same job written for streaming can also run for batch. The defaults for STREAMING and BATCH, respectively, should be:

  • STREAMING
    • pipeline.processing-time.allow: ALLOW
    • pipeline.processing-time.end-of-input: FIRE_AND_QUIESCE
  • BATCH
    • pipeline.processing-time.allow: FAIL
    • pipeline.processing-time.end-of-input: IGNORE

We think that we should fail hard in BATCH execution mode when using the processing-time API, because silently ignoring those API calls could lead to surprising job results.

The proposed default of pipeline.processing-time.end-of-input: FIRE_AND_QUIESCE in STREAMING mode is a change from the current behaviour, but enough users have reported on the mailing list that they were surprised by the current behaviour to warrant this change. See also FLINK-18647.
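Similarly, the two pipeline.processing-time.end-of-input behaviours can be sketched in a few lines of Flink-free Python; the function below is illustrative only, not a Flink API:

```python
from enum import Enum

class EndOfInput(Enum):
    FIRE_AND_QUIESCE = "FIRE_AND_QUIESCE"
    IGNORE = "IGNORE"

def drain_at_end_of_input(pending, mode):
    """Toy model of shutdown: which pending processing-time timers still fire?"""
    if mode is EndOfInput.FIRE_AND_QUIESCE:
        fired = list(pending)  # fire everything already registered...
        pending.clear()        # ...then quiesce: new registrations are ignored
        return fired
    return []                  # IGNORE: drop pending timers and shut down

fired = drain_at_end_of_input([10, 20], EndOfInput.FIRE_AND_QUIESCE)
dropped = drain_at_end_of_input([10, 20], EndOfInput.IGNORE)
```

With FIRE_AND_QUIESCE both pending timers fire before shutdown; with IGNORE the job shuts down without firing them.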

...

  • STREAMING:
    • execution.schedule-mode = EAGER
    • execution.shuffle-mode = ALL_EDGES_PIPELINED
    • execution.incremental-updates = true
  • BATCH:
    • execution.schedule-mode = LAZY_FROM_SOURCES
    • execution.shuffle-mode = POINTWISE_EDGES_PIPELINED
    • execution.incremental-updates = false
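Taken together, the proposed BATCH defaults above would correspond to a configuration fragment like the following (hypothetical; these option names and values are this FLIP's proposal, not a released configuration):

```yaml
# Hypothetical flink-conf.yaml fragment mirroring the proposed BATCH defaults.
execution.schedule-mode: LAZY_FROM_SOURCES
execution.shuffle-mode: POINTWISE_EDGES_PIPELINED
execution.incremental-updates: false
```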

We propose to introduce two new settings, pipeline.processing-time.allow and pipeline.processing-time.end-of-input, with these defaults:

  • STREAMING
    • pipeline.processing-time.allow: ALLOW
    • pipeline.processing-time.end-of-input: FIRE_AND_QUIESCE

...

  • BATCH
    • pipeline.processing-time.allow: FAIL
    • pipeline.processing-time.end-of-input: IGNORE

We want to make EventTime the new default StreamTimeCharacteristic.

...

Test Plan

The new code should, of course, be tested by unit tests and integration tests as well as end-to-end tests. The most important acceptance test is whether we can execute a bounded DataStream program with multiple stages (keyBy()) on a single TaskManager with a single task slot. This is something that is not possible in STREAMING mode, because all tasks/operations need to be online at the same time, but is something that will be possible in BATCH execution mode.

...