Status
...
Motivation
As described in FLIP-131, we are aiming at deprecating the DataSet API in favour of the DataStream API and the Table API. Users should be able to write a program using the DataStream API that will execute efficiently on both bounded and unbounded input data. To understand what we mean by this we have to elaborate a bit.
...
We also want to make the shuffle mode configurable because there might be cases where a user wants BATCH execution mode but still wants the data to be exchanged in a pipelined fashion. For this we propose a new configuration option execution.shuffle-mode. We will not expose a dedicated setter in StreamExecutionEnvironment or ExecutionConfig.
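For example, a user could request pipelined exchanges while keeping BATCH execution mode via the configuration file (a sketch; the option name is the one proposed above, and the value name is taken from the shuffle-mode defaults listed later in this document):

```yaml
# flink-conf.yaml (sketch): keep BATCH execution mode but exchange data
# between all tasks in a pipelined fashion
execution.shuffle-mode: ALL_EDGES_PIPELINED
```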
...
In essence, the “real-time” nature of streaming computations leads to a correlation between the responsiveness of the system, i.e. how long we have to wait until we get results for our 1h windowing computation, and the wall-clock/processing time. This correlation does not exist in the batch world, where the input dataset is static and known in advance.
Given the above discussion, and the fact that in batch processing there is no fundamental reason to sacrifice completeness, correctness, and reproducibility of the result for performance, we argue that processing time can be ignored in batch.
Practically, this means that in BATCH execution mode:
- When a user tries to apply processing-time windowing to batch workloads, Flink could throw an exception warning the user.
- When the user tries to set processing-time timers, e.g. in a ProcessFunction, or specify a processing-time-based policy, e.g. a rolling policy in the StreamingFileSink, Flink will ignore these timers when executing in batch.
- Custom triggers, both in event and processing time, should be treated as optional and potentially be ignored in batch. The reason is that early triggers are simply a means to bypass the previously mentioned correlation between responsiveness and processing time, which does not exist in batch processing.
We propose to introduce three new configuration options:
- pipeline.processing-time.allow:
  - ALLOW: allow API calls to get the current processing time and register timers
  - IGNORE: silently ignore calls to register processing-time timers (TODO: What to return from getCurrentProcessingTime()?)
  - FAIL: fail with an exception when API methods that deal with processing time are called
- pipeline.processing-time.end-of-input:
  - FIRE_AND_QUIESCE: fire any pending processing-time timers at end-of-input but silently ignore attempts to register new timers
  - IGNORE: ignore pending processing-time timers at end-of-input and shut down posthaste
- pipeline.windowing.triggers.allow:
  - ALLOW: allow and use window Triggers; this is the default DataStream API behaviour so far
  - IGNORE: completely ignore any Triggers that are set via the windowing API
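To make the semantics of the first option concrete, here is a minimal, self-contained sketch (all class and method names here are ours, not Flink's) of how a timer service could interpret the three proposed values of pipeline.processing-time.allow:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of a timer service that interprets the proposed
 * pipeline.processing-time.allow setting. Names are illustrative only.
 */
public class ProcessingTimePolicy {
    public enum Allow { ALLOW, IGNORE, FAIL }

    private final Allow policy;
    private final List<Long> registeredTimers = new ArrayList<>();

    public ProcessingTimePolicy(Allow policy) {
        this.policy = policy;
    }

    /** Attempt to register a processing-time timer; behaviour depends on the policy. */
    public boolean registerTimer(long timestamp) {
        switch (policy) {
            case ALLOW:
                registeredTimers.add(timestamp); // timer accepted and tracked
                return true;
            case IGNORE:
                return false; // silently dropped, job keeps running
            case FAIL:
            default:
                throw new UnsupportedOperationException(
                        "Processing-time timers are not supported in BATCH execution mode");
        }
    }

    public int timerCount() {
        return registeredTimers.size();
    }

    public static void main(String[] args) {
        // STREAMING default: timers are accepted.
        ProcessingTimePolicy streaming = new ProcessingTimePolicy(Allow.ALLOW);
        streaming.registerTimer(42L);
        System.out.println(streaming.timerCount()); // prints 1

        // Proposed BATCH default: registering a timer fails loudly.
        ProcessingTimePolicy batch = new ProcessingTimePolicy(Allow.FAIL);
        try {
            batch.registerTimer(42L);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected");
        }
    }
}
```

Under this sketch, IGNORE keeps the job running but drops timer registrations silently, which is exactly why the proposal favours FAIL as the BATCH default: a loud failure is easier to debug than silently missing results.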
These options apply to BOTH batch and streaming, and they will make sure that the same job written for streaming can also run in batch. The defaults for STREAMING and BATCH, respectively, should be:
- STREAMING
- pipeline.processing-time.allow: ALLOW
- pipeline.processing-time.end-of-input: FIRE_AND_QUIESCE
- pipeline.windowing.triggers.allow: ALLOW
- BATCH
- pipeline.processing-time.allow: FAIL
- pipeline.processing-time.end-of-input: IGNORE
- pipeline.windowing.triggers.allow: IGNORE
We think that we should fail hard in BATCH execution mode when the processing-time API is used, because silently ignoring those API calls could lead to surprising job results. If we accept this, we need to silently ignore custom Triggers in the Windowing API, because using them would lead to failures from their attempts to use the processing-time APIs.
The proposed default of pipeline.processing-time.end-of-input: FIRE_AND_QUIESCE in STREAMING mode is a change from the current behaviour, but enough users have reported on the mailing list that they were surprised by this behaviour to warrant the change. See also the related Jira issue.
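Users who rely on the old STREAMING shutdown behaviour could opt back into it explicitly (a sketch, assuming the option keeps the name and values proposed above):

```yaml
# flink-conf.yaml (sketch): drop pending processing-time timers at
# end-of-input instead of firing them (the pre-change behaviour)
pipeline.processing-time.end-of-input: IGNORE
```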
...
- STREAMING:
- execution.schedule-mode = EAGER
- execution.shuffle-mode = ALL_EDGES_PIPELINED
- execution.incremental-updates = true
- BATCH:
- execution.schedule-mode = LAZY_FROM_SOURCES
- execution.shuffle-mode = POINTWISE_EDGES_PIPELINED
- execution.incremental-updates = false
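Spelled out as configuration, the BATCH defaults above would look like this (a sketch that simply restates the list; no new option names are introduced):

```yaml
# Sketch: effective defaults chosen by BATCH execution mode
execution.schedule-mode: LAZY_FROM_SOURCES
execution.shuffle-mode: POINTWISE_EDGES_PIPELINED
execution.incremental-updates: false
```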
We propose to introduce three new settings, pipeline.processing-time.allow, pipeline.processing-time.end-of-input, and pipeline.windowing.triggers.allow, with these defaults:
- STREAMING
- pipeline.processing-time.allow: ALLOW
- pipeline.processing-time.end-of-input: FIRE_AND_QUIESCE
...
- BATCH
- pipeline.processing-time.allow: FAIL
- pipeline.processing-time.end-of-input: IGNORE
...
We want to make EventTime the new default StreamTimeCharacteristic.
...
Test Plan
The new code should, of course, be tested by unit tests and integration tests as well as end-to-end tests. The most important acceptance test is whether we can execute a bounded DataStream program with multiple stages (keyBy()) on a single TaskManager with a single task slot. This is something that is not possible in STREAMING mode, because all tasks/operations need to be online at the same time, but it will be possible in BATCH execution mode.
...