You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 7 Next »

Discussion thread
Vote thread
JIRA
ReleaseTBD

Introduction

As the first sub-FLIP for DataStream API V2, we'd like to discuss and try to answer some of the most fundamental questions in stream processing.

  1. What kinds of data streams do we have?
  2. How to partition data over the streams?
  3. How to define a processing on the data stream?

The answer to these questions involve three core concepts: DataStream, Partitioning and ProcessFunction. In this FLIP, we will discuss the definitions and related API primitives of these concepts in detail.

Concepts Definition

DataStream

DataStream is the carrier of data. Data flows on the stream and may be divided into multiple partitions. According to how the data is partitioned on the stream, we divide it into the following categories:

  • Global Stream: Force single partition/parallelism, and the correctness of data depends on this.

  • Partition Stream: 

    • Divide data into multiple partitions. State is only available within the partition. One partition can only be processed by one task, but one task can handle one or multiple partitions.

    • According to the partitioning approach, it can be further divided into the following two categories:

      • Keyed Partition Stream: Each key is a partition, and the partition to which the data belongs is deterministic.

      • Non-Keyed Partition Stream: Each parallelism is a partition, and the partition to which the data belongs is nondeterministic.

  • Broadcast Stream: Each partition contains the same data.

Partitioning

Above we defined the stream and how it is partitioned. The next topic to discuss is how to convert between different partition types. We call these transformations partitioning. For example non-keyed partition stream can be transformed to keyed partition stream via a `KeyBy` partitioning.

Overall, we have the following four partitioning:

  • KeyBy: Let all data be repartitioned according to specified key.

  • Shuffle: Repartition and shuffle all data.

  • Global: Merge all partitions into one.

  • Broadcast: Force partitions broadcast data to downstream.

The specific transformation relationship is shown in the following table:

Partitioning

Output

Global

Keyed

NonKeyed

Broadcast

Input

Global

KeyBy

Shuffle

Broadcast

Keyed

Global

KeyBy

Shuffle

Broadcast

NonKeyed

Global

KeyBy

Shuffle

Broadcast

Broadcast

(A crossed box indicates that it is not supported or not required)

One thing to note is: broadcast can only be used as side-input of other Inputs and cannot be directly converted to other streams.

ProcessFunction

Once we have the data stream, we can apply operations on it. The operations that can be performed over DataStream are collectively called Process Function.  It is the only entrypoint for defining all kinds of processings on the data streams.

Classification of ProcessFunction

According to the number of input / output, they are classified as follows:

Process Function

number of inputs

number of outputs

OneInputStreamProcessFunction

1

1

TwoInputStreamProcessFunction

2

1

TwoOutputStreamProcessFunction

1

2

Logically, process functions that support more inputs and outputs can be achieved  by combining them, but this implementation might be inefficient. If the call for this becomes louder, we will consider supporting as many output edges as we want through a mechanism like OutputTag. But this loses the explicit generic type information that comes with using ProcessFunction.

Advantages of ProcessFunction

Compared with DataStream V1, It has the following benefits:

  • Clearer definition: From the DataStream's perspective, it only needs to understand the semantics of functions. Built-in  operations such as map / flatMap / reduce / join can still be supported, but are decoupled from the core framework. That is to say, for DataStream V2, every operation is a process function .

  • Don't expose operators to users: We believe functions with access to proper runtime information and services are good enough for users to define custom data processing logics. Operators on the other hand are more an internal concept of Flink and users should not be allowed to directly use them. Besides, in V1 users are invited to extend  `AbstractStreamOperator` in order to define their custom operators, leading to unnecessary dependencies and unpredictable behaviors. In V2, users should define their custom behaviors by implementing interfaces rather than extending framework classes. 

Requirements for input and output streams

The following two tables list the input and output stream combinations supported by OneInputStreamProcessFunction and TwoOutputStreamProcessFunction respectively.

For OneInputStreamProcessFunction:

Input Stream

Output Stream

Global

Global

Keyed

Keyed / Non-Keyed

NonKeyed

NonKeyed

Broadcast

Not Supported

When KeyedPartitionStream is used as input, the output can be either a KeyedPartitionStream or NonKeyedPartitionStream. For general data processing logic, how to partition data is uncertain, we can only expect a NonKeyedPartitionStream. If we do need a deterministic partition, we can follow it with a KeyBy partitioning. However, there are times when we know for sure that the partition of records will not change before and after processing, shuffle cost due to the extra partitioning can be avoided. To be safe, in this case we ask for a KeySelector for the output data, and the framework checks at runtime to see if this invariant is broken. The same is true for two output and two input counterparts. For a more detailed explanation, see the API definition of KeyedPartitionStream in the Proposed Changes section below.


For TwoOutputStreamProcessFunction:

Input Stream

Output Stream

Global

Global  + Global

Keyed

Keyed + Keyed / Non-Keyed + Non-Keyed

NonKeyed

NonKeyed + NonKeyed

Broadcast

Not Supported

There are two points to note here:

  1. Broadcast stream cannot be used as a single input.
  2. Generally speaking, when a keyed stream as input, its output should be non-keyed stream, because the original partition maybe change during processing. But if we provide an specific KeySelector, its output can be keyed partitioned.


Things with two inputs is a little more complicated. The following table lists which streams are compatible with each other and the types of streams they output. 

A cross(❎) indicates not supported.

Output

Input2

Global

Keyed

NonKeyed

Broadcast

Input1

Global

Global

Keyed

Non-Keyed / Keyed

Non-Keyed

NonKeyed

Non-Keyed

Non-Keyed

Broadcast

Non-Keyed

Non-Keyed

  1. The reason why the connection between Global Stream and Non-Global Stream is not supported is that the number of partitions of GlobalStream is forced to be 1, but it is generally not 1 for Non-Global Stream, which will cause conflicts when determining the number of partitions of the output stream. If necessary, they should be transformed into mutually compatible streams and then connected.
  2. Connecting two broadcast streams doesn't really make sense, because each parallelism would have exactly same input data from both streams and any process would be duplicated. 
  3. The reason why the output of two keyed partition streams can be keyed or non-keyed is the same as we mentioned above in the case of single input.
  4. When we connect two KeyedPartitioinStream, they must have the same key type, otherwise we can't decide how to merge the partitions of the two streams. At the same time, things like access state and register timer are also restricted to the partition itself, cross-partition interaction is not meaningful.
  5. The reasons why the connection between KeyedPartitionStream and NonKeyedPartitionStream is not supported are as follows:
    1. The data on KeyedStream is deterministic, but on NonKeyed is not. It is difficult to think of a scenario where the two need to be connected.
    2. This will complicate the state declaration and access rules. A more detailed discussion can be seen in the subsequent state-related sub-FLIP.
    3. If we see that most people have clear demands for this, we can support it in the future.

Lifecycle of Process Function

A Process Function goes through the following phases:

  • Open: The preparation phase before process function starts processing data. It corresponds to the open phase of the underlying operator.

  • Process: The process function is already processing data and will continuously execute the corresponding data processing logic.

  • EndInput:  An input of the process function no longer sends new data. For functions with multiple inputs, this life cycle will go through multiple times until all inputs no longer generate data.

  • Close: The process function no longer processes any data and corresponds to the close phase of the underlying operator.

For each life cycle, process function will provide corresponding hooks to execute user-defined callback logic. We will elaborate on these life-cycle hooks in the following proposed changes section.


Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

  • No labels