DataStream API exposes many internal concepts and implementation details to users. This breaks our principles of not revealing implementation details, being easy to understand and use, and staying compatible. Some of these exposures were never intended; others were intended at the beginning but should be changed now.

  • Some non-API types (with the @Internal annotation, or without any annotation) are used as arguments / return values of APIs. E.g., the @Internal type StreamRecord is used by the @PublicEvolving API `StreamOperator#setKeyContextElement1/setKeyContextElement2`.

...

  • User programs and runtime code depend on the same concrete classes, which means methods that are intended only for runtime code can also be accessed by user programs, whether or not they are annotated as @Internal. E.g., `DataStream#getTransformation`.

...

  • Many APIs are designed to let users extend an abstract / concrete class. This is problematic because user classes can access the internals of the superclass and override its behavior, leading to unexpected problems. E.g., AbstractStreamOperator.

...

  • StreamOperator, which is currently a @PublicEvolving API, is closely coupled with many runtime internal concepts: checkpoints / snapshots, key context, containing task, etc. These concepts are unnecessarily exposed to users.

As a consequence, many internal changes may unnecessarily break backwards compatibility, which makes maintaining and evolving the Flink runtime hard and complex.
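To illustrate the inheritance problem described above, here is a minimal sketch of the alternative: user code implements a small interface, and the runtime wraps it by composition. All names (`RecordProcessor`, `RuntimeOperator`) are hypothetical and are not Flink classes; this only shows the shape of the separation, not a proposed API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical names throughout; none of these types exist in Flink.

/** Pure API: the only type that user code compiles against. */
interface RecordProcessor<IN, OUT> {
    void process(IN record, Consumer<OUT> output);
}

/** Runtime side: wraps the user logic instead of being extended by it. */
final class RuntimeOperator<IN, OUT> {
    private final RecordProcessor<IN, OUT> userLogic;
    // Runtime-only bookkeeping; user logic has no way to read or override it.
    private long recordsSeen = 0;

    RuntimeOperator(RecordProcessor<IN, OUT> userLogic) {
        this.userLogic = userLogic;
    }

    List<OUT> run(List<IN> input) {
        List<OUT> out = new ArrayList<>();
        for (IN record : input) {
            recordsSeen++;
            userLogic.process(record, out::add);
        }
        return out;
    }
}
```

Because the user class never extends a runtime class, internal fields and methods of the runtime can change without affecting user programs.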

Concepts and Primitives Lack Organization

Currently, DataStream API provides primitives that correspond to concepts of many different levels. This makes it difficult to maintain consistency across different interfaces and breaks our principles of making it as flexible as possible and easy to understand and use. The main reasons are as follows:

  • It contains the most fundamental concepts that define a stateful stream processing application: data streams, transforms, states, etc.

  • It also allows defining arbitrary transforms via process functions and custom operators.

  • It provides high-level implementations and utilities for common data processing patterns: map, aggregate, join, window, etc.

  • There are also shortcuts for specific source / sink implementations, mainly for testing purposes.

...

We have been saying that DataStream API is stream-batch unified. But that doesn't mean everything in DataStream API is stream-batch unified. This breaks our principles of clear and unambiguous definition and of being easy to understand and use. For example, processing time only makes sense in real-time stream processing, while caching of intermediate results is only supported for batch processing. In addition, DataStream API does not support some batch-specific operations (such as sort) because it is hard to define their behavior in unbounded stream scenarios. It's hard for people to understand which APIs are batch-stream unified and which are batch / stream dedicated.

Proposed Plan

...

To address the above issues, we want to introduce another set of API, DataStream API V2, and help users migrate smoothly from the current DataStream API (hereinafter referred to as "V1"). That is to say, once the DataStream API V2 is ready, V1 will not be removed immediately, but will co-exist with V2 for a reasonable migration period.

Design Goals

Through this new API, we aim to achieve the following goals:

  • Re-organize the concepts: 

    • Separate fundamental and high-level concepts: Clearly define the most primitive parts required for the API, and separate out the different sets of high-level support such as batch-stream unified functions, windows, etc.

    • Improve the definitions and primitives.

  • Users only depend on pure APIs: Provide a thin, pure API layer. Both the user's job and flink-runtime will depend on this API layer. This eliminates the need for user jobs to (indirectly) depend on runtime internals.

  • Eliminate internal-exposing APIs / contents.

Scope and Principles

...

Introducing a new API is a huge topic, so let's first clarify the scope and principles of this proposed API.

  • Only focusing on APIs: The internal implementations can reuse previous work (e.g. transformations and operations) as much as possible as a first step, and be refactored later in a compatible way that is transparent to users. This keeps the discussion focused on API semantics. Besides, some refactoring work may be easier if done after removing DataStream V1.
  • Incremental additions: DataStream API V1 supports a wide range of features. Our plan is to incrementally add support for these features in V2. Therefore, you may find many features missing in the early sub-FLIPs. Eventually, V2 is required to have at least equivalent functionality to V1 before V1 is deprecated and removed, except for functionality that is determined to be unwanted.
  • APIs are initially experimental: By convention, @Experimental APIs should be promoted to @PublicEvolving in 2 minor releases. But as we are developing incrementally, the whole API set will take multiple release cycles to complete. Therefore, we propose not to start counting the promotion period for the early-merged APIs until the whole API set is completed.

Proposed Changes

Based on whether its functionality must be provided by the framework, we divide the relevant concepts into two categories: fundamental primitives and high-level extensions.

...

We consider it a fundamental primitive because users themselves cannot perceive watermarks from different parallel instances, so they cannot align them. Requirements of this kind must be met by the framework.
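As an illustration of why alignment has to live in the framework, the sketch below tracks the latest watermark per input channel and emits the minimum across all channels. Taking the minimum is an assumption here (it is the usual rule for event-time watermarks); the class name `WatermarkAligner` and its methods are hypothetical and not part of any Flink API.

```java
import java.util.Arrays;

/** Hypothetical helper, not a Flink class: aligns watermarks across input channels. */
final class WatermarkAligner {
    private final long[] perChannel;
    private long aligned = Long.MIN_VALUE;

    WatermarkAligner(int numChannels) {
        if (numChannels < 1) {
            throw new IllegalArgumentException("need at least one channel");
        }
        perChannel = new long[numChannels];
        Arrays.fill(perChannel, Long.MIN_VALUE);
    }

    /**
     * Records a watermark received on one channel and returns the aligned
     * watermark, assumed here to be the minimum over all channels.
     */
    long onWatermark(int channel, long watermark) {
        // Watermarks never move backwards on a channel.
        perChannel[channel] = Math.max(perChannel[channel], watermark);
        long min = Arrays.stream(perChannel).min().getAsLong();
        aligned = Math.max(aligned, min);
        return aligned;
    }
}
```

A user function only ever sees the records of its own parallel instance, so it has no access to the per-channel values this helper needs.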

Processing Timer Service

Currently, Flink uses a unified abstraction for both Processing Time and Event Time. This is no longer the case in the new API. The main reasons are as follows:

  • Processing time only makes sense for real-time stream processing, while event time is meaningful for both real-time stream processing and historical batch processing. Even if a unified abstraction is adopted, users still need to understand and carefully deal with the differences between them. Otherwise, it is easy to produce unexpected behavior.
  • The advantage of a unified abstraction is that users can easily switch between the two time semantics without significantly changing the time-based processing logic. However, in reality, we are barely aware of any cases where the same processing logic is reused for both time semantics.

Therefore, in the new API, we only provide a timer service for processing time. Event timers can be achieved by leveraging the watermark handling interfaces. The processing timer service is considered a fundamental primitive because it must work with the task's mailbox thread model in order not to introduce concurrency issues.
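The mailbox constraint can be sketched as follows: a due timer does not run its callback directly, it only enqueues the callback into the same queue that record processing uses, so both are executed one after another. `Mailbox` and `ProcessingTimerService` are hypothetical names for this sketch and do not mirror Flink's internal classes; the sketch is also single-threaded for determinism, whereas a real mailbox would need a thread-safe queue.

```java
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Queue;

// Hypothetical sketch; these are not Flink classes.

/** All user-visible actions are executed from this queue, one at a time. */
final class Mailbox {
    private final Queue<Runnable> mails = new ArrayDeque<>();

    void put(Runnable mail) {
        mails.add(mail);
    }

    void drain() {
        Runnable mail;
        while ((mail = mails.poll()) != null) {
            mail.run();
        }
    }
}

final class ProcessingTimerService {
    private static final class Timer {
        final long time;
        final Runnable callback;

        Timer(long time, Runnable callback) {
            this.time = time;
            this.callback = callback;
        }
    }

    private final PriorityQueue<Timer> timers =
            new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.time));
    private final Mailbox mailbox;

    ProcessingTimerService(Mailbox mailbox) {
        this.mailbox = mailbox;
    }

    void register(long time, Runnable callback) {
        timers.add(new Timer(time, callback));
    }

    /** Called when the clock advances; enqueues due callbacks, never runs them. */
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.peek().time <= now) {
            mailbox.put(timers.poll().callback);
        }
    }
}
```

Since timer callbacks and record processing go through one queue, user code never observes two of them running at the same time.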

High-Level Extensions

High-level extensions are like shortcuts / sugar: without them, users can probably still achieve the same behavior by working with the fundamental APIs, but it is a lot easier with the built-in support. They include Built-In Functions, Event Timer Service and Window.

...

  • A special type of generalized watermark with temporal semantics and an alignment algorithm for this watermark whose behavior remains consistent with before.

  • Some operators based on event time.

  • Some components to extract and handle event time: timestamp assigner, watermark generator...

Window

...

Window is a special processing mechanism for data on the stream: it splits the stream into “buckets”, over which we can apply computations. Theoretically, a window can be implemented on top of a process function by defining a series of states. Therefore, we consider it a high-level extension.
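As a rough illustration of this claim, the sketch below counts records per tumbling window using nothing but a map that stands in for state. The class `TumblingCountFunction` and its methods are hypothetical and not a Flink API; a real implementation would use the state and watermark facilities of the process function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, not a Flink class: tumbling-window counts kept in a map acting as state. */
final class TumblingCountFunction {
    private final long windowSize;
    // "State": record count per window, keyed by window start timestamp.
    private final TreeMap<Long, Integer> countsByWindowStart = new TreeMap<>();

    TumblingCountFunction(long windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        this.windowSize = windowSize;
    }

    /** Assigns the record to its bucket and updates the bucket's count. */
    void processRecord(long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, windowSize);
        countsByWindowStart.merge(start, 1, Integer::sum);
    }

    /** Fires every window ending at or before the given time, as "start=count". */
    List<String> onTime(long time) {
        List<String> fired = new ArrayList<>();
        while (!countsByWindowStart.isEmpty()
                && countsByWindowStart.firstKey() + windowSize <= time) {
            Map.Entry<Long, Integer> window = countsByWindowStart.pollFirstEntry();
            fired.add(window.getKey() + "=" + window.getValue());
        }
        return fired;
    }
}
```

Everything here is expressible with per-record processing, state, and a time callback, which is why the window does not need to be a fundamental primitive.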


Move common dependencies into separate module

...

To this end, we propose extracting the interfaces that the API needs to depend on into a new module, perhaps called flink-core-api, keeping the package paths unchanged, and then making the core module depend on this core-api module. This will not break compatibility of the old API, and it also allows the new API to depend solely on abstractions.

The dependency relationship between the API module and flink-core before and after this proposal is shown in the figure:

This umbrella FLIP is only intended to illustrate the proposed solution, so we do not want to list all the classes involved here. In the sub-FLIPs, the classes / interfaces that need to be moved to flink-core-api will be listed in detail.

Related sub-FLIPs

Since developing a new API is relatively complex work, it is difficult to explain all the details in one FLIP. Therefore, we plan to split it into multiple sub-FLIPs for separate discussions.

...

State Access on DataStream API V2

...

The new API's support for state will also be discussed in a separate FLIP, which will focus on how to define and access state in a process function. It may also discuss further support in DataStream API V2 for the storage-compute disaggregated architecture of state.

Introduce Generalized Watermark

...

There are various built-in functions, which can be generally divided into two categories: stateful and stateless. We will discuss their implementations separately in two FLIPs.

In addition, because the concept and implementation of join are relatively complex, we still want to discuss it separately even though it is indeed a built-in function. And because it is largely based on window, we will put them both in a single FLIP.

Introduce Execution Hint for Process Function

It's hard for people to understand which functions are batch-stream unified and which are batch / stream dedicated. We want to introduce a hint to identify the type of function in DataStream API V2. This not only helps users, but also helps the engine work better. For example, for batch-only functions, more aggressive optimization can be done at the runtime level.
We will discuss this section in a separate FLIP, mainly including:

  • Introducing execution hint mechanism for DataStream API V2.
  • How to mark this hint for built-in functions as well as user defined functions.
  • What other helpful hints can be provided in the future.
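One possible shape for such a hint, purely as a sketch, is a runtime-retained annotation that the engine can read by reflection. The annotation `ExecutionHint`, the enum `ExecutionMode`, and the choice to treat unannotated functions as unified are all assumptions made for illustration; the actual mechanism is left to the sub-FLIP.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical names; the real hint mechanism is not defined in this FLIP.

enum ExecutionMode { BATCH_ONLY, STREAM_ONLY, UNIFIED }

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface ExecutionHint {
    ExecutionMode value();
}

/** Example of a function marked as batch-only, e.g. a sort. */
@ExecutionHint(ExecutionMode.BATCH_ONLY)
final class SortFunction {}

/** Example of a user-defined function that carries no hint. */
final class UnannotatedFunction {}

final class ExecutionHints {
    private ExecutionHints() {}

    /** Reads the hint; treating a missing hint as UNIFIED is an assumption of this sketch. */
    static ExecutionMode modeOf(Class<?> functionClass) {
        ExecutionHint hint = functionClass.getAnnotation(ExecutionHint.class);
        return hint == null ? ExecutionMode.UNIFIED : hint.value();
    }
}
```

With something like this, the engine could check `ExecutionHints.modeOf(...)` when planning a job and apply batch-specific optimizations only to functions marked as batch-only.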

...

DataStream API V2's Support for Event Time

...

Event time is an important component of timely stream processing. It is worth discussing in a separate FLIP, especially since the new API no longer considers it part of the fundamental semantics. This FLIP will focus on the following aspects:

  • How to implement event time watermark via the generalized watermark mechanism.

  • How to extract and process event time in DataStream API V2.

...

The removal of DataStream V1 needs to meet the following conditions at the same time:

  1. It has been marked deprecated for at least two minor releases.
  2. Most users have migrated, or are able to migrate, to DataStream API V2.
  3. It can only be removed in a major release.

This is either an in-place or a smooth replacement of the DataStream API. It will coexist with the old API for a considerable period of time. Once the old API is removed, all SDK-based jobs need to be migrated.

...