
Status

Current state: Under Discussion

Discussion thread: Support python language in flink TableAPI

JIRA:

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

At the API level, Flink has the DataStream API, the DataSet API, and the Table API & SQL, and the Table API is set to become a first-class citizen. The Table API is declarative and can be automatically optimized, as mentioned in the Flink mid-term roadmap by Stephan. So we first consider supporting Python at the Table API level to cater to the current large number of analytics users. This proposal will cover the following items:

  • Python TableAPI Interface

         Introduces a set of Python Table API interfaces that mirror the Java / Scala Table API, i.e. interfaces such as Table, TableEnvironment, TableConfig, etc.

  • Python TableAPI Implementation Architecture

         We will offer two alternative architecture options, one for pure Python language support and one for an extended multi-language design (a long-term goal).

  • Job Submission

         Python Table API programs should be submitted and deployed in the same way as Java / Scala Table API programs, e.g. via the CLI, web, containerized environments, etc.

  • User-defined functions

         Support user-defined stateful and stateless Python functions, including user-defined scalar functions, user-defined table functions, and user-defined aggregate functions.

  • Pandas Support
    • Add toPandas and fromPandas interfaces in the Table API as conversions between Table and pandas.
    • Support using pandas UDFs directly in the Python Table API.
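
To make the items above more concrete, here is a purely illustrative sketch of what such a Python Table API program might look like. Every name used below (TableEnvironment, from_pandas, to_pandas, etc.) is a hypothetical placeholder for an interface this proposal would introduce, not an existing API:

import pandas as pd

# Hypothetical Python Table API usage; all names are placeholders for the proposed interfaces.
t_env = TableEnvironment.create(TableConfig())           # mirrors the Java/Scala TableEnvironment
input_df = pd.DataFrame({"word": ["a", "b", "a"]})
t = t_env.from_pandas(input_df)                          # pandas DataFrame -> Table (proposed conversion)
result = t.group_by("word").select("word, word.count")   # declarative Table API operations
output_df = result.to_pandas()                           # Table -> pandas DataFrame (proposed conversion)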

Modules

  • flink-python
    • flink-python-table - The place for all Python interface definitions and implementations, such as Table, Window, TableEnvironment, TableConfig, ConnectorDescriptor, DataType, TableSchema, TableSource, TableSink, etc., i.e. all the user interfaces in `flink-table-common` and `flink-table-api-java` should be there.
    • flink-python-streaming (in the future)

     We need to add the following components in the Flink JIRA:

    • API/Python - for Python API (already exists)
    • Runtime/Python - for Python function execution.
    • Table SQL/Python - for Python user-defined function execution
    • Python Shell - for interactive Python program
  • flink-python-shell

          For interactive development, similar to scala-shell. With flink-python-shell, users can write and run Python Table API programs (and Python DataStream API programs in the future).

  • flink-clients

          Support for submitting Python Table API jobs in CliFrontend, e.g. `flink run -py wordcount.py`.

Architecture

We will not develop new Python operators as `flink-python` and `flink-stream-python` do. To get the most out of the existing Java/Scala stack (in particular the Calcite-based optimizer), the Python Table API only needs to define the Python-facing interfaces and delegate the calls to the existing Java Table API implementation, which meets the needs of Python users with minimal effort. So our main job is to implement the communication between the Python VM and the Java VM, as shown below:

Currently, we have two options for the implementation of RPC.

Approach 1

For Python API, a very mature solution is to choose Py4J as shown below:

On the Python side, Py4J provides a JavaGateway object. It has a field "jvm" which enables a Python program to access Java classes directly. We can construct the Python wrappers for the Java classes through it.

On the Java side, Py4J provides a GatewayServer. It receives the Python API requests, and we can use it to delegate all the method calls of the Python API to the corresponding Java/Scala Table API.
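
A minimal sketch of the Python side, assuming the py4j package is installed and a GatewayServer has already been started in the Flink JVM on the default Py4J port:

from py4j.java_gateway import JavaGateway

gateway = JavaGateway()   # connect to the GatewayServer running in the Flink JVM
jvm = gateway.jvm         # the "jvm" field gives direct access to Java classes
# Java classes can be used as if they were Python objects; for example, a Flink Table API
# class could be instantiated via jvm.org.apache.flink.table.api.TableConfig().
millis = jvm.java.lang.System.currentTimeMillis()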

There will be Python wrappers for all the API classes such as TableEnvironment, Table, TableSink, TableSource, Catalog, etc.  For example:

class Table(object):
    """
    Wrapper of org.apache.flink.table.api.Table
    """

    def __init__(self, j_table):
        self._j_table = j_table  # The reference to the Table object in the JVM

    def select(self, col_list):
        return Table(self._j_table.select(col_list))  # Call the Java Table API method directly

    ...

In this approach, Py4J encapsulates all of the serialization and deserialization between the Python VM and the JVM.

Approach 2

Currently we want to add a Python Table API, and in the future we may want to support Table APIs for other popular non-JVM languages, such as R, Go, etc. So we can also take a more scalable approach, as follows:

In this approach, we can replace Py4J with a similar component (suppose it is named XX4J) which can support all kinds of languages besides Python as the bridge between the client-language Table API and the Java Table API. From the architecture graph above, we can see that a language-neutral transport layer (such as protobuf) makes it easy to support new languages on top of the Table API.


For now, however, Py4J is undoubtedly the most mature and lowest-cost solution for the communication between Python and Java, and a POC using Py4J has already been done. So at the API level we make the following plan:


  • The short-term:


      We may initially go with a simple approach to map the Python Table API to the Java Table API via Py4J.


  • The long-term:


      We may need to create a Python API that follows the same structure as Flink's Table API and produces a language-independent DAG. (As Stephan already mentioned on the mailing thread.)

UDF Architecture (Overall)

The main design to support UDF is the communication between JVM and Python VM, and the management of the Python environment. The overall architecture is as follows:

The overall architecture is that the UDF runs in an isolated environment, which may be a separate process, a docker container, etc., and the Flink operator communicates with the UDF via gRPC. Besides data transfer, gRPC can also be used to transfer the state access operations, logging operations, metrics reporting operations, etc.

The components involved in the communication between the Flink operator and the UDF are as follows (a rough sketch of the worker-facing services is given after the list):

  • Env Service - Responsible for launching and destroying the Python worker execution environment.
    • The Python UDF may have custom dependencies which are not installed in the cluster, and the Python version may also differ from the Python installed in the cluster. Two solutions will be provided to solve this problem.
      • Docker - The Python worker runs in a docker container. Users should specify a docker image which is a self-contained environment.
      • Process - The Python worker runs as a separate process. Users can create a virtual environment, such as virtualenv or conda, locally and install third-party dependencies into this environment.
  • Data Service - Responsible for transferring the input data and the UDF execution results between Flink operator and Python worker.
  • State Service - It allows the aggregate function to access state using DataView. It's responsible for reading/writing state from/to the Flink operator.
  • Logging Service - It provides logging support for the UDF. It's responsible for transferring log entries produced by the UDF to the Flink operator and integrating with Flink's logging system.
  • Metrics Service - It allows the UDF to access Flink's metrics system.
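
The following is a rough, illustrative sketch of what the worker-facing side of two of these services might look like in Python. The class and method names are hypothetical and only illustrate the responsibilities described above, not a defined interface:

class DataService(object):
    """Transfers input data and UDF results between the Flink operator and the Python worker."""

    def read_block(self):
        """Receive the next block of input elements from the Flink operator (hypothetical)."""
        raise NotImplementedError

    def write_block(self, results):
        """Send a block of UDF results back to the Flink operator (hypothetical)."""
        raise NotImplementedError


class LoggingService(object):
    """Forwards log entries produced by the UDF to the Flink operator's logging system."""

    def log(self, level, message):
        raise NotImplementedError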

Stateless User-defined Function (Scalar/Table Function)

The most important thing for stateless functions is the data service. Considering the performance cost of the communication between the JVM and the Python VM, we need to consider how to optimize the execution performance of stateless functions from the perspective of data processing:

  • Data processing mode
    • Sync - process the elements one by one, ONLY process the next element when we get the execution result of the previous element.
    • Async - process the elements in a pipeline manner. For one input element, the processing stages include transferring data to Python worker, UDF execution and transferring the execution results back to Flink operator. We can use buffers between each stage to decouple the dependencies between these stages.
  • Data transmission mode
    • Single - transfer elements between Flink operator and Python worker one by one.
    • Block - transfer elements between Flink operator and Python worker block by block. Each block can contain multiple elements.
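
As an illustration of the "Block" transmission mode, the sketch below groups elements into fixed-size blocks before they are transferred; the block size would be configurable:

def to_blocks(elements, block_size):
    # Group the input elements into blocks of at most block_size elements each, so that one
    # transfer between the Flink operator and the Python worker carries multiple elements.
    block = []
    for element in elements:
        block.append(element)
        if len(block) == block_size:
            yield block
            block = []
    if block:
        yield block  # the last, possibly smaller, block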

In order to achieve better performance, we choose the asynchronous data processing mode and transfer data between the JVM and the Python VM in blocks. The block size can be configured. The workflow is as follows:

The Flink operator first sends the inputs to the Python worker; the Python worker executes the UDF and sends the results back to the Flink operator. The Flink operator can then construct the result rows and send them to the downstream operator. The components involved in the workflow are as follows:

  • Input Buffer - The input elements are put into the input buffer (the columns will be projected and only required columns are transferred to Python worker), i.e. inputs are processed asynchronously to make sure that the inputs can be processed in a pipeline manner.
  • Waiting Buffer - The original inputs sent to Python worker will be buffered as they will be needed when calculating the result row.
  • Result Buffer - The result data is put into the Result Buffer in order, and each result corresponds to one input element in the Waiting Buffer.
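
A simplified sketch of how the Waiting Buffer and the results returned by the Python worker could cooperate to construct the result rows. The helper functions passed in are hypothetical and only illustrate the ordering guarantee described above:

from collections import deque

waiting_buffer = deque()   # original input rows, kept in sending order

def on_input(row, project, send_to_worker):
    # project(row) extracts only the columns required by the Python UDF (hypothetical helper)
    waiting_buffer.append(row)       # keep the original row for later result construction
    send_to_worker(project(row))     # asynchronously ship the projected columns to the worker

def on_result(udf_result, emit):
    # Results come back in the same order as the inputs were sent, so each result is
    # paired with the oldest waiting input (rows are assumed to be tuples here).
    original = waiting_buffer.popleft()
    emit(original + (udf_result,))   # construct the result row for the downstream operator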

Besides, sibling Python UDFs or parent-child Python UDFs in the same operator will be optimized to run as one Python UDF to eliminate the data transmission/serialization overhead. The input columns will be combined and sent to the Python worker together, and each UDF will be executed with its required columns.
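
A small, purely illustrative sketch of this optimization: two independent Python UDFs are merged into one function so that their required columns are transferred together and evaluated in a single round trip (the functions below are arbitrary examples):

def f(a):
    return a + 1          # an arbitrary example UDF on column a

def g(b):
    return b.upper()      # an arbitrary example UDF on column b

def merged_udf(row):
    # The combined input row carries the columns required by both UDFs;
    # both results are produced in one call to the Python worker.
    a, b = row
    return (f(a), g(b))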

Pandas Function Support

Pandas functions require pandas.Series or pandas.DataFrame as inputs. To use pandas functions in the Table API, we need to convert data between Table and pandas. Apache Arrow provides Java and Python facilities to convert between the Apache Arrow table format and pandas. If a Python UDF is a pandas function, the Flink operator will serialize the data using the Apache Arrow serializer before sending it to the Python worker. At the other end, the Python worker will deserialize the data using the Apache Arrow Python facilities before executing the pandas function. The workflow is as follows:
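
A minimal sketch of the Python worker side of this workflow, assuming pyarrow and pandas are available: an Arrow record batch received from the Flink operator is converted to pandas, the user's pandas function is applied, and the result is converted back to Arrow before being sent back.

import pandas as pd
import pyarrow as pa

def run_pandas_udf(arrow_batch, pandas_udf):
    # Arrow -> pandas: deserialize the record batch received from the Flink operator
    input_df = arrow_batch.to_pandas()
    # Execute the user's pandas function (it works on pandas.DataFrame / pandas.Series)
    result_df = pd.DataFrame(pandas_udf(input_df))
    # pandas -> Arrow: serialize the results before sending them back to the Flink operator
    return pa.RecordBatch.from_pandas(result_df)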

Public Interfaces

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

A public interface is any change to the following:

  • DataStream and DataSet API, including classes related to that, such as StreamExecutionEnvironment
  • Classes marked with the @Public annotation
  • On-disk binary formats, such as checkpoints/savepoints
  • User-facing scripts/command-line tools, i.e. bin/flink, Yarn scripts, Mesos scripts
  • Configuration settings
  • Exposed monitoring information


Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users? 
  • If we are changing behavior how will we phase out the older behavior? 
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in a few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
