
Page properties

Status: …
Discussion thread: …
Vote thread: https://…thread.html/r6ffdbe90e98542f61e610e5f5abdba0f7a5acb5f98cd01baa1fc17d0%40%3Cdev.flink.apache.org%3E
JIRA: FLINK-19316
Release: 1.12


Motivation

As described in FLIP-131, we are aiming at deprecating the DataSet API in favour of the DataStream API and the Table API. Users should be able to write a program using the DataStream API that will execute efficiently on both bounded and unbounded input data. To understand what we mean by this we have to elaborate a bit.

...

We also want to make the shuffle mode configurable because there might be cases where a user wants BATCH execution mode but still wants the data to be exchanged in a pipelined fashion. For this we propose a new configuration option execution.shuffle-mode. We will not expose a dedicated setter in StreamExecutionEnvironment or ExecutionConfig.
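Since there will be no dedicated setter, the option would be set through the regular configuration. A minimal sketch, assuming the option is picked up when an environment is created from a Configuration (the value name is taken from the summary at the end of this document and is not final):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration conf = new Configuration();
// Proposed option from this FLIP; name and value are not final.
conf.setString("execution.shuffle-mode", "ALL_EDGES_PIPELINED");

// A local environment (parallelism 1) that picks up the configuration.
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment(1, conf);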

...

In essence, the “real-time” nature of streaming computations leads to a correlation between the responsiveness of the system, i.e. how long we have to wait until we get the results of our 1h windowing computation, and the wall-clock/processing time. This correlation does not exist in the batch world, where the input dataset is static and known in advance.

Given the above discussion, and the fact that with batch processing completeness, correctness, and reproducibility of the result have no fundamental reason to be sacrificed for performance, we argue that processing time can be ignored in batch.

Practically, this means that in BATCH execution mode:

  1. When trying to apply processing time windowing on batch workloads, Flink could throw an exception warning the user.
  2. When the user tries to set processing-time timers, e.g. in a ProcessFunction, or specify a processing-time-based policy, e.g. a rolling policy in the StreamingFileSink, Flink will ignore these timers when executed in batch (see the sketch after this list).
  3. Custom triggers, both in event and processing time, should be treated as optional and potentially be ignored in batch. The reason is that early triggers are simply a means to bypass the previously mentioned correlation between responsiveness and processing time, which does not exist in batch processing.
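To make point 2 concrete, here is a minimal sketch of a function that registers processing-time timers; the class name and the one-second delay are purely illustrative:

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Registers a one-second processing-time timer for every incoming element.
public class DelayedReaction extends KeyedProcessFunction<String, String, String> {

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        // In BATCH mode this registration would be ignored or fail,
        // depending on the pipeline.processing-time.allow setting below.
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 1000L);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        out.collect("timer fired at " + timestamp);
    }
}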

...

We propose to introduce the following new configuration options (a short usage sketch follows the list):

  • pipeline.processing-time.allow:
    • ALLOW: allow API calls to get the current processing time and register timers
    • IGNORE: silently ignore calls to register processing-time timers (TODO: what should getCurrentProcessingTime() return?)
    • FAIL: fail with an exception when API methods that deal with processing time are called
  • pipeline.processing-time.end-of-input:
    • FIRE_AND_QUIESCE: fire any pending processing-time timers at end-of-input but silently ignore attempts to register new timers
    • IGNORE: ignore pending processing-time timers at end-of-input and shut down posthaste
  • pipeline.windowing.triggers.allow:
    • ALLOW: allow and use window Triggers; this is the default DataStream API behaviour so far
    • IGNORE: completely ignore any Triggers that are set via the windowing API
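A usage sketch for these options, assuming they can be set on a plain Configuration like any other option (names and values are the ones proposed above and may still change):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration conf = new Configuration();
// Fail hard on any processing-time API call.
conf.setString("pipeline.processing-time.allow", "FAIL");
// Drop pending processing-time timers when the input ends.
conf.setString("pipeline.processing-time.end-of-input", "IGNORE");
// Completely ignore custom window Triggers.
conf.setString("pipeline.windowing.triggers.allow", "IGNORE");

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment(1, conf);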


These options apply to BOTH batch and streaming, and they will make sure that the same job written for streaming can also run in batch. The defaults for STREAMING and BATCH, respectively, should be:

  • STREAMING
    • pipeline.processing-time.allow: ALLOW
    • pipeline.processing-time.end-of-input: FIRE_AND_QUIESCE
    • pipeline.windowing.triggers.allow: ALLOW
  • BATCH
    • pipeline.processing-time.allow: FAIL
    • pipeline.processing-time.end-of-input: IGNORE
    • pipeline.windowing.triggers.allow: IGNORE

We think that we should fail hard in BATCH execution mode when the processing-time API is used, because silently ignoring those API calls could lead to surprising job results. If we accept this, we need to silently ignore custom Triggers in the Windowing API, because using them would otherwise lead to failures when they try to use the processing-time APIs.

The proposed default of pipeline.processing-time.end-of-input: FIRE_AND_QUIESCE in STREAMING mode is a change from the current behaviour, but enough users have reported on the mailing list that they were surprised by the current behaviour to warrant this change. See also FLINK-18647.

Event Time Support in BATCH execution mode

Flink’s streaming runtime builds on the pessimistic assumption that there are no guarantees about the order of the events. This means that events may come out-of-order, i.e. an event with timestamp t may come after an event with timestamp t+1. In addition, the system can never be sure that no more elements with timestamp t < T can come in the future. To amortise the impact of this out-of-orderness, Flink, along with other frameworks in the streaming space, uses a heuristic called Watermarks. A watermark with timestamp T signals that no element with timestamp t < T will follow.

In the batch world, where the input dataset is assumed to be static and known in advance, there is no need for such a heuristic as, at the very least, elements can be sorted by timestamp so that they are processed in temporal order. For readers familiar with streaming, in BATCH we can assume “perfect watermarks”.

Implications: Given the above, in BATCH mode, we only need to send a MAX_WATERMARK when the end of input is reached or at the end of each key if we decide to process each key independently, as done in DataSet. This will allow all registered timers to fire. This means that user-defined WatermarkAssigners will be ignored.
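As a sketch of the user-visible effect (the counting function below is hypothetical): an event-time timer registered in a KeyedProcessFunction still fires in BATCH mode, because the final MAX_WATERMARK advances event time past every registered timer.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Counts elements per key; the count is emitted when event time passes the
// last element's timestamp, which in BATCH mode happens via MAX_WATERMARK
// at end-of-input (or at the end of each key). Assumes records carry timestamps.
public class CountPerKey extends KeyedProcessFunction<String, String, Long> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Long> out)
            throws Exception {
        Long current = count.value();
        count.update(current == null ? 1L : current + 1);
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out)
            throws Exception {
        out.collect(count.value());
    }
}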

Although the above does not directly imply any user-visible change, it has to be stressed that in some cases the same application executed in batch and in streaming mode may lead to different results, due to late elements being dropped in the streaming case.

Making EventTime the new default StreamTimeCharacteristic

As described above, event time is the only sensible time characteristic for batch. We therefore propose to change the default value of the StreamTimeCharacteristic from ProcessingTime to EventTime. This means that DataStream API programs that were using event time before now just work without manually changing this setting. Processing-time programs will also still work, because using processing-time timers does not depend on the StreamTimeCharacteristic. DataStream programs that don't set a TimestampAssigner or WatermarkStrategy will also still work if they don't use operations that rely on (event-time) timestamps. This is true for both BATCH and STREAMING execution mode.
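Concretely, the explicit call that event-time programs have needed so far becomes unnecessary:

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// No longer needed once EventTime is the default characteristic:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);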

The only real user-visible change is that programs using the KeyedStream.timeWindow()/DataStream.timeWindow() operations, which depend on the StreamTimeCharacteristic, will now use event time by default. We don't think this operation is useful because its behaviour can be surprising. We recommend that users always use an explicit processing-time window or event-time window.

Deprecating timeWindow() and related methods

As described above, we think timeWindow() is not a useful operation and therefore propose to deprecate and eventually remove it. The operation can have surprising behaviour and users should use explicit processing-time or event-time operations.
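A sketch of the migration, where input stands in for some DataStream of Tuple2 records (hypothetical):

import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Deprecated: the window type silently depends on the StreamTimeCharacteristic.
input.keyBy(value -> value.f0)
     .timeWindow(Time.minutes(1));

// Explicit replacements: the window type is visible in the program.
input.keyBy(value -> value.f0)
     .window(TumblingEventTimeWindows.of(Time.minutes(1)));       // event time
input.keyBy(value -> value.f0)
     .window(TumblingProcessingTimeWindows.of(Time.minutes(1)));  // processing time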

...

As discussed in FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API), we see the Table API/SQL as the relational API, where we expect users to work with schemas and fields. Going forward, we envision the DataStream API to be a "slightly" lower-level API, with more explicit control over the execution graph, operations, and state. Having said that, we think it is worth deprecating, and in the future removing, all relational-style methods in DataStream, which often use reflection to access fields and are thus less performant than explicit extractors. These include:

  • DataStream#project
  • Windowed/KeyedStream#sum,min,max,minBy,maxBy
  • DataStream#keyBy where the key is specified with a field name or index (including ConnectedStreams#keyBy); see the example below
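For example, with a hypothetical word-count-style stream, the reflective variant versus an explicit KeySelector:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> words =
        env.fromElements(Tuple2.of("flink", 1), Tuple2.of("batch", 1));

// Relational style: key looked up by tuple index via reflection.
words.keyBy(0);

// Explicit extractor: type-safe and avoids reflective field access.
words.keyBy(value -> value.f0);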

Sinks

Current exactly-once sinks in DataStream rely heavily on Flink’s checkpointing mechanism and will not work well in BATCH execution mode. Support for exactly-once sinks is outside the scope of this FLIP and there will be a separate one coming soon.

Iterations

Iterations are outside the scope of this FLIP and there will be a separate one in the future.

Summary of the Proposed Changes

We want to introduce a new setting execution.runtime-mode that controls runtime execution behaviour. The different execution modes semantically set a couple of real or virtual configuration options:


  • STREAMING:
    • execution.schedule-mode = EAGER
    • execution.shuffle-mode = ALL_EDGES_PIPELINED
    • execution.incremental-updates = true
  • BATCH:
    • execution.schedule-mode = LAZY_FROM_SOURCES
    • execution.shuffle-mode = POINTWISE_EDGES_PIPELINED
    • execution.incremental-updates = false
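For illustration, and assuming the option can be set like any other configuration option when the environment is created (a sketch, not a final API):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration conf = new Configuration();
// Proposed option: switches scheduling, shuffle mode, and incremental
// updates to the BATCH defaults listed above.
conf.setString("execution.runtime-mode", "BATCH");

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment(1, conf);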

We propose to introduce the new settings pipeline.processing-time.allow, pipeline.processing-time.end-of-input, and pipeline.windowing.triggers.allow with these defaults:

  • STREAMING
    • pipeline.processing-time.allow: ALLOW
    • pipeline.processing-time.end-of-input: FIRE_AND_QUIESCE
    • pipeline.windowing.triggers.allow: ALLOW
  • BATCH
    • pipeline.processing-time.allow: FAIL
    • pipeline.processing-time.end-of-input: IGNORE
    • pipeline.windowing.triggers.allow: IGNORE

We want to make EventTime the new default StreamTimeCharacteristic.

...

Test Plan

The new code should, of course, be tested by unit tests and integration tests as well as end-to-end tests. The most important acceptance test is whether we can execute a bounded DataStream program with multiple stages (keyBy()) on a single TaskManager with a single task slot. This is something that is not possible in STREAMING mode, because all tasks/operations need to be online at the same time, but it will be possible in BATCH execution mode.
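A minimal sketch of such an acceptance test; the program below and the way the mode and slot count are selected are illustrative only:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration conf = new Configuration();
conf.setString("execution.runtime-mode", "BATCH"); // proposed option, not final
conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1); // the single slot under test

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment(1, conf);

// A bounded program with two keyBy() stage boundaries; in STREAMING mode it
// could not run on one slot because all stages must be online simultaneously.
env.fromElements("a", "b", "a", "c")
   .map(word -> Tuple2.of(word, 1L))
   .returns(Types.TUPLE(Types.STRING, Types.LONG))
   .keyBy(t -> t.f0)
   .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1))
   .keyBy(t -> t.f1)
   .print();

env.execute("BATCH execution acceptance test");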

...