Authors: Shuiqiang Chen, Hequn Cheng, Jincheng Sun
Status
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
PyFlink currently supports SQL and the Table API, which is convenient for users who are familiar with the Python programming language. However, users may need more complex processing operations that require access to state and timers. AI and ML engineers, in particular, may have little need for structured data queries. This FLIP introduces the Python DataStream API to address these shortcomings.
Goals
- support python DataStream to read/write data from/to external storage (connectors).
- support task and job configuration (get/set resources, parallelism, chaining strategy, etc.).
- support stateless data transformations, including map, flatmap, keyby, etc.
Non-Goals
- Stateful data transformations, including ProcessFunction, RichFunction, Window and Join functions, are out of scope for this FLIP, although this document covers their overall design.
Proposed Changes
Architecture
In FLIP-38 (Python Table API), communication between the Python VM process and the JVM process is implemented with the py4j RPC framework. As a result, the Python side only needs wrapper classes around the Java API, and no new set of Python APIs has to be designed. In FLIP-58 (Flink Python User-Defined Stateless Function for Table), we leverage the Beam framework so that a Flink operator starts a Beam runner, which launches a Python process to run the Python user-defined function. The overall architecture is as below:
We could apply the same architecture to support the Python DataStream APIs (such as DataStream, KeyedDataStream, MapFunction, ProcessFunction, etc.) by adding Python wrapper classes for the corresponding Java DataStream APIs. A simple interface would look as below:
class DataStream(object):

    def __init__(self, j_data_stream):
        self._j_data_stream = j_data_stream

    def get_parallelism(self):
        return self._j_data_stream.getParallelism()
Many interfaces, such as DataStream.map()/flatmap()/keyBy(), require the user to implement the processing logic as a function. Take map() for instance: the user implements the logic in Python and then passes it to map():
def my_map_func(value):
    return value + 1

ds.map(my_map_func)
We could also make use of the Beam framework to execute the user defined python processing function.
Implementation
In summary, the architecture of the Python DataStream API does not differ much from that of SQL and the Table API. But there are some more implementation details to be mentioned.
Configuration
Task Configuration
The DataStream API provides methods to get or set specific configuration for the current stream. The Python DataStream API would also provide corresponding methods in the form of a wrapper class, which sends the get/set requests to the Java API instance via the py4j RPC framework. Take DataStream for instance:
class DataStream(object):

    def __init__(self, j_data_stream):
        self._j_data_stream = j_data_stream

    def set_parallelism(self, parallelism):
        self._j_data_stream.setParallelism(parallelism)
        return self

    def set_resources(self, resource_spec):
        self._j_data_stream.getTransformation().setResources(resource_spec._j_resource_spec)
        return self

    def slot_sharing_group(self, slot_sharing_group):
        self._j_data_stream.slotSharingGroup(slot_sharing_group)
        return self
In the code block above, set_resources(resource_spec) requires the user to pass a ResourceSpec instance as a parameter, which means the Python DataStream API also needs to provide the corresponding Python wrapper classes.
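The delegation pattern described above can be sketched without a running JVM. In the sketch below, FakeJavaDataStream and FakeJavaTransformation are hypothetical stand-ins for the Java objects that py4j would normally proxy, and the string "spec-A" stands in for a Java ResourceSpec; none of these names are part of the proposal. It only illustrates how the Python wrapper forwards calls, unwraps wrapped arguments, and returns itself to allow call chaining.

```python
class FakeJavaTransformation:
    """Hypothetical stand-in for the Java Transformation object."""

    def __init__(self):
        self.resources = None

    def setResources(self, j_resource_spec):
        self.resources = j_resource_spec


class FakeJavaDataStream:
    """Hypothetical stand-in for the Java DataStream that py4j would proxy."""

    def __init__(self):
        self._parallelism = 1
        self._transformation = FakeJavaTransformation()

    def getParallelism(self):
        return self._parallelism

    def setParallelism(self, parallelism):
        self._parallelism = parallelism
        return self

    def getTransformation(self):
        return self._transformation


class ResourceSpec:
    """Python wrapper that only holds a reference to its Java counterpart."""

    def __init__(self, j_resource_spec):
        self._j_resource_spec = j_resource_spec


class DataStream:
    """Python wrapper that forwards every call to the wrapped Java object."""

    def __init__(self, j_data_stream):
        self._j_data_stream = j_data_stream

    def get_parallelism(self):
        return self._j_data_stream.getParallelism()

    def set_parallelism(self, parallelism):
        self._j_data_stream.setParallelism(parallelism)
        return self  # returning self allows call chaining

    def set_resources(self, resource_spec):
        # Unwrap the Python object before handing it to the Java side.
        self._j_data_stream.getTransformation().setResources(
            resource_spec._j_resource_spec)
        return self


j_stream = FakeJavaDataStream()
ds = DataStream(j_stream).set_parallelism(4).set_resources(ResourceSpec("spec-A"))
print(ds.get_parallelism())
```

With py4j, the fake classes would be replaced by gateway proxies, while the wrapper code itself would stay the same.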
Dependency management
When developing a PyFlink application, a module may consist of multiple files, or the job may rely on third-party dependencies. We also need to provide interfaces for users to add these extra files in StreamExecutionEnvironment as configuration. The interfaces would be as below:
class StreamExecutionEnvironment(object):

    def add_python_file(self, file_path):
        pass

    def set_python_requirements(self, requirements_file_path, requirements_cache_dir=None):
        pass

    def add_python_archive(self, archive_path, target_dir=None):
        pass
When submitting a Python DataStream job through the CLI frontend, users can also specify dependencies with the following options, which are already supported:
- pyfs: attach custom python files for job.
- pyarch: add python archive files for job.
- pyreq: specify a requirements.txt file which defines the third-party dependencies.
- pym: specify python module with the program entry point.
Stateless Functions
Many transformation operations in the DataStream API, such as map()/flatmap()/keyBy(), require users to provide their own implementation. In the Python DataStream API, users can implement their processing logic in a Python function and pass it to the interface. Take map() for instance:
# implement a MapFunction
class MyMapFunction(MapFunction):

    def map(self, value):
        return value + 1

ds.map(MyMapFunction())

# pass a lambda function
ds.map(lambda x: x + 1)

# directly pass a function
def map_func(value):
    return value + 1

ds.map(map_func)
Users have three ways to define their functions:
- implement a MapFunction provided by python DataStream API;
- pass a lambda function to the interface;
- directly pass a function.
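One way an implementation could accept all three forms is to normalize them into a plain callable before further processing. The following is a minimal sketch under that assumption; to_callable is a hypothetical helper, and the MapFunction class here is a simplified stand-in for the base class proposed in this FLIP.

```python
import abc


class MapFunction(abc.ABC):
    """Simplified stand-in for the proposed MapFunction base class."""

    @abc.abstractmethod
    def map(self, value):
        pass


def to_callable(func):
    """Return a plain callable for any of the three supported forms (hypothetical helper)."""
    if isinstance(func, MapFunction):
        return func.map
    if callable(func):
        # Covers both lambdas and ordinary functions.
        return func
    raise TypeError("expected a MapFunction instance or a callable, got %r" % (func,))


class AddOne(MapFunction):
    def map(self, value):
        return value + 1


def add_one(value):
    return value + 1


forms = [AddOne(), lambda x: x + 1, add_one]
results = [to_callable(f)(41) for f in forms]
print(results)
```

All three forms yield the same result for the same input, so downstream code only has to deal with one calling convention.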
As mentioned above, the Python DataStream API leverages the Beam portability framework to execute user-defined Python functions in the Flink runtime. When the user passes a Python function to a DataStream transformation interface, we serialize the function object and pass it to a dedicated Flink Java operator, which launches a Beam runner and starts a Python process to execute the function. Therefore, the submitted job graph might differ from the logic the user specified.
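The idea of shipping a function object as bytes and rebuilding it in another process can be illustrated with the standard library alone. The sketch below uses marshal on the function's code object, which is an assumption made purely for illustration: it only handles simple functions without closures or default arguments and is tied to one Python version, so a real implementation would need a more general function pickler. serialize_function and deserialize_function are hypothetical names.

```python
import builtins
import marshal
import types


def serialize_function(func):
    """Serialize the code object of a simple, closure-free function to bytes."""
    return marshal.dumps(func.__code__)


def deserialize_function(payload, name="udf"):
    """Rebuild a function from bytes produced by serialize_function."""
    code = marshal.loads(payload)
    # The rebuilt function gets a fresh globals dict that only exposes builtins.
    return types.FunctionType(code, {"__builtins__": builtins}, name)


# "Client" side: turn the user function into bytes.
payload = serialize_function(lambda x: x + 1)

# "Worker" side: rebuild the function and apply it to a record.
restored = deserialize_function(payload)
print(restored(41))
```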
Types and SerDe
In the previous section on UDFs, we mentioned that the Python DataStream API can reuse the Python Table API framework. But there are some differences between them. SQL and Table API data formats are constrained by the table schema, so data exchanged between upstream and downstream operations can be serialized and deserialized using the type information given in the schema. DataStream, in contrast, has no strict data schema, so users might not care about input and output data types when implementing their Python functions. However, we still need to serialize and deserialize data and ship it between operators correctly. In the PyFlink DataStream API, we distinguish the following four scenarios:
- After a Python function operation upstream, there might be a Java operation with specified input and output type information. In this case, the upstream Python operation needs to know its output type information, either specified by the user or inferred from the downstream operator, so that the Beam Python harness can encode result data with value coders corresponding to the input TypeInformation of the next operator.
- When the upstream and downstream operations are both Python operations, data is transmitted between operators as primitive byte arrays, and the result data from the Python harness is serialized/deserialized by a pickle serializer. Therefore, we would introduce a new type information called PickledByteArrayTypeInformation to indicate that the data is in pickled primitive byte array format.
- As for the 3rd and 4th scenario, the downstream operation can obtain type information of upstream java operation from inputs.
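The second scenario, where two Python operations exchange pickled byte arrays, can be sketched as follows. run_python_operator is a hypothetical helper that mimics what one Python operator does with its input and output; the Java side in between would only ever see opaque bytes.

```python
import pickle


def run_python_operator(func, input_bytes):
    """Decode a pickled record, apply the user function, and pickle the result."""
    value = pickle.loads(input_bytes)
    return pickle.dumps(func(value))


# The upstream operator emits a tuple, the downstream operator turns it into a dict.
# No type information is needed in between, because both sides use pickle.
stage1 = run_python_operator(lambda r: (r[0], r[1] + 1), pickle.dumps(("abc", 2)))
stage2 = run_python_operator(lambda r: {"word": r[0], "count": r[1]}, stage1)

result = pickle.loads(stage2)
print(result)
```

Because the payload is opaque to Java, a downstream Java operation cannot consume it directly, which is why the first scenario needs explicit output type information instead.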
According to the illustration above, we mainly focus on the first and second scenarios. We would introduce a python class named Types to give a mapping for python types and java TypeInformations:
class Types(object):
    """
    This class gives access to the type information of the most common types for which
    Flink has built-in serializers and comparators.
    """

    VOID = BasicTypeInfo.VOID_TYPE_INFO
    STRING = BasicTypeInfo.STRING_TYPE_INFO
    BYTE = BasicTypeInfo.BYTE_TYPE_INFO
    BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO
    SHORT = BasicTypeInfo.SHORT_TYPE_INFO
    INT = BasicTypeInfo.INT_TYPE_INFO
    LONG = BasicTypeInfo.LONG_TYPE_INFO
    FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO
    DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO
    CHAR = BasicTypeInfo.CHAR_TYPE_INFO
    BIG_INT = BasicTypeInfo.BIG_INT_TYPE_INFO
    BIG_DEC = BasicTypeInfo.BIG_DEC_TYPE_INFO
    SQL_DATE = SqlTimeTypeInfo.DATE
    SQL_TIME = SqlTimeTypeInfo.TIME
    SQL_TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP
    LOCAL_DATE = LocalTimeTypeInfo.LOCAL_DATE
    LOCAL_TIME = LocalTimeTypeInfo.LOCAL_TIME
    LOCAL_DATE_TIME = LocalTimeTypeInfo.LOCAL_DATE_TIME
    INSTANT = BasicTypeInfo.INSTANT_TYPE_INFO

    @staticmethod
    def ROW(types):
        """
        Returns type information for Row with fields of the given types.
        A row itself must not be null.

        :param types: the types of the row fields, e.g., Types.STRING, Types.INT
        """
        return RowTypeInfo(types)

    @staticmethod
    def ROW_NAMED(names, types):
        """
        Returns type information for Row with fields of the given types and with
        given names. A row must not be null.

        :param names: array of field names.
        :param types: array of field types.
        """
        return RowTypeInfo(types, names)

    @staticmethod
    def PRIMITIVE_ARRAY(element_type):
        """
        Returns type information for arrays of primitive type (such as byte[]).
        The array must not be null.

        :param element_type: element type of the array (e.g. Types.BOOLEAN,
                             Types.INT, Types.DOUBLE)
        """
        if element_type == Types.BOOLEAN:
            return PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO
        elif element_type == Types.BYTE:
            return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
        elif element_type == Types.SHORT:
            return PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO
        elif element_type == Types.INT:
            return PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO
        elif element_type == Types.LONG:
            return PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO
        elif element_type == Types.FLOAT:
            return PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO
        elif element_type == Types.DOUBLE:
            return PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO
        elif element_type == Types.CHAR:
            return PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO
        else:
            raise TypeError("Invalid element type for a primitive array.")
Users can then specify output data types when implementing their Python functions. Take map() for instance:
data_stream.map(MyMapFunction(), data_types=Types.ROW([Types.STRING, Types.INT]))
Connectors
Furthermore, it is important to provide a connector API to read/write data from python objects or external storage systems.
Source
User can get a data stream from two kinds of sources:
- from python objects by calling from_collection(), etc.
- from an external data source by calling add_source() to add a connector. Currently, only built-in connectors are supported, such as Kafka, Cassandra, Kinesis, Elasticsearch, HDFS, RabbitMQ, Apache NiFi, Twitter Streaming API, Google PubSub, JDBC. We need to provide Python wrapper classes for the corresponding Java API classes. Take the Kafka connector for instance:
Java code:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), properties));
In the Python DataStream API, users can add a Kafka data source as below:
class FlinkKafkaConsumer010(object):

    def __init__(self, topic, deserialization_schema, properties):
        pass

properties = {}
properties['bootstrap.servers'] = 'localhost:9092'
properties['group.id'] = 'test'
ds = env.add_source(FlinkKafkaConsumer010('my_topic', SimpleStringSchema(), properties))
It requires the user to provide a DeserializationSchema which will be a wrapped class of Java API. Currently, we will only support some built-in simple DeserializationSchemas, such as:
- SimpleStringSchema: Deserialize the bytes array data to a String object;
- JsonRowDeserializationSchema: Deserialize the input json format data to a Row;
- AvroRowDeserializationSchema: Deserialize the input avro format data to a Row;
- CsvRowDeserializationSchema: Deserialize the input csv format data to a Row;
Take JsonRowDeserializationSchema for instance. On the Java side, users can get a DeserializationSchema instance with the code below:
TypeInformation<Row> rowSchema = Types.ROW_NAMED(
    new String[] {"f1", "f2", "f3", "f4", "f5"},
    Types.INT, Types.BOOLEAN, Types.STRING, Types.SQL_TIMESTAMP, Types.LOCAL_DATE_TIME);
JsonRowDeserializationSchema deserializationSchema =
    new JsonRowDeserializationSchema.Builder(rowSchema).build();
In the PyFlink DataStream API, a row schema would be declared in a similar way:
row_schema = Types.ROW_NAMED(
    ["f1", "f2", "f3", "f4", "f5"],
    [Types.INT, Types.BOOLEAN, Types.STRING, Types.SQL_TIMESTAMP, Types.LOCAL_DATE_TIME])
deserialization_schema = JsonRowDeserializationSchema.Builder(row_schema).build()
env.add_source(FlinkKafkaConsumer010("topic", deserialization_schema, properties))
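To make the role of a row schema concrete, the sketch below shows in plain Python what a JSON row deserializer conceptually does: it picks the named fields from each JSON message in schema order and produces a row, represented here as a tuple. make_json_row_deserializer is a hypothetical helper, and the use of Python types such as int and str as field types is a simplification; the actual deserialization would happen in the wrapped Java class.

```python
import json


def make_json_row_deserializer(field_names, field_types):
    """Build a function that turns JSON bytes into a row tuple (hypothetical helper)."""
    if len(field_names) != len(field_types):
        raise ValueError("field_names and field_types must have the same length")

    def deserialize(message):
        obj = json.loads(message.decode("utf-8"))
        # Fields are emitted in schema order; missing or null fields become None.
        return tuple(
            None if obj.get(name) is None else cast(obj[name])
            for name, cast in zip(field_names, field_types))

    return deserialize


deserialize = make_json_row_deserializer(["f1", "f2", "f3"], [int, bool, str])
row = deserialize(b'{"f3": "hello", "f1": 7, "f2": true}')
print(row)
```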
Sink
DataStream API supports writing data into a local file or external storage system:
- Call DataStream.print() to print the data stream to the console.
- Call DataStream.add_sink() to add a sink connector. Similar to source connectors, we will provide a wrapper class corresponding to the Java connector and require the user to provide a SerializationSchema. The Kafka producer sink connector would look as below:
row_schema = Types.ROW_NAMED(
    ["f1", "f2", "f3", "f4", "f5"],
    [Types.INT, Types.BOOLEAN, Types.STRING, Types.SQL_TIMESTAMP, Types.LOCAL_DATE_TIME])
serialization_schema = JsonRowSerializationSchema.Builder(row_schema).build()
ds.add_sink(FlinkKafkaProducer010('topic', serialization_schema, properties))
Public Interfaces
DataStream
First, we will add a new class named DataStream, most of whose methods correspond to the DataStream API on the Java/Scala side. Based on DataStream, specific transformations yield several DataStream extensions:
KeyedStream: represents a DataStream on which the operator state is partitioned by key using a provided KeySelector.
IterativeStream: represents the start of an iteration in a DataStream.
The hierarchy diagram is as below:
class DataStream(object):
    """
    A DataStream represents a stream of elements of the same type. A DataStream can be
    transformed into another DataStream by applying a transformation, for example:
        DataStream.map()
        DataStream.filter()
    """

    def get_parallelism(self):
        """
        Gets the parallelism for this operator.
        """
        pass

    def get_min_resources(self):
        """
        Gets the minimum resources for this operator.
        """
        pass

    def get_preferred_resources(self):
        """
        Gets the preferred resources for this operator.
        """
        pass

    def get_type(self):
        """
        Gets the type of the stream.
        """
        pass

    def get_execution_environment(self):
        """
        Returns the StreamExecutionEnvironment that was used to create this DataStream.
        """
        pass

    def get_execution_config(self):
        pass

    def name(self, name):
        pass

    def get_name(self):
        pass

    def uid(self, uid):
        pass

    def set_uid_hash(self, hash):
        pass

    def disable_chaining(self):
        pass

    def start_new_chain(self):
        pass

    def slot_sharing_group(self, slot_sharing_group):
        pass

    def union(self, data_streams):
        """
        Creates a new DataStream by merging DataStream outputs of the same type with
        each other. The DataStreams merged using this operator will be transformed
        simultaneously.
        """
        pass

    def connect(self, data_stream):
        pass

    def key_by(self, key_selector):
        pass

    def broadcast(self):
        pass

    def shuffle(self):
        pass

    def forward(self):
        pass

    def rebalance(self):
        pass

    def rescale(self):
        pass

    # 'global' is a reserved keyword in Python, so the Java method global() is
    # written as 'global_' here; the final name is still to be decided.
    def global_(self):
        pass

    def iterate(self):
        pass

    def map(self, map_func):
        pass

    def flat_map(self, flat_map_func):
        pass

    def filter(self, filter_func):
        pass

    def add_sink(self, sink_function):
        pass

    def print(self):
        pass

    def partition_custom(self):
        pass
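The partitioning methods in the interface above (shuffle, rebalance, broadcast, global, key_by) differ only in how each record is assigned to downstream channels. The following plain-Python sketch illustrates those assignment rules; make_channel_selector is a hypothetical helper that is not part of the proposed API, and the key_by branch uses a plain hash modulo as a simplification of Flink's key-group assignment. forward and rescale are omitted because they depend on the layout of upstream and downstream subtasks.

```python
import itertools
import random


def make_channel_selector(strategy, num_channels, key_selector=None, seed=None):
    """Return a function that maps a record to a list of target channel indexes."""
    if strategy == "rebalance":
        counter = itertools.count()
        # Round-robin over all downstream channels.
        return lambda record: [next(counter) % num_channels]
    if strategy == "shuffle":
        rng = random.Random(seed)
        # Uniformly random channel for every record.
        return lambda record: [rng.randrange(num_channels)]
    if strategy == "broadcast":
        # Every record goes to every channel.
        return lambda record: list(range(num_channels))
    if strategy == "global":
        # Every record goes to the first channel.
        return lambda record: [0]
    if strategy == "key_by":
        if key_selector is None:
            raise ValueError("key_by needs a key_selector")
        # Simplification: plain hash modulo instead of key-group assignment.
        return lambda record: [hash(key_selector(record)) % num_channels]
    raise ValueError("unknown strategy: %s" % strategy)


rebalance = make_channel_selector("rebalance", 3)
print([rebalance(r)[0] for r in "abcde"])

by_count = make_channel_selector("key_by", 3, key_selector=lambda r: r[1])
print(by_count(("a", 4)) == by_count(("b", 4)))
```

The last line prints True because records with the same key are always routed to the same channel, which is the property KeyedStream relies on.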
class KeyedStream(DataStream):
    """
    A KeyedStream represents a DataStream on which operator state is partitioned by key
    using a provided KeySelector. Typical operations supported by a DataStream are also
    possible on a KeyedStream, with the exception of partitioning methods such as shuffle,
    forward and key_by. Reduce-style operations, such as reduce and sum, work on elements
    that have the same key.
    """

    def reduce(self, reducer):
        """
        Applies a reduce transformation on the grouped data stream, grouped by the given
        key position. The ReduceFunction will receive input values based on the key value.
        Only input values with the same key will go to the same reducer.
        """
        pass

    ...
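The semantics of reduce on a KeyedStream can be illustrated with a small in-memory simulation. The sketch below assumes the rolling behaviour of the Java DataStream API, where every incoming element is combined with the current value for its key and the updated value is emitted; keyed_reduce is a hypothetical helper, and the dictionary stands in for keyed state.

```python
def keyed_reduce(records, key_selector, reducer):
    """Yield the rolling reduce result for each incoming record, per key."""
    state = {}  # stand-in for keyed state: one reduced value per key
    for record in records:
        key = key_selector(record)
        if key in state:
            state[key] = reducer(state[key], record)
        else:
            # The first element of a key is emitted unchanged.
            state[key] = record
        yield state[key]


records = [("a", 1), ("b", 10), ("a", 2), ("b", 20), ("a", 3)]
out = list(keyed_reduce(
    records,
    key_selector=lambda r: r[0],
    reducer=lambda x, y: (x[0], x[1] + y[1])))
print(out)
```

Values for key "a" and key "b" never mix, which mirrors the statement that only input values with the same key go to the same reducer.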
It is also important to add job configuration and execution interfaces to StreamExecutionEnvironment (set_parallelism(), execute_async(), etc.), together with all supported methods to get a DataStream from a Python object collection or an external data source:
class StreamExecutionEnvironment(object):

    def from_collection(self, collection) -> DataStream:
        pass

    def read_text_file(self, file_path) -> DataStream:
        pass

    # 'from' is a reserved keyword in Python, so the parameter is written as 'from_' here.
    def generate_sequence(self, from_, to) -> DataStream:
        pass

    def add_source(self, source_function) -> DataStream:
        pass

    def execute_async(self):
        pass

    def register_cache_file(self, file_path, file_name, executable):
        pass

    def get_cached_files(self):
        pass
StreamTableEnvironment
In the Java Table API, users can get a Table from a DataStream or a DataStream from a Table. This should also be supported in the Python StreamTableEnvironment:
class StreamTableEnvironment(TableEnvironment):

    def from_data_stream(self, data_stream, names=None):
        pass

    def to_append_stream(self):
        pass

    def to_retract_stream(self):
        pass
Function/MapFunction/FlatMapFunction
import abc


class Function(abc.ABC):
    """
    The base class for all user-defined functions.
    """


class MapFunction(ScalarFunction):

    @abc.abstractmethod
    def map(self, value):
        pass


class FlatMapFunction(ScalarFunction):

    @abc.abstractmethod
    def flat_map(self, value):
        pass


class FilterFunction(ScalarFunction):

    @abc.abstractmethod
    def filter(self, value):
        pass


class ReduceFunction(ScalarFunction):

    @abc.abstractmethod
    def reduce(self, value1, value2):
        pass
Example
env = StreamExecutionEnvironment.get_execution_environment()

# create a DataStream from a python object list
ds = env.from_collection([('ab', 1), ('abc', 2), ('abcd', 3), ('abcde', 4)],
                         type_info=Types.ROW([Types.STRING, Types.INT]))

# set the parallelism of this operator to 3
ds.set_parallelism(3)

# key the DataStream by the second field of the record, then
# transform the DataStream with a map function, and finally print the
# transformation result to the console.
ds.key_by(lambda x: x[1]).map(lambda x: (x[0], x[1] + 1)).print()

env.execute()
To sum up, we have the following function list to be implemented:
category | functionality |
Env | StreamExecutionEnvironment |
Env | configuration relevant method |
Env | StreamExecutionEnvironment(final Configuration configuration) |
Env | executeAsync |
Env | registerCachedFile/context.getCachedFile |
Predefined Source | generateSequence |
Predefined Source | fromCollection |
Predefined Source | addSource |
Predefined Sink | print() |
Predefined Sink | addSink |
Predefined Sink | DataStreamUtils (collect, reinterpretAsKeyedStream) |
Predefined Sink | StreamingFileSink |
Connectors | Kafka |
Connectors | Cassandra |
Connectors | Kinesis |
Connectors | Elasticsearch |
Connectors | Hadoop FileSystem |
Connectors | RabbitMQ |
Connectors | Apache NiFi |
Connectors | Twitter Streaming API |
Connectors | Google PubSub |
Connectors | JDBC |
DataStream Operators | keyby |
DataStream Operators | reduce |
DataStream Operators | union |
DataStream Operators | project |
Partitioning | shuffle |
Partitioning | rebalance |
Partitioning | rescale |
Partitioning | broadcast |
Chaining & resource groups | startNewChain |
Chaining & resource groups | disableChaining |
Chaining & resource groups | slotSharingGroup |
Functions | MapFunction |
Functions | FlatMapFunction |
Functions | FilterFunction |
Functions | KeySelector |
Functions | ReduceFunction |
Functions | CoMapFunction |
Functions | CoFlatMapFunction |
Functions | CoProcessFunction |
Functions | KeyedCoProcessFunction |
Functions | TwoInputStreamOperator |
Iterations | iterate |
Types | Row Type (represented by tuple in Python, RowTypeInfo in Java) |
Types | Primitive Array Types |
Types | Basic Types |
Implementation Plan
- Add DataStream class, and task configuration interface.
- Support creating a DataStream through python objects like env.from_collection().
- Support map()/flat_map()/print(), etc.
- Support built-in connectors (Kafka, Elasticsearch, Kinesis, etc.)
- Support key_by(), etc.