Introduction

The DataStream API is one of the two main APIs that Flink provides for writing data processing programs. As an API that was introduced practically since day-1 of the project and has been evolved for nearly a decade, we are observing more and more problems of it. Improvements on these problems require significant breaking changes, which makes in-place refactor impractical. Therefore, we propose to introduce a new set of APIs, the DataStream API V2, to gradually replace the original DataStream API.

The proposal to introduce a whole set new API is complex and includes massive changes. We are planning  to break it down into multiple sub-FLIPs for incremental discussion. This FLIP is only used as an umbrella, mainly focusing on motivation, goals, and overall planning. That is to say, more design and implementation details  will be discussed in other FLIPs.

In order to make incremental progress along this direction, we propose to vote the umbrella and sub-FLIPs separately. But we are also aware of that the decision making for some of these FLIPs may depend on the others. Therefore, we are also open to other opinions and suggestions.

Problems of DataStream API

A good API design can help not only users, but also developers. Through some research, we have come up with the following principles:

  1. Shouldn't reveal implementation details

  2. Easy to understand and use

  3. Clear and unambiguous definition

  4. Make it as flexible as possible

  5. Keep it compatible

The DataStream API V1 already does some of these things well, but we still see some problems that conflict with these principles.

Exposure of Internals

DataStream API exposes many internal concepts, implementation details to users. This breaks our principle of not revealing implementation details, easy to understand and use, keep it compatible. Some of which were not intended, some were intended at beginning but should be changed now.

  • Some non-API types (with @Internalannotation, or without any annotation) are used for arguments / return-values of APIs. E.g., the @InternaltypeStreamRecordis used for the @PublicEvolvingAPI `StreamOperator#setKeyContextElement1/setKeyContextElement2`.
  • User programs and runtime codes are depending on the same concrete classes, which means methods that are intended to only be used for runtime codes can also be accessed by user programs, despite annotated as @Internalor not. E.g., `DataStream#getTransformation`.
  • Many APIs are designed to let users extends an abstract / concrete class. This is problematic because user classes can access the internals of the super class and override its behaviors, leading to unexpectable problems. E.g., AbstractStreamOperator.
  • StreamOperator, which is currently a @PublicEvolvingAPI, is closely coupled with many runtime internal concepts: checkpoints / snapshots, key context, containing task, etc. These concepts are unnecessarily exposed to users.

As a consequence, many internal changes may unnecessarily break backwards compatibility, which makes maintenance and evolving of Flink runtime hard and complex.

Concepts and Primitives Lack Organization

Currently, DataStream API provides primitives that corresponds to concepts of many different levels. This makes it difficult to maintain consistency across different interfaces and breaks our principle of make it as flexible as possible and easy to understand and use. The main reasons are as follows:

  • It contains the most fundamental concepts that defines a stateful stream processing application: data streams, transforms, states, etc.

  • It also allows defining arbitrary transforms via process functions and custom operators.

  • It provides high-level implementations and utilities for common data processing patterns: map, aggregate, join, window, etc.

  • There are also short-cuts for specific source / sink implementations, mainly for testing purpose.

All these primitives are squashed into a single and flat DataStream layer, without clear modularization and organization. This makes it hard for users, especially beginners, to understand and use.

Obscure Stream / Batch Semantics

We have been saying that DataStream API is stream-batch unified. But that doesn't mean everything in DataStream API is stream-batch unified. This breaks our principle of clear and unambiguous definition and easy to understand and useFor example, processing time only makes sense in real-time stream processing, while caching of intermediate results is only supported for batch processing. In addition, DataStream API does not support some batch specific operations (such as sort) because it is hard to define the behavior in unbounded stream scenarios. It's hard for people to understand which APIs are batch-stream unified and which are batch / stream dedicated.

Proposed Plan

To address the above issues, we want to introduce another set of API,DataStream API V2, and help users to migrate from the current DataStream API (hereinafter referred to as "V1") smoothly. That is to say, once the DataStream API V2 is ready, the V1 will not be immediately removed, but will co-exist with V2 for a reasonable migration period.

Design Goals

Through this new API, we aim to achieve the following goals:

  • Re-organize the concepts: 

    • Separate fundamental and high-level concepts: Define the most primitive parts required for the API clearly, and separate different sets of high-level supports like batch-stream unified functions, window... etc.

    • Improve the definitions and primitives.

  • User only depends on pure APIs: Providing a thin and pure APIs layer. Both the user's job and flink-runtime will depends on this API layer. This eliminates the need for user jobs to (indirectly) depend on runtime internals.

  • Eliminate internal-exposing apis / contents.

Scope and Principles

Introducing a new API is a huge topic, so let's first clarify the scope and principles of this proposed API.

  • Only focusing on APIs: The Internal implementations can reused previous work(e.g. transformations and operations) as much as possible as a first step, and refactored later in a compatible and user-not-aware way . This can make the discussion more focused on API semantics. Besides, some refactoring work may be easier if done after removing DataStream V1 .
  • Incremental additions: Datastream API V1 supports a wide range of features. Our plan is to incrementally add supports for these features in V2. Therefore, you may find many features are missing in the early sub-FLIPs. Eventually, it is required for V2 to have at least equivalent functionalities as V1 before deprecating and removing V1, except for functionalities that are determined unwanted.
  • APIs are initially experimental: By convention, @Experimental APIs should be promoted to @PublicEvolving in 2 minor releases. But as we are developing incrementally, the whole set of API would take multiple release cycles to complete. Therefore, we proposed not to start counting the promoting period for the early-merged APIs, until the whole API set is completed.

Proposed Changes

Based on whether its functionality must be provided by the framework, we divide the relevant concepts into two categories: fundamental primitives and high-Level extensions.

Fundamental primitives

Fundamental primitives are the basic and necessary semantics  that the framework need to provide in order to define a stateful stream processing application, which cannot be  achieved by users  if not provided by the framework. It includes DataStream, Partitioning, ProcessFuction, Async Processing, State, Processing Timer Service and (Generalized) Watermark.

DataStream, Partitioning, ProcessFunction and State are the most fundamental elements of new API and respectively represent:

  • What are the types of data streams

  • How data is partitioned 

  • How to perform operations / processing on data streams

  • How to define and access state on data streams

Obviously, they are the fundamental primitives as they must be provided by the framework. As for Async Processing, (Generalized) Watermark and Processing Timer Service, we need to further elaborate.

Async Processing

Flink performs user functions in a synchronous manner, but some operations need to be performed asynchronously. For example, I/O access is a time-consuming process, making the TPS for single operator much lower than in-memory computing, particularly for streaming job, when low latency is a big concern for users. 

Even if users could start other threads to perform these time-consuming operations, they would still need to notify the mailbox thread to send data when the function produces output, or to perform a piece of callback logic. Asynchronous processing is therefore the fundamental primitive provided by the framework.

(Generalized) Watermark

Watermark is driving the progress of event time in Flink. It maintains time consistency via propagation and alignment on the stream and operators. 

This similar characteristic (propagation and alignment) are actually widely present: for example, stream events like EndOfPartition and RecordAttributes proposed by FLIP-309. It is worth mentioning that we have also seen users calling for mechanism related to this in the mail-list, for example, control flow and dynamic CEP. This inspires us to abstract a more general concept that is no longer limited to temporal semantics called Generalized Watermark.

Generalized Watermark represents a special event / signal that can be emitted from source or other operators, received by downstream operators, and aligned across parallel streams during propagation. With such abstraction, the current event-time watermark becomes one specific implementation of the generalized watermarks, for which we can still provide built-in support.

The reason why we consider it as the fundamental primitives is that users themselves cannot perceive watermarks from different parallelisms, so they cannot align them. Similar requirements must be provided by the framework.

Processing Timer Service

Currently, Flink uses a unified abstraction for both Processing Time and Event Time. This is no longer the case in the new API. The main reasons are as follows:

  • Processing time only makes sense for real-time stream processing, while event time is meaningful for both real-time stream processing and historical batch processing. Even if a unified abstraction is adopted, users still need to understand and carefully deal with the differences between them. Otherwise, It is easy to produce unexpected behavior.
  • The advantage of unified abstraction is that users can easily switch between the two different time semantics without significantly changing the time-based processing logics. However, in reality, we are barely aware of any cases where the same processing logics are reused for both time semantics.

Therefore, in the new API, we only provide timer service for processing time. Event timer can be achieved by leveraging the watermark handling interfaces. The reason processing timer service is considered part of the fundamental primitives is that, it is required to work based on task's mailbox thread model, in order not to introduce concurrency issues.

High-Level Extensions

High-Level extensions are like short-cuts / sugars, without which users can probably still achieve the same behavior by working with the fundamental APIs, but would be a lot easier with the builtin supports. It includes Built-In Functions, Event Timer Service and Window.

Built-in Functions

ProcessFunction is a relatively low-level concept, and some common operations such as map, flatMap and reduce can be implemented based on it. However, for the convenience of users, we have provided built-in implementations of these common operations. Therefore, Built-In functions are high-level extension.

Event Time Supports

Event time supports no longer fundamental primitives as users can maintain it themselves via state and align it through generalized watermark. We see event-time as an extension that includes the following items:

  • A special type of generalized watermark with temporal semantics and an alignment algorithm for this watermark whose behavior remains consistent with before.

  • Some operators based on event time.

  • Some components to extract and handle event time: timestamp assigner, watermark generator...

Window

Window is a special processing mechanism for data on the stream: It split the stream into “buckets”, over which we can apply computations. Theoretically, window can be implemented on a process function by defining a series of states. Therefore, we consider it as a high-level extension.


Move common dependencies into separate module

Currently, many basic and common classes/tools in Flink are located in flink-core, such as state descriptor, type information,etc. Both new and old APIs depend on this, and it is also filled with a large amount of implementation related code, which violates the principle that new API not depending on implementation. 

To this end, we propose extracting the interfaces that the API needs to depend on into a new module, perhaps called  flink-core-api, and keeping the package path unchanged. And then making the core module depends on this core-api module. This will not break the compatibility of old API, but also allow the new API to depend solely on abstraction.

The dependency relationship between the API module and flink-core before and after this proposal is shown in the figure:

Umbrella FLIP is only intended to illustrate the proposed solution, so we do not want to list all the classes involved here. As for sub-FLIPs, the classes/interfaces that need to be moved to flink-core-api will be listed in detail.

Related sub-FLIPs

Since developing a new API is a relatively complex work, it is difficult to explain all the details in one FLIP. Therefore, we plan to split it into multiple sub-FLIPs for separate discussions.

DataStream, Partitioning and ProcessFunction of DataStream API V2

The connection between data stream, partitioning, and process function are relatively close, we will put them in a single FLIP. In this FLIP, we will focus on:

  • How to define different streams on DataStream API V2.

  • How to convert between different streams.

  • How to define operations on stream.

Config, Context and Processing Timer Service of DataStream API V2

In this FLIP, we will discuss the following parts:

  • How to configure the datastream, such as set parallelism.

  • How to get the information about the context in which process functions are executed.

  • How to get current processing time and register a processing timer.

State Access on DataStream API V2

The new API's support for state will also discussed in a separate FLIP, it will focus on how to define and access state in process function. It is also possible to discuss the further support of DataStream API V2 for the storage and computing disaggregated architecture of state.

Introduce Generalized Watermark

We will elaborate on all the details related to the generalized watermark in a separate FLIP. Its content includes:

  • What is generalized watermark

  • How to define a generalized watermark.

  • How the framework align and handle generalized watermark.

Built-In Functions of DataStream API V2

There are various built-in functions, which can be generally divided into two categories: stateful and stateless. We will discuss their implementations separately in two FLIPs.

In addition, due to the the concept and implementation of join is relatively complex, even though it is indeed a built-in function, we still want to discuss it separately. And because it is nearly based on window, we will put them both in a single FLIP. 

Introduce Execution Hint for Process Function

It's hard for people to understand which functions are batch-stream unified and which are batch / stream dedicated. We want to introduce a hint to identify the type of function in DataStream API V2. This not only helps users, but also helps the engine work better. For example, for batch only functions, more aggressively optimize can be done in runtime level.
We will discuss this section in a separate FLIP, mainly including:

  • Introducing execution hint mechanism for DataStream API V2.
  • How to mark this hint for built-in functions as well as user defined functions.
  • What other helpful hints can be provided in the future.

DataStream API V2's Support for Event Time

Event time is an important component of timely stream processing. It is worth discussing with a separate FLIP, especially since the new API no longer consider it as the fundamental semantics. This FLIP will focus on the following aspects:

  • How to Implement event time watermark via generalized watermark mechanism.

  • How to extract and process event time in DataStream API V2.

Window and Join on DataStream API V2.

Window and Join are two important operators, we will discuss them in a separate FLIP, mainly focus on:

  • How to implement Window and Join on the new API.

  • What improvements have been made compared to the counterpart on DataStream API V1.

Asynchronous processing on DataStream API V2

Asynchronous processing is a relatively complex topic, and we'll cover it in a separate FLIP. It mainly includes:

  • How to apply asynchronous logic to ProcessFunction.

  • How to deal with asynchronous calls that time out.

  • How does the asynchronous logic interact with the mailbox thread.


As a summary, we have the following sub-FLIPs in total:

  1. DataStream, Partitioning and Process Function of DataStream API V2

  2. Config, Context and Processing Timer Service of DataStream API V2

  3. State Access on DataStream API V2

  4. Introduce Generalized Watermark

  5. Stateless Built-In Functions of DataStream API V2

  6. Stateful Built-In Functions of DataStream API V2

  7. Introduce Execution Hint for Process Function

  8. DataStream API V2's Support for Event Time

  9. Window and Join on DataStream API V2

  10. Asynchronous processing on DataStream API V2

The dependency relationship between different sub-FLIPs is shown in the following figure:

Compatibility, Deprecation, and Migration Plan

As this is a completely new API, it is not compatible with the old DataStream API.

The evolution of the new API will go through the following process:

  1. Mark as experimental before all functions are merged.

  2. Mark as public evolving after two minor releases.

  3. Mark as public after two minor releases and deprecate the old API.

The removal of DataStream V1 needs to meet the following conditions at the same time:

  1. It was marked deprecated for at least two minor releases.
  2. Most users have already or can migrate to DataStream API V2.
  3. It can only be removed in major release.

This is an either in-place or smooth replacement of DataStream API. It will coexist with the old API for a considerable period of time. Once the old API is removed, all SDK-based jobs need to be migrated.

Test Plan

The corresponding test plan will be given in the sub-FLIPs.