Status

...

Page properties


Discussion thread

...

...

Vote thread: https://...thread.html/r6ffdbe90e98542f61e610e5f5abdba0f7a5acb5f98cd01baa1fc17d0%40%3Cdev.flink.apache.org%3E
JIRA: FLINK-19316

Release: 1.12


Table of Contents
excludeStatus

Motivation

As described in FLIP-131, we are aiming at deprecating the DataSet API in favour of the DataStream API and the Table API. Users should be able to write a program using the DataStream API that will execute efficiently on both bounded and unbounded input data. To understand what we mean by this we have to elaborate a bit.

...

We also want to make the shuffle mode configurable because there might be cases where a user wants BATCH execution mode but still wants the data to be exchanged in a pipelined fashion. For this we propose a new configuration option execution.shuffle-mode. We will not expose a dedicated setter in StreamExecutionEnvironment or ExecutionConfig.
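
For illustration, a minimal sketch of how a user might set this option. This assumes the option keeps the name proposed here and that the environment can be created from a Configuration; the concrete value is only an example and not part of this FLIP:

    // sketch only: option key and value as discussed in this FLIP, not a finalised API
    Configuration conf = new Configuration();
    conf.setString("execution.shuffle-mode", "ALL_EDGES_PIPELINED"); // BATCH job, but pipelined exchanges
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);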

...

In essence, the “real-time” nature of streaming computations leads to a correlation between the responsiveness of the system, i.e. how long we have to wait until we get results for our 1h windowing computation, and the wall-clock/processing time. This correlation does not exist in the batch world, where the input dataset is static and known in advance.

Proposal: Given the above discussion, and the fact that with batch processing completeness, correctness and reproducibility of the result have no fundamental reason to be sacrificed for performance, we argue that processing time can be ignored in batch.

Practically, this means that in BATCH execution mode:

  1. When trying to apply processing-time windowing on batch workloads, Flink could throw an exception warning the user.
  2. When the user tries to set processing-time timers, e.g. in a ProcessFunction, or specifies a processing-time-based policy, e.g. a rolling policy in the StreamingFileSink, Flink will ignore these timers when executing in batch.
  3. Custom triggers, both in event and processing time, should be treated as optional and potentially be ignored in batch. The reason is that early triggers are simply a means to bypass the previously mentioned correlation between responsiveness and processing time, which does not exist in batch processing.

To make this behaviour configurable, we propose to introduce two new configuration options:

  • pipeline.processing-time.allow:
    • ALLOW: allow API calls to get the current processing time and register timers
    • IGNORE: silently ignore calls to register processing-time timers (TODO: what should getCurrentProcessingTime() return?)
    • FAIL: fail with an exception when API methods that deal with processing time are called
  • pipeline.processing-time.end-of-input:
    • FIRE_AND_QUIESCE: fire any pending processing-time timers at end-of-input but silently ignore attempts to register new timers
    • IGNORE: ignore pending processing-time timers at end-of-input and shut down posthaste

These options refer to BOTH batch and streaming and they will make sure that the same job written for streaming can also run for batch. The defaults for STREAMING and BATCH, respectively, should be:

      • STREAMING
        • pipeline.processing-time.allow: ALLOW
        • pipeline.processing-time.end-of-input: FIRE_AND_QUIESCE
      • BATCH
        • pipeline.processing-time.allow: FAIL
        • pipeline.processing-time.end-of-input: IGNORE
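
      To illustrate the kind of user code these options govern, here is a purely illustrative KeyedProcessFunction that uses the processing-time API (the Event type is made up). Under the proposed BATCH default it would fail; under IGNORE the timer registration would be dropped silently:

      public class ProcessingTimeTagger extends KeyedProcessFunction<String, Event, Event> {

          @Override
          public void processElement(Event value, Context ctx, Collector<Event> out) {
              // uses processing time: governed by pipeline.processing-time.allow
              long oneMinuteLater = ctx.timerService().currentProcessingTime() + 60_000;
              ctx.timerService().registerProcessingTimeTimer(oneMinuteLater);
              out.collect(value);
          }

          @Override
          public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) {
              // fires one minute later in STREAMING; at end-of-input the behaviour
              // depends on pipeline.processing-time.end-of-input
          }
      }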

      We think that we should fail hard in BATCH execution mode when using the processing-time API because silently ignoring those API calls could lead to surprising job results.

      The proposed default of pipeline.processing-time.end-of-input: FIRE_AND_QUIESCE in STREAMING mode is a change from the current behaviour, but enough users have reported on the mailing list that they were surprised by the current behaviour to warrant this change. See also FLINK-18647.

      Event Time Support in BATCH execution mode

      Flink’s streaming runtime builds on the pessimistic assumption that there are no guarantees about the order of the events. This means that events may come out-of-order, i.e. an event with timestamp t may come after an event with timestamp t+1. In addition, the system can never be sure that no more elements with timestamp t < T can come in the future. To amortise the impact of this out-of-orderness, Flink, along with other frameworks in the streaming space, uses a heuristic called Watermarks. A watermark with timestamp T signals that no element with timestamp t < T will follow.

      In the batch world, where the input dataset is assumed to be static and known in advance, there is no need for such a heuristic as, at the very least, elements can be sorted by timestamp so that they are processed in temporal order. For readers familiar with streaming, in BATCH we can assume “perfect watermarks”.

      Implications: Given the above, in BATCH mode we only need to send a MAX_WATERMARK when the end of input is reached, or at the end of each key if we decide to process each key independently, as done in DataSet. This will allow all registered timers to fire. This means that user-defined WatermarkAssigners will be ignored.
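
      To make this concrete, a bounded event-time job could look roughly like the sketch below (env, boundedInput, the Event type and its methods are illustrative). In BATCH mode the WatermarkStrategy would effectively be replaced by the single MAX_WATERMARK described above:

      DataStream<Event> events = env
          .fromCollection(boundedInput)  // some bounded input
          .assignTimestampsAndWatermarks(
              WatermarkStrategy
                  .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                  .withTimestampAssigner((event, recordTs) -> event.getTimestamp()));

      events
          .keyBy(Event::getUserId)
          .window(TumblingEventTimeWindows.of(Time.hours(1)))
          .reduce((a, b) -> a.merge(b))
          .print();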

      ...

      As described above, we think timeWindow() is not a useful operation and therefore propose to deprecate and eventually remove it. The operation can have surprising behaviour and users should use explicit processing-time or event-time operations.
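
      For example (sketch only; stream and the Event type are illustrative), a call like keyBy(...).timeWindow(Time.minutes(5)) would instead spell out the intended time semantics:

      stream
          .keyBy(Event::getUserId)
          // explicitly event time ...
          .window(TumblingEventTimeWindows.of(Time.minutes(5)))
          // ... or explicitly processing time:
          // .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
          .reduce((a, b) -> b);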

      Broadcast State

      TODO: Do we need to cover this here?

      A powerful feature of the DataStream API is the Broadcast State pattern. This feature/pattern was introduced to allow users to implement use-cases where a “control” stream needs to be broadcasted to all downstream tasks, and its broadcasted elements, e.g. rules, need to be applied to all incoming elements from another stream. An example can be a fraud detection use-case where the rules evolve over time.

      In this pattern, Flink provides no guarantees about the order in which the inputs are read. Essentially, there is no way to force the broadcast side to be read before the non-broadcast side. Use-cases like the one above make sense in the streaming world where jobs are expected to run for a long period of time with input data that are not known in advance. In these settings, requirements may change over time depending on the incoming data. 
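
      As a reminder of what the pattern looks like in user code (sketch only; events, ruleStream, the Rule and Event types and the matching logic are made up), note that nothing in this API constrains which side is read first:

      MapStateDescriptor<String, Rule> rulesDescriptor =
          new MapStateDescriptor<>("rules", Types.STRING, Types.POJO(Rule.class));

      events
          .keyBy(Event::getUserId)
          .connect(ruleStream.broadcast(rulesDescriptor))
          .process(new KeyedBroadcastProcessFunction<String, Event, Rule, String>() {

              @Override
              public void processElement(Event value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                  Rule rule = ctx.getBroadcastState(rulesDescriptor).get("current");
                  if (rule != null && rule.matches(value)) {
                      out.collect("matched: " + value);
                  }
              }

              @Override
              public void processBroadcastElement(Rule rule, Context ctx, Collector<String> out) throws Exception {
                  // there is no guarantee that this is called before processElement()
                  ctx.getBroadcastState(rulesDescriptor).put("current", rule);
              }
          });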

      In the batch world though, we believe that such use-cases do not make much sense, as the input (both the elements and the control stream) are expected to be static and known in advance. 

      Proposal (potentially not in 1.12): Build custom support for broadcast state pattern where the broadcast side is read first.

      Incremental updates vs. "final" updates in BATCH vs. STREAM execution mode

      Some of the operations on DataStream have semantics that might make sense for stream processing but should behave differently in BATCH execution mode. For example, KeyedStream.reduce() is essentially a reduce on a GlobalWindow with a Trigger that fires on every element. In database terms it produces an UPSERT stream as an output: if you get ten input elements for a key you also get ten output records. For batch processing, it makes more sense to instead only produce one output record per key, with the result of the aggregation, when we reach the end of stream/key. This will be correct for downstream consumers that expect an UPSERT stream, but it will change the actual physical output stream that they see. We therefore suggest changing the behaviour of these methods to only emit a final result at the end of input:

      • KeyedStream#reduce
      • KeyedStream#sum,min,max,minBy,maxBy
      • KeyedStream#fold

      Semantically, you can think of the BATCH mode as enabling a virtual execution.incremental-updates = false setting.
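
      As an illustration (sketch only; the word-count shape is made up), the following pipeline would emit one record per incoming element in STREAMING mode, but only one final record per key under the proposed BATCH behaviour:

      env.fromElements("a", "b", "a", "a")
         .map(word -> Tuple2.of(word, 1))
         .returns(Types.TUPLE(Types.STRING, Types.INT))
         .keyBy(t -> t.f0)
         .sum(1)
         .print();
      // STREAMING: (a,1) (b,1) (a,2) (a,3)   -- incremental updates, UPSERT-style
      // BATCH:     (a,3) (b,1)               -- only the final value per key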

      Deprecating Relational methods on DataStream

      As discussed in FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API), we see the Table API/SQL as the relational API, where we expect users to work with schemas and fields. Going forward, we envision the DataStream API to be a "slightly" lower-level API, with more explicit control over the execution graph, operations, and state. Having said that, we think it is worth deprecating, and eventually removing, all relational-style methods in DataStream, which often use reflection to access the fields and are thus less performant than providing explicit extractors. These include:

      • DataStream#project
      • Windowed/KeyedStream#sum,min,max,minBy,maxBy
      • DataStream#keyBy where the key is specified with a field name or index (including ConnectedStreams#keyBy)
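
      For example (the Event POJO and its userId field are illustrative), instead of the reflection-based variant users would provide an explicit key extractor; DataStream#project can similarly be replaced by an explicit map():

      // reflection-based, proposed for deprecation:
      events.keyBy("userId");

      // explicit key extractor instead:
      events.keyBy(Event::getUserId);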

      Sinks

      Current exactly-once sinks in DataStream rely heavily on Flink’s checkpointing mechanism and will not work well in BATCH execution mode. Support for exactly-once sinks is outside the scope of this FLIP and there will be a separate one coming soon.

      Iterations

      Iterations are outside the scope of this FLIP and there will be a separate one in the future.

      Summary of the Proposed Changes

      We want to introduce a new setting execution.runtime-mode that controls the runtime execution behaviour. The different execution modes semantically set a couple of real or virtual configuration options:


      • STREAMING:
        • execution.schedule-mode = EAGER
        • execution.shuffle-mode = ALL_EDGES_PIPELINED
        • execution.incremental-updates = true
      • BATCH:
        • execution.schedule-mode = LAZY_FROM_SOURCES
        • execution.shuffle-mode = POINTWISE_EDGES_PIPELINED
        • execution.incremental-updates = false

      We propose to introduce two new settings pipeline.processing-time.allow and pipeline.processing-time.end-of-input with these defaults:

      • STREAMING
        • pipeline.processing-time.allow: ALLOW
        • pipeline.processing-time.end-of-input: FIRE_AND_QUIESCE
      • BATCH
        • pipeline.processing-time.allow: FAIL
        • pipeline.processing-time.end-of-input: IGNORE
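
      As a minimal sketch of how a user would opt into the new mode (assuming the option keeps the proposed name and can be passed via a Configuration when creating the environment):

      Configuration conf = new Configuration();
      conf.setString("execution.runtime-mode", "BATCH"); // implies the BATCH settings listed above
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);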

      We want to make EventTime the new default StreamTimeCharacteristic.

      ...

      Test Plan

      The new code should, of course, be tested by unit tests and integration tests as well as end-to-end tests. The most important acceptance test is whether we can execute a bounded DataStream program with multiple stages (keyBy()) on a single TaskManager with a single task slot. This is something that is not possible in STREAMING mode, because all tasks/operations need to be online at the same time, but it will be possible in BATCH execution mode.
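
      A rough sketch of such an acceptance-test program (names are illustrative): a bounded source followed by two keyBy() stages, submitted with parallelism 2 against a single task slot, which should only be schedulable with BATCH execution:

      public class BatchModeAcceptanceSketch {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setParallelism(2);

              env.fromElements("a", "b", "a", "c", "b", "a")   // bounded input
                 .keyBy(word -> word)                          // first keyed stage
                 .reduce((a, b) -> a + b)
                 .keyBy(String::length)                        // second keyed stage
                 .reduce((a, b) -> a + "|" + b)
                 .print();

              env.execute("bounded multi-stage job");
          }
      }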

      ...