Introduction
The DataStream API is one of the two main APIs that Flink provides for writing data processing programs. It has been part of the project practically since day one and has evolved for nearly a decade, and over that time we have observed a growing number of problems with it. Addressing these problems requires significant breaking changes, which makes an in-place refactor impractical. Therefore, we propose to introduce a new set of APIs, the DataStream API V2, to gradually replace the original DataStream API.
Introducing a whole new set of APIs is a complex proposal that involves massive changes. We plan to break it down into multiple sub-FLIPs for incremental discussion. This FLIP only serves as an umbrella, focusing on motivation, goals, and overall planning. That is to say, more design and implementation details will be discussed in the sub-FLIPs.
In order to make incremental progress in this direction, we propose to vote on the umbrella FLIP and the sub-FLIPs separately. However, we are aware that the decisions on some of these FLIPs may depend on others, so we are also open to other opinions and suggestions.
Problems of DataStream API
A well-designed API helps not only users but also developers. Through some research, we have come up with the following principles:
Shouldn't reveal implementation details
Easy to understand and use
Clear and unambiguous definition
Make it as flexible as possible
Keep it compatible
The DataStream API V1 already does some of these things well, but we still see some problems that conflict with these principles.
Exposure of Internals
The DataStream API exposes many internal concepts and implementation details to users. This violates the principles of not revealing implementation details, being easy to understand and use, and keeping compatibility. Some of these exposures were unintended; others were intentional at the beginning but should be changed now.
Some non-API types (annotated with @Internal, or without any annotation) are used as arguments / return values of APIs. E.g., the @Internal type StreamRecord is used in the @PublicEvolving API `StreamOperator#setKeyContextElement1/setKeyContextElement2`.
User programs and the runtime depend on the same concrete classes, which means methods intended only for runtime code can also be accessed by user programs, whether annotated as @Internal or not. E.g., `DataStream#getTransformation` (see the sketch after this list).
Many APIs are designed to let users extend an abstract or concrete class. This is problematic because user classes can access the internals of the superclass and override its behaviors, leading to unexpected problems. E.g., AbstractStreamOperator.
StreamOperator, which is currently a @PublicEvolving API, is closely coupled with many runtime-internal concepts: checkpoints / snapshots, key context, the containing task, etc. These concepts are unnecessarily exposed to users.
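As a concrete illustration of the second point above, the following minimal sketch (written against DataStream API V1; the job itself is hypothetical) shows how a user program can reach runtime internals through a public entry point:

```java
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class InternalsLeakExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.fromElements("a", "b", "c");

        // Accessible from a user program, although Transformation is a
        // runtime-facing concept that user code should not depend on.
        Transformation<String> transformation = stream.getTransformation();

        // User code can even mutate the underlying graph node directly,
        // bypassing the DataStream-level API surface.
        transformation.setParallelism(1);
    }
}
```

Once user programs depend on such methods, changes to these runtime classes can break user code, even though the classes are conceptually internal.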
As a consequence, many internal changes may unnecessarily break backwards compatibility, which makes maintaining and evolving the Flink runtime hard and complex.
Concepts and Primitives Lack Organization
Currently, the DataStream API provides primitives corresponding to concepts at many different levels. This makes it difficult to maintain consistency across interfaces and violates the principles of making the API as flexible as possible and easy to understand and use. The main reasons are as follows:
It contains the most fundamental concepts that define a stateful stream processing application: data streams, transforms, states, etc.
It also allows defining arbitrary transforms via process functions and custom operators.
It provides high-level implementations and utilities for common data processing patterns: map, aggregate, join, window, etc.
There are also shortcuts for specific source / sink implementations, mainly for testing purposes.
All these primitives are squashed into a single, flat DataStream layer, without clear modularization and organization. This makes the API hard to understand and use, especially for beginners.
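For illustration, the sketch below (an example we constructed, not code from any sub-FLIP) shows how primitives from all of these levels sit side by side on the same flat classes:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class FlatLayerExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Testing shortcut: an in-memory source.
        DataStream<String> stream = env.fromElements("a", "b", "c");

        // Low-level primitive: an arbitrary transform via a process function.
        DataStream<String> processed =
                stream.process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        out.collect(value);
                    }
                });

        // High-level utility: a common data processing pattern.
        DataStream<String> mapped = processed.map(String::toUpperCase);

        // Testing shortcut: a print sink.
        mapped.print();

        env.execute("flat-layer-example");
    }
}
```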
Obscure Stream / Batch Semantics
We have been saying that the DataStream API is stream-batch unified, but that does not mean everything in it is. This breaks our principles of clear and unambiguous definition and easy to understand and use. For example, processing time only makes sense in real-time stream processing, while caching of intermediate results is only supported for batch processing. In addition, the DataStream API does not support some batch-specific operations (such as sort), because their behavior is hard to define in unbounded stream scenarios. It is hard for people to tell which APIs are stream-batch unified and which are batch- or stream-only.
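As a hedged illustration (assuming Flink 1.16+, where `DataStream#cache` was introduced): both of the following compile against the same DataStream, but each is only meaningful in one execution mode, and nothing in the API surface tells the user which.

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.CachedDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class MixedSemanticsExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        DataStream<String> stream = env.fromElements("a", "b", "c");

        // Only supported in BATCH mode: caching intermediate results.
        CachedDataStream<String> cached = stream.cache();

        // Only meaningful in STREAMING mode: processing-time windows.
        stream.keyBy(v -> v)
              .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
              .reduce((a, b) -> a + b);
    }
}
```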
Public Interfaces
As this FLIP only serves as an umbrella, the concrete public interfaces will be introduced and discussed in the sub-FLIPs.
Proposed Changes
As stated above, this umbrella FLIP focuses on motivation, goals, and overall planning; the concrete design and implementation changes will be described in the sub-FLIPs.
Compatibility, Deprecation, and Migration Plan
As this is a completely new API, it is not compatible with the old DataStream API.
The evolution of the new API will go through the following process:
‒ Mark the new API as experimental before all of its functionality is merged.
‒ Mark it as public evolving after two minor releases.
‒ Mark it as public after another two minor releases, and deprecate the old API.
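For illustration only, these stages map onto Flink's existing stability annotations (the interface names below are hypothetical placeholders):

```java
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;

// Stage 1: while functionality is still being merged.
@Experimental
interface DataStreamV2SketchA {}

// Stage 2: after two minor releases.
@PublicEvolving
interface DataStreamV2SketchB {}

// Stage 3: after two more minor releases; the old API is deprecated at this point.
@Public
interface DataStreamV2SketchC {}
```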
The removal of DataStream API V1 must meet all of the following conditions:
‒ It has been marked as deprecated for at least two minor releases.
‒ Most users have already migrated, or are able to migrate, to DataStream API V2.
‒ It can only be removed in a major release.
This is a smooth replacement of the DataStream API rather than an in-place one: the new API will coexist with the old API for a considerable period of time. Once the old API is removed, all SDK-based jobs will need to be migrated.
Test Plan
The corresponding test plan will be given in the sub-FLIPs.