...

To make incremental progress in this direction, we propose to vote on the umbrella FLIP and the sub-FLIPs separately. However, we are also aware that the decision on some of these FLIPs may depend on the others, so we are open to other opinions and suggestions.

Problems of DataStream API

A good API design can help not only users, but also developers. Through some research, we have come up with the following principles:

  1. Shouldn't reveal implementation details

  2. Easy to understand and use

  3. Clear and unambiguous definition

  4. Make it as flexible as possible

  5. Keep it compatible

The DataStream API V1 already does some of these things well, but we still see some problems that conflict with these principles.


Exposure of Internals

The DataStream API exposes many internal concepts and implementation details to users. This breaks our principles of not revealing implementation details, being easy to understand and use, and keeping compatibility. Some of these exposures were unintended; others were intended at the beginning but should be changed now.

  • Some non-API types (annotated with @Internal, or without any annotation) are used as arguments / return values of APIs. E.g., the @Internal type StreamRecord is used in the @PublicEvolving API `StreamOperator#setKeyContextElement1/setKeyContextElement2`.
  • User programs and runtime code depend on the same concrete classes, which means methods intended only for runtime code can also be accessed by user programs, whether or not they are annotated as @Internal. E.g., `DataStream#getTransformation`.
  • Many APIs are designed to let users extend an abstract / concrete class. This is problematic because user classes can access the internals of the superclass and override its behaviors, leading to unexpected problems. E.g., AbstractStreamOperator.
  • StreamOperator, which is currently a @PublicEvolving API, is closely coupled with many runtime internal concepts: checkpoints / snapshots, key context, containing task, etc. These concepts are unnecessarily exposed to users.

As a consequence, many internal changes may unnecessarily break backward compatibility, which makes maintaining and evolving the Flink runtime hard and complex.
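To make the extension problem concrete, here is a minimal, self-contained Java sketch. `BaseOperator` and `UpperCaseOperator` are hypothetical classes, not Flink's `AbstractStreamOperator`; they only model the general pattern of a base class whose protected fields and non-final methods are reachable from user code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base class modelling an extension-based operator API (not a Flink class).
abstract class BaseOperator {
    // Internal bookkeeping the runtime relies on; "protected" makes it visible to every subclass.
    protected final List<String> output = new ArrayList<>();
    protected long processedCount = 0;

    // Framework entry point. It is not final, so a subclass could even override it entirely.
    public void process(String record) {
        processedCount++;
        onRecord(record);
    }

    // The hook users are actually meant to implement.
    protected abstract void onRecord(String record);

    public long getProcessedCount() { return processedCount; }

    public List<String> getOutput() { return output; }
}

// User code that only wants to upper-case records, but nothing stops it from touching internals.
class UpperCaseOperator extends BaseOperator {
    @Override
    protected void onRecord(String record) {
        output.add(record.toUpperCase());
        processedCount = 0; // compiles fine and silently corrupts the runtime's counter
    }
}
```

Because the runtime's bookkeeping lives in the same class hierarchy as the user logic, any change to those members is a potential compatibility break for user subclasses.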

Concepts and Primitives Lack Organization

Currently, the DataStream API provides primitives that correspond to concepts at many different levels. This makes it difficult to maintain consistency across different interfaces and breaks our principles of being as flexible as possible and easy to understand and use. The main reasons are as follows:

  • It contains the most fundamental concepts that define a stateful stream processing application: data streams, transforms, states, etc.

  • It also allows defining arbitrary transforms via process functions and custom operators.

  • It provides high-level implementations and utilities for common data processing patterns: map, aggregate, join, window, etc.

  • There are also shortcuts for specific source / sink implementations, mainly for testing purposes.

All these primitives are squashed into a single and flat DataStream layer, without clear modularization and organization. This makes it hard for users, especially beginners, to understand and use.

Obscure Stream / Batch Semantics

We have been saying that the DataStream API is stream-batch unified. But that doesn't mean everything in the DataStream API is stream-batch unified. This breaks our principles of clear and unambiguous definition and being easy to understand and use. For example, processing time only makes sense in real-time stream processing, while caching of intermediate results is only supported for batch processing. In addition, the DataStream API does not support some batch-specific operations (such as sort) because it is hard to define their behavior in unbounded stream scenarios. As a result, it's hard for people to understand which APIs are batch-stream unified and which are batch / stream dedicated.

Proposed Plan

To address the above issues, we want to introduce a new set of APIs, DataStream API V2, and help users migrate smoothly from the current DataStream API (hereinafter referred to as "V1"). That is to say, once DataStream API V2 is ready, V1 will not be removed immediately, but will coexist with V2 for a reasonable migration period.

Design Goals

Through this new API, we aim to achieve the following goals:

  • Re-organize the concepts:

    • Separate fundamental and high-level concepts: clearly define the most primitive parts required by the API, and separate out different sets of high-level support, such as batch-stream unified functions, windows, etc.

    • Improve the definitions and primitives.

  • Users only depend on pure APIs: provide a thin and pure API layer. Both user jobs and flink-runtime will depend on this API layer. This eliminates the need for user jobs to (indirectly) depend on runtime internals.

  • Eliminate APIs / contents that expose internals.
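As an illustration of the "pure API layer" goal, the following sketch assumes a hypothetical `ApiStream` interface standing in for the API module and a `RuntimeStream` class standing in for the runtime. None of these names are part of the proposal; the point is only that user code compiles against interfaces and never sees runtime-only members such as a transformation handle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// ---- Hypothetical "API layer": only interfaces that user code compiles against. ----
interface ApiStream<T> {
    <R> ApiStream<R> transform(Function<T, R> fn);

    List<T> collect(); // hypothetical bounded sink, just for this sketch
}

// ---- Hypothetical "runtime layer": implements the API and keeps its internals to itself. ----
final class RuntimeStream<T> implements ApiStream<T> {
    private final List<T> data;         // internal representation
    private final int transformationId; // stands in for a runtime-only handle

    RuntimeStream(List<T> data, int transformationId) {
        this.data = data;
        this.transformationId = transformationId;
    }

    // Runtime-only accessor: package-private, and not declared on ApiStream at all.
    int transformationId() { return transformationId; }

    @Override
    public <R> ApiStream<R> transform(Function<T, R> fn) {
        List<R> out = new ArrayList<>();
        for (T t : data) {
            out.add(fn.apply(t));
        }
        return new RuntimeStream<>(out, transformationId + 1);
    }

    @Override
    public List<T> collect() { return new ArrayList<>(data); }
}

// Entry point that only ever hands out the API type.
final class Streams {
    private Streams() {}

    static <T> ApiStream<T> fromList(List<T> data) {
        return new RuntimeStream<>(new ArrayList<>(data), 0);
    }
}
```

In a real split, the implementation would live in a separate module that user jobs do not depend on at compile time; in this single-file sketch that boundary is only approximated by package-private visibility.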

Scope and Principles

Introducing a new API is a huge topic, so let's first clarify the scope and principles of this proposed API.

  • Only focusing on APIs: as a first step, the internal implementations can reuse previous work (e.g., transformations and operations) as much as possible, and be refactored later in a compatible way that is transparent to users. This keeps the discussion focused on API semantics. Besides, some refactoring work may be easier if done after removing DataStream V1.
  • Incremental additions: DataStream API V1 supports a wide range of features. Our plan is to incrementally add support for these features in V2. Therefore, you may find many features missing in the early sub-FLIPs. Eventually, V2 is required to provide at least functionality equivalent to V1 before V1 is deprecated and removed, except for functionality that is deemed unwanted.
  • APIs are initially experimental: by convention, @Experimental APIs should be promoted to @PublicEvolving within 2 minor releases. But as we are developing incrementally, the whole API set will take multiple release cycles to complete. Therefore, we propose not to start counting the promotion period for the early-merged APIs until the whole API set is completed.

Proposed Changes

Based on whether a concept's functionality must be provided by the framework, we divide the relevant concepts into two categories: fundamental primitives and high-level extensions.

Fundamental primitives

Fundamental primitives are the basic and necessary semantics that the framework needs to provide in order to define a stateful stream processing application, and that users cannot achieve on their own if the framework does not provide them. They include DataStream, Partitioning, ProcessFunction, Async Processing, State, Processing Timer Service and (Generalized) Watermark.

DataStream, Partitioning, ProcessFunction and State are the most fundamental elements of the new API and respectively represent:

  • What are the types of data streams

  • How data is partitioned 

  • How to perform operations / processing on data streams

  • How to define and access state on data streams

Obviously, these are fundamental primitives, as they must be provided by the framework. Async Processing, (Generalized) Watermark and Processing Timer Service, however, need further elaboration.

Async Processing

Flink executes user functions synchronously, but some operations need to be performed asynchronously. For example, I/O access is time-consuming, which makes the throughput (TPS) of a single operator much lower than with in-memory computation. This matters particularly for streaming jobs, where low latency is a big concern for users.

Even if users start other threads to perform these time-consuming operations, they still need to notify the mailbox thread to emit data when the function produces output, or to execute a piece of callback logic. Asynchronous processing is therefore a fundamental primitive that must be provided by the framework.
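The hand-off described above can be sketched as follows. This is an assumption-laden model, not Flink's mailbox implementation: `Mailbox`, `AsyncLookupSketch` and `slowLookup` are hypothetical names, and a plain `BlockingQueue` stands in for the task mailbox. The I/O runs on a separate pool, but results are only emitted by mails executed on the mailbox thread, and a timeout is attached via `CompletableFuture#orTimeout` (Java 9+).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a task mailbox: any thread may post mails, but only the
// mailbox thread runs them, so user-visible state is never touched concurrently.
final class Mailbox {
    private final BlockingQueue<Runnable> mails = new LinkedBlockingQueue<>();

    void post(Runnable mail) { mails.add(mail); }

    // Runs on the mailbox thread: executes `count` mails, waiting for each to arrive.
    void runMails(int count) {
        try {
            for (int i = 0; i < count; i++) {
                Runnable mail = mails.poll(5, TimeUnit.SECONDS);
                if (mail == null) {
                    throw new IllegalStateException("no mail arrived in time");
                }
                mail.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}

final class AsyncLookupSketch {
    // Output buffer; only modified by mails, i.e. on the mailbox thread.
    final List<String> emitted = new ArrayList<>();

    // Hypothetical slow I/O call, executed on a separate I/O pool.
    static String slowLookup(String key) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return key + "=value";
    }

    // The calling thread plays the role of the mailbox thread.
    void run(List<String> keys) {
        Mailbox mailbox = new Mailbox();
        ExecutorService ioPool = Executors.newFixedThreadPool(4);
        try {
            for (String key : keys) {
                CompletableFuture.supplyAsync(() -> slowLookup(key), ioPool)
                        .orTimeout(1, TimeUnit.SECONDS)
                        // Never emit from the I/O thread: hand the outcome over as a mail instead.
                        .whenComplete((result, error) -> mailbox.post(
                                () -> emitted.add(error == null ? result : key + "=failed")));
            }
            // Exactly one mail is posted per key, whether the lookup succeeded, failed or timed out.
            mailbox.runMails(keys.size());
        } finally {
            ioPool.shutdown();
        }
    }
}
```

Results arrive in completion order rather than input order; whether V2 should offer ordered or unordered emission is one of the questions left to the asynchronous processing sub-FLIP.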

(Generalized) Watermark

Watermarks drive the progress of event time in Flink. They maintain time consistency via propagation and alignment across streams and operators.

This characteristic (propagation and alignment) is actually widely present in other stream events as well, for example, EndOfPartition and RecordAttributes proposed by FLIP-309. It is worth mentioning that users have also asked for related mechanisms on the mailing list, for example, for control flow and dynamic CEP. This inspires us to abstract a more general concept that is no longer limited to temporal semantics, called Generalized Watermark.

Generalized Watermark represents a special event / signal that can be emitted from source or other operators, received by downstream operators, and aligned across parallel streams during propagation. With such abstraction, the current event-time watermark becomes one specific implementation of the generalized watermarks, for which we can still provide built-in support.

The reason we consider it a fundamental primitive is that users cannot perceive watermarks from different parallel instances on their own, so they cannot align them. Such alignment must be provided by the framework.
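A minimal sketch of framework-side alignment, assuming a hypothetical `WatermarkAligner` that keeps the latest value per input channel and merges them with a supplied combine function. With `Math::min` it behaves like event-time watermark alignment; with `Boolean::logicalAnd` it models an "all inputs finished" signal. Idle channels and other details a real implementation must handle are deliberately left out.

```java
import java.util.Optional;
import java.util.function.BinaryOperator;

// Hypothetical: aligns a generalized watermark of type W across parallel input channels.
final class WatermarkAligner<W> {
    private final Object[] latest;           // last value seen per channel (null = nothing yet)
    private final BinaryOperator<W> combine; // how per-channel values are merged
    private W lastEmitted;

    WatermarkAligner(int numChannels, BinaryOperator<W> combine) {
        this.latest = new Object[numChannels];
        this.combine = combine;
    }

    // Called when `value` arrives on `channel`. Returns the aligned watermark to forward
    // downstream, or empty if some channel has not reported yet or the aligned value is unchanged.
    @SuppressWarnings("unchecked")
    Optional<W> onWatermark(int channel, W value) {
        latest[channel] = value;
        W aligned = null;
        for (Object o : latest) {
            if (o == null) {
                return Optional.empty(); // wait until every channel has reported
            }
            aligned = (aligned == null) ? (W) o : combine.apply(aligned, (W) o);
        }
        if (aligned.equals(lastEmitted)) {
            return Optional.empty();
        }
        lastEmitted = aligned;
        return Optional.of(aligned);
    }
}
```

The user-facing part would then only define the watermark type and how values combine, while tracking per-channel values stays inside the framework, which is exactly the part users cannot do themselves.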

Processing Timer Service

Currently, Flink uses a unified abstraction for both Processing Time and Event Time. This is no longer the case in the new API. The main reasons are as follows:

  • Processing time only makes sense for real-time stream processing, while event time is meaningful for both real-time stream processing and historical batch processing. Even if a unified abstraction is adopted, users still need to understand and carefully deal with the differences between them. Otherwise, it is easy to produce unexpected behavior.
  • The advantage of unified abstraction is that users can easily switch between the two different time semantics without significantly changing the time-based processing logics. However, in reality, we are barely aware of any cases where the same processing logics are reused for both time semantics.

Therefore, in the new API, we only provide a timer service for processing time. Event-time timers can be achieved by leveraging the watermark handling interfaces. The reason the processing timer service is considered a fundamental primitive is that it must work with the task's mailbox thread model in order not to introduce concurrency issues.
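The following sketch shows why processing timers are tied to the mailbox model. It assumes a hypothetical `ProcessingTimerSketch` in which a `ScheduledExecutorService` only decides when a timer is due and then posts a mail; the timer callback itself runs on the mailbox thread, so it never races with record processing. This is an illustration, not the proposed timer API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical processing-time timer service built on a mailbox.
final class ProcessingTimerSketch {
    private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService timerThread = Executors.newSingleThreadScheduledExecutor();

    // Timestamps of fired timers; only modified on the mailbox thread.
    final List<Long> firedTimers = new ArrayList<>();

    // Registers a timer for an absolute processing time in epoch milliseconds.
    void registerProcessingTimer(long timestampMillis) {
        long delay = Math.max(0L, timestampMillis - System.currentTimeMillis());
        // The timer thread only posts a mail; the callback itself runs on the mailbox thread.
        timerThread.schedule(() -> {
            mailbox.add(() -> firedTimers.add(timestampMillis));
        }, delay, TimeUnit.MILLISECONDS);
    }

    // Mailbox loop: the calling thread acts as the mailbox thread and runs `count` mails.
    void runMailbox(int count) {
        try {
            for (int i = 0; i < count; i++) {
                Runnable mail = mailbox.poll(5, TimeUnit.SECONDS);
                if (mail == null) {
                    throw new IllegalStateException("no mail arrived in time");
                }
                mail.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    void shutdown() { timerThread.shutdownNow(); }
}
```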

High-Level Extensions

High-level extensions are like shortcuts / syntactic sugar: without them, users could probably still achieve the same behavior by working with the fundamental APIs, but built-in support makes it a lot easier. They include Built-In Functions, Event Time Support and Window.

Built-in Functions

ProcessFunction is a relatively low-level concept, and some common operations such as map, flatMap and reduce can be implemented based on it. However, for the convenience of users, we provide built-in implementations of these common operations. Therefore, built-in functions are a high-level extension.
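To show that such built-ins are sugar over the primitive, here is a sketch assuming a hypothetical `OneInputProcessFunction` interface (a record plus an output collector). `map`, `flatMap`, `filter` and a running `reduce` are then just small adapters; the names and signatures are illustrative only, and a plain field stands in for the state a real reduce would keep.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical low-level primitive: receives one record and may emit zero or more outputs.
interface OneInputProcessFunction<IN, OUT> {
    void processRecord(IN record, Consumer<OUT> output);
}

// Hypothetical built-in functions expressed purely on top of the primitive.
final class BuiltIns {
    private BuiltIns() {}

    static <IN, OUT> OneInputProcessFunction<IN, OUT> map(Function<IN, OUT> fn) {
        return (record, output) -> output.accept(fn.apply(record));
    }

    static <IN, OUT> OneInputProcessFunction<IN, OUT> flatMap(Function<IN, ? extends Iterable<OUT>> fn) {
        return (record, output) -> fn.apply(record).forEach(output);
    }

    static <T> OneInputProcessFunction<T, T> filter(Predicate<T> predicate) {
        return (record, output) -> {
            if (predicate.test(record)) {
                output.accept(record);
            }
        };
    }

    // Emits the running aggregate after every record; `acc` stands in for value state.
    static <T> OneInputProcessFunction<T, T> runningReduce(BinaryOperator<T> reducer) {
        return new OneInputProcessFunction<T, T>() {
            private T acc;

            @Override
            public void processRecord(T record, Consumer<T> output) {
                acc = (acc == null) ? record : reducer.apply(acc, record);
                output.accept(acc);
            }
        };
    }

    // Helper for trying a function on a bounded list of inputs.
    static <IN, OUT> List<OUT> runAll(OneInputProcessFunction<IN, OUT> fn, List<IN> inputs) {
        List<OUT> out = new ArrayList<>();
        for (IN in : inputs) {
            fn.processRecord(in, out::add);
        }
        return out;
    }
}
```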

Event Time Support

Event time is no longer a fundamental primitive, as users can maintain it themselves via state and align it through generalized watermarks. We see event time as an extension that includes the following items:

  • A special type of generalized watermark with temporal semantics and an alignment algorithm for this watermark whose behavior remains consistent with before.

  • Some operators based on event time.

  • Some components to extract and handle event time: timestamp assigner, watermark generator...
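As a sketch of how event-time timers could live outside the fundamental primitives, assume a hypothetical hook that is invoked whenever the aligned event-time watermark advances. A priority queue (standing in for state) holds registered timestamps, and every timer at or below the new watermark fires in timestamp order. `EventTimeTimers` and `onEventTimeWatermark` are illustrative names, not proposed APIs.

```java
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

// Hypothetical: event-time timers built in user space on top of watermark notifications.
final class EventTimeTimers {
    private final PriorityQueue<Long> timers = new PriorityQueue<>(); // stands in for timer state
    private long currentWatermark = Long.MIN_VALUE;

    void register(long eventTime) {
        timers.add(eventTime); // a timer already behind the watermark fires on the next advance
    }

    // Invoked from the (hypothetical) handler for the event-time generalized watermark.
    void onEventTimeWatermark(long watermark, LongConsumer onTimer) {
        if (watermark <= currentWatermark) {
            return; // event-time watermarks only move forward
        }
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            onTimer.accept(timers.poll());
        }
    }
}
```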

Window

Window is a special processing mechanism for data on the stream: it splits the stream into “buckets”, over which we can apply computations. In principle, windows can be implemented in a process function by defining a series of states. Therefore, we consider it a high-level extension.
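The claim that a window is "a series of states" can be illustrated with a tumbling-window sum. In this sketch a `TreeMap` stands in for per-key state (a single key, for brevity), records are assigned to the bucket `[start, start + size)`, and a bucket is emitted once the watermark reaches its last timestamp. `TumblingSumWindow` is a hypothetical class, and the firing rule is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical tumbling-window sum implemented with plain "state" plus watermark callbacks.
final class TumblingSumWindow {
    private final long size;
    // Stand-in for state: window start -> running sum, ordered by window start.
    private final TreeMap<Long, Long> sums = new TreeMap<>();
    final List<String> results = new ArrayList<>();

    TumblingSumWindow(long size) { this.size = size; }

    void processRecord(long timestamp, long value) {
        long start = timestamp - Math.floorMod(timestamp, size);
        sums.merge(start, value, Long::sum);
    }

    void onWatermark(long watermark) {
        Iterator<Map.Entry<Long, Long>> it = sums.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> entry = it.next();
            long end = entry.getKey() + size; // the bucket covers [start, end)
            if (end - 1 > watermark) {
                break; // buckets are ordered, so all later ones are incomplete too
            }
            results.add("[" + entry.getKey() + "," + end + ")=" + entry.getValue());
            it.remove(); // clear the state of the emitted bucket
        }
    }
}
```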


Move common dependencies into separate module

Currently, many basic and common classes/tools in Flink, such as state descriptors and type information, are located in flink-core. Both the new and old APIs depend on this module, which also contains a large amount of implementation-related code. This violates the principle that the new API should not depend on implementation details.

To this end, we propose extracting the interfaces that the API needs to depend on into a new module, perhaps called flink-core-api, while keeping the package paths unchanged, and then making the core module depend on this core-api module. This will not break compatibility of the old API, while allowing the new API to depend solely on abstractions.

The dependency relationship between the API module and flink-core before and after this proposal is shown in the figure:

[Figure: dependency relationship between the API modules and flink-core, before and after this proposal]

This umbrella FLIP is only intended to illustrate the proposed solution, so we do not list all the classes involved here. The classes/interfaces that need to be moved to flink-core-api will be listed in detail in the sub-FLIPs.

Related sub-FLIPs

Since developing a new API is relatively complex, it is difficult to explain all the details in one FLIP. Therefore, we plan to split it into multiple sub-FLIPs for separate discussions.

DataStream, Partitioning and ProcessFunction of DataStream API V2

Since data streams, partitioning, and process functions are closely related, we will put them in a single FLIP. In this FLIP, we will focus on:

  • How to define different streams on DataStream API V2.

  • How to convert between different streams.

  • How to define operations on stream.

Config, Context and Processing Timer Service of DataStream API V2

In this FLIP, we will discuss the following parts:

  • How to configure the datastream, such as setting the parallelism.

  • How to get the information about the context in which process functions are executed.

  • How to get current processing time and register a processing timer.

State Access on DataStream API V2

The new API's support for state will also be discussed in a separate FLIP, which will focus on how to define and access state in process functions. It may also discuss how DataStream API V2 can further support a disaggregated storage-and-computing architecture for state.

Introduce Generalized Watermark

We will elaborate on all the details related to the generalized watermark in a separate FLIP. Its content includes:

  • What a generalized watermark is.

  • How to define a generalized watermark.

  • How the framework aligns and handles generalized watermarks.

Built-In Functions of DataStream API V2

There are various built-in functions, which can be generally divided into two categories: stateful and stateless. We will discuss their implementations separately in two FLIPs.

In addition, because the concept and implementation of join are relatively complex, we want to discuss it separately, even though it is indeed a built-in function. And because join is closely tied to windows, we will put them both in a single FLIP.

Introduce Execution Hint for Process Function

It's hard for people to understand which functions are batch-stream unified and which are batch / stream dedicated. We want to introduce a hint to identify the type of a function in DataStream API V2. This not only helps users, but also helps the engine work better. For example, more aggressive optimizations can be applied at the runtime level for batch-only functions.
We will discuss this section in a separate FLIP, mainly including:

  • Introducing execution hint mechanism for DataStream API V2.
  • How to mark this hint for built-in functions as well as user defined functions.
  • What other helpful hints can be provided in the future.
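One conceivable shape for such a hint is a runtime-retained annotation that the engine inspects. The sketch below assumes hypothetical `ExecutionHint` and `ExecutionMode` types and a `HintChecker`; the actual mechanism and hint values are left to the dedicated sub-FLIP.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical hint values; the real set is to be defined by the sub-FLIP.
enum ExecutionMode { UNIFIED, BATCH_ONLY, STREAMING_ONLY }

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface ExecutionHint {
    ExecutionMode value();
}

// A function that only makes sense on bounded input, e.g. a full sort.
@ExecutionHint(ExecutionMode.BATCH_ONLY)
final class SortFunction { }

// No annotation: treated as batch-stream unified by default (an assumption of this sketch).
final class UpperCaseFunction { }

final class HintChecker {
    private HintChecker() {}

    static ExecutionMode modeOf(Class<?> functionClass) {
        ExecutionHint hint = functionClass.getAnnotation(ExecutionHint.class);
        return hint == null ? ExecutionMode.UNIFIED : hint.value();
    }

    // Rejects functions that do not fit the execution mode; a BATCH_ONLY hint could
    // additionally unlock batch-specific optimizations in the runtime.
    static void validate(Class<?> functionClass, boolean batchExecution) {
        ExecutionMode mode = modeOf(functionClass);
        if (mode == ExecutionMode.BATCH_ONLY && !batchExecution) {
            throw new IllegalArgumentException(functionClass.getSimpleName() + " requires batch execution");
        }
        if (mode == ExecutionMode.STREAMING_ONLY && batchExecution) {
            throw new IllegalArgumentException(functionClass.getSimpleName() + " requires streaming execution");
        }
    }
}
```

Built-in functions could carry the annotation directly, while user-defined functions would opt in the same way; functions without a hint keep today's behavior.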

DataStream API V2's Support for Event Time

Event time is an important component of timely stream processing. It is worth a separate FLIP, especially since the new API no longer considers it a fundamental primitive. This FLIP will focus on the following aspects:

  • How to implement event-time watermarks via the generalized watermark mechanism.

  • How to extract and process event time in DataStream API V2.

Window and Join on DataStream API V2

Window and Join are two important operators. We will discuss them in a separate FLIP, mainly focusing on:

  • How to implement Window and Join on the new API.

  • What improvements have been made compared to their counterparts in DataStream API V1.

Asynchronous processing on DataStream API V2

Asynchronous processing is a relatively complex topic, and we'll cover it in a separate FLIP. It mainly includes:

  • How to apply asynchronous logic to ProcessFunction.

  • How to deal with asynchronous calls that time out.

  • How the asynchronous logic interacts with the mailbox thread.


In summary, we have the following sub-FLIPs in total:

  1. DataStream, Partitioning and Process Function of DataStream API V2

  2. Config, Context and Processing Timer Service of DataStream API V2

  3. State Access on DataStream API V2

  4. Introduce Generalized Watermark

  5. Stateless Built-In Functions of DataStream API V2

  6. Stateful Built-In Functions of DataStream API V2

  7. Introduce Execution Hint for Process Function

  8. DataStream API V2's Support for Event Time

  9. Window and Join on DataStream API V2

  10. Asynchronous processing on DataStream API V2

The dependency relationship between different sub-FLIPs is shown in the following figure:

[Figure: dependency relationship between the sub-FLIPs]

Compatibility, Deprecation, and Migration Plan

...

The removal of DataStream V1 needs to meet the following conditions at the same time:

  1. It was marked deprecated for at least two minor releases.
  2. Most users have already or can migrate to DataStream API V2.
  3. It can only be removed in major release.

This is an either in-place or smooth replacement of DataStream API. It will coexist with the old API for a considerable period of time. Once the old API is removed, all SDK-based jobs need to be migrated.

...