...

  • Env Service - Responsible for launching and destroying the Python worker execution environment.
    • The Python UDF may have custom dependencies which are not installed in the cluster, and the Python version may also be different from the Python installed in the cluster. Two solutions will be provided to solve this problem:
      • Docker - The Python worker runs in a docker container. Users should specify a docker image which is a self-contained environment.
      • Process - The Python worker runs as a separate process. Users can create a virtual environment such as virtualenv or conda locally and install third-party dependencies into this environment.
  • Data Service - Responsible for transferring the input data and the UDF execution results between the Flink operator and the Python worker.
  • State Service - It allows the aggregate function to access state using DataView. It's responsible for reading/writing state from/to the Flink operator.
  • Logging Service - It provides logging support for UDFs. It's responsible for transferring log entries produced by UDFs to the Flink operator and integrates with Flink's logging system.
  • Metrics Service - It allows UDFs to access Flink's metrics system.

    ...

    Pandas functions require pandas.Series or pandas.DataFrame as inputs. To use pandas functions in the Table API, we need to convert data between a table and pandas. Apache Arrow provides Java and Python facilities to convert between the Apache Arrow table format and pandas. If the Python UDFs are pandas functions, the Flink operator will serialize the data using the Apache Arrow serializer before sending it to the Python worker. At the other end, the Python worker will deserialize the data using the Apache Arrow Python facilities before executing the pandas functions. The workflow is as follows:


    (Figure: workflow of executing pandas functions)

    We can see that the data will be serialized using Apache Arrow between the Flink operator and the Python worker.

    • Input Buffer - The input elements are put into the input buffer (the columns will be projected and only the required columns are transferred to the Python worker). Buffering the inputs allows them to be processed asynchronously.
    • Serialize with Arrow - The inputs will be serialized with Arrow in blocks rather than one element at a time to obtain better performance.
    • Data Block - For pandas scalar functions, the block boundaries are non-deterministic (configured by time or count). For pandas aggregate functions, the inputs belonging to the same group should be continuous.
    • Deserialize with Arrow - After the pandas data processing, we need to deserialize the pandas.Series or pandas.DataFrame data into a data structure that Flink can recognize, such as Row.
    • Waiting Buffer - The original inputs sent to the Python worker will be buffered as they will be needed when calculating the result row.
    • Result Buffer - The results in the Result Buffer must be kept in order, and each result corresponds to one input in the Waiting Buffer.

    NOTE: As pandas aggregate functions support neither retraction nor state, they will only be used in batch scenarios.
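
    To make the Arrow-based exchange above more concrete, here is a minimal sketch (for illustration only, not part of the proposal; it uses the pyarrow and pandas libraries directly) of how a block of rows could be serialized to the Arrow streaming format on the operator side and deserialized back into a pandas.DataFrame on the Python worker side:

    import pandas as pd
    import pyarrow as pa

    # A block of input rows, represented as a pandas.DataFrame for illustration.
    block = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})

    # Operator side: serialize the block with Arrow.
    arrow_table = pa.Table.from_pandas(block)
    sink = pa.BufferOutputStream()
    writer = pa.ipc.new_stream(sink, arrow_table.schema)
    writer.write_table(arrow_table)
    writer.close()
    serialized = sink.getvalue()

    # Python worker side: deserialize with Arrow and hand the DataFrame to the pandas UDF.
    reader = pa.ipc.open_stream(serialized)
    restored = reader.read_all().to_pandas()
    assert block.equals(restored)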

    Stateful User-defined Functions (Aggregate Function)

    The most complicated user-defined function is the user-defined aggregate function, as it involves the use of Flink state. Here we introduce the state service. The workflow is as follows:

    (Figure: workflow of stateful user-defined functions)

    Compared to the workflow of the user-defined function, the following should be noted:

    1. The inputs will be grouped before being sent to the Python worker. This has the following advantages:
      1. It makes sure that the inputs are ordered by key, so when the aggregation results come back, we can easily know which keys the aggregation results correspond to.
      2. The processing logic of the Python worker can be very simple (see the sketch after this list):
        1. Get the next input and check whether it belongs to a new key. If so, send out the aggregation result and go to the next step; otherwise, go directly to the next step.
        2. Perform the aggregate/retract/merge operation.
    2. UDAF can access state via DataView. The state operations will be forwarded to the Flink operator and mapped to state backend operations. The Python worker will cache frequently accessed states in memory to reduce remote state access operations. The cached states in the Python worker will be sent back to the Flink operator when a checkpoint is triggered to ensure that checkpointing works normally. The cached states will also be sent back to the Flink operator when a cache eviction occurs.
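
    As a rough illustration of the simple processing logic described above (a sketch only; the function and variable names are assumptions and not part of the proposal), the Python worker's loop over grouped inputs could look like this:

    def process_grouped_inputs(inputs, agg):
        """inputs yields (key, row) pairs that are already ordered by key;
        agg is a user-defined AggregateFunction. Yields (key, result) pairs."""
        current_key = None
        acc = None
        for key, row in inputs:
            if key != current_key:
                # A new key starts: send out the aggregation result of the previous key first.
                if current_key is not None:
                    yield current_key, agg.get_value(acc)
                current_key = key
                acc = agg.create_accumulator()
            # Perform the aggregate operation (retract/merge would be handled similarly).
            agg.accumulate(acc, row)
        if current_key is not None:
            yield current_key, agg.get_value(acc)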

    The components involved in the workflow are as follows:

    1. Input Buffer - The input elements are put into the input buffer (the columns will be projected and only the required columns are transferred to the Python worker). Buffering the inputs allows them to be processed asynchronously.
    2. Group the inputs - The inputs will be grouped according to the group key so that the inputs belonging to the same group key are continuous.
    3. State Service - The aggregate function can use DataView to access states. The accessed states can be buffered in the memory of the Python worker, with an LRU strategy used for eviction. Buffered states will be sent back to the operator when a checkpoint is triggered or when a state is evicted from the buffer. A minimal cache sketch follows this list.
    4. Waiting Buffer - The original inputs sent to the Python worker will be buffered as they will be needed when calculating the result row.
    5. Result Buffer - The results in the Result Buffer must be kept in order, and each result corresponds to one input in the Waiting Buffer.
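
    The following is a minimal sketch of the LRU-based state cache mentioned in the State Service item (the class name and the flush callback are assumptions for illustration; the actual implementation may differ):

    from collections import OrderedDict

    class LRUStateCache(object):
        """Caches state values in the Python worker. Evicted entries and the
        whole cache on checkpoint are sent back to the Flink operator via flush_fn."""

        def __init__(self, max_size, flush_fn):
            self._cache = OrderedDict()
            self._max_size = max_size
            self._flush_fn = flush_fn  # sends (key, value) back to the operator

        def put(self, key, value):
            if key in self._cache:
                self._cache.pop(key)
            elif len(self._cache) >= self._max_size:
                # Evict the least recently used entry and flush it to the operator.
                evicted_key, evicted_value = self._cache.popitem(last=False)
                self._flush_fn(evicted_key, evicted_value)
            self._cache[key] = value

        def get(self, key):
            if key in self._cache:
                self._cache.move_to_end(key)
                return self._cache[key]
            return None  # fall back to a remote state access in the real implementation

        def flush_all(self):
            # Called when a checkpoint is triggered so that the operator state is complete.
            for key, value in self._cache.items():
                self._flush_fn(key, value)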

    Public Interfaces

    The Python Table API classes are just thin wrappers around the corresponding Java Table API classes. All the user-facing interfaces in `flink-table-common` and `flink-table-api-java` should be defined in the Python Table API. The main interfaces/implementations are as follows:

    • Table
    • TableEnvironment
    • TableConfig
    • Descriptor
    • TypeInformation (may change after FLIP-37)
    • etc.
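
    The wrappers below delegate to the corresponding Java objects through Py4J. As a rough sketch of what the `_jvm` reference used in the following snippets stands for (the concrete gateway setup in pyflink may differ), a Py4J gateway could be obtained as follows:

    from py4j.java_gateway import JavaGateway

    # Connects to a JVM-side GatewayServer; how that server is launched by Flink
    # is an assumption here and not part of the snippets below.
    gateway = JavaGateway()
    _jvm = gateway.jvm  # e.g. _jvm.org.apache.flink.table.api.TableConfig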

    Table

    class Table(object):
        """
        Wrapper of org.apache.flink.table.api.Table
        """

        def __init__(self, j_table):
            self._j_table = j_table  # The reference to the Table object on the JVM.

        def select(self, col_list):
            # Call the Java Table API method directly.
            return Table(self._j_table.select(col_list))

        def where(self, condition):
            return Table(self._j_table.where(condition))

        …

        def write_to_sink(self, j_sink):
            self._j_table.writeToSink(j_sink._j_table_sink)

        def to_pandas(self):
            …
            return pandas.DataFrame(..)

        …

    GroupedTable

    class GroupedTable(object):
        """
        Wrapper of org.apache.flink.table.api.GroupedTable
        """

        def __init__(self, j_grouped_table):
            # The reference to the GroupedTable object on the JVM.
            self._j_grouped_table = j_grouped_table

        def select(self, col_list):
            # Call the Java Table API method directly.
            return Table(self._j_grouped_table.select(col_list))

    GroupWindowedTable, WindowGroupedTable and OverWindowedTable are defined similarly to GroupedTable.

    TableConfig

    class TableConfig(object):

        def __init__(self, j_table_config):
            self._j_table_config = j_table_config

        class Builder(object):

            def __init__(self, j_table_config_builder):
                self._j_table_config_builder = j_table_config_builder

            def set_time_zone(self, time_zone):
                self._j_table_config_builder.setTimeZone(time_zone)
                return self

            def set_null_check(self, null_check):
                self._j_table_config_builder.setNullCheck(null_check)
                return self

            …

            # We don't want to expose StreamExecutionEnvironment
            def as_streaming_execution(self):
                self._j_table_config_builder.asStreamingExecution()
                return self

            # We don't want to expose ExecutionEnvironment
            def as_batch_execution(self):
                self._j_table_config_builder.asBatchExecution()
                return self

            def build(self):
                return TableConfig(self._j_table_config_builder.build())

        @classmethod
        def builder(cls):
            return cls.Builder(_jvm.org.apache.flink.table.api.TableConfig.builder())

    TableEnvironment

    class TableEnvironment(object):
        """
        Wrapper of and extension for org.apache.flink.table.api.TableEnvironment
        """

        table_config = None

        def __init__(self, j_tenv):
            self._j_tenv = j_tenv

        def register_table(self, name, table):
            self._j_tenv.registerTable(name, table._j_table)

        def register_table_source(self, name, table_source):
            self._j_tenv.registerTableSource(name, table_source.j_table_source)

        def register_table_sink(self, name, table_sink):
            self._j_tenv.registerTableSink(name, table_sink.j_table_sink)

        def scan(self, *table_path):
            j_paths = TypesUtil.convert_py_list_to_j_array("java.lang.String", table_path)
            j_table = self._j_tenv.scan(j_paths)
            return Table(j_table)

        def connect(self, connector_descriptor):
            return TableDescriptor(self._j_tenv.connect(connector_descriptor._j_connector_descriptor))

        …

        def sql_query(self, query):
            j_table = self._j_tenv.sqlQuery(query)
            return Table(j_table)

        def sql_update(self, stmt, config=None):
            if config is not None:
                self._j_tenv.sqlUpdate(stmt, config)
            else:
                self._j_tenv.sqlUpdate(stmt)

        # Extension methods

        def from_collection(self, data):
            …
            return Table(...)

        def from_pandas(self, data, schema=None):
            …
            return Table(...)

        def execute(self):
            self._j_tenv.execEnv().execute()

        def set_parallelism(self, parallelism):
            self._j_tenv.execEnv().setParallelism(parallelism)

        …

        @classmethod
        def create(cls, table_config):
            j_tenv = ...
            return TableEnvironment(j_tenv)

    Descriptor

    There are a lot of Descriptor-related classes; we will take Csv as an example:

    class Csv(FormatDescriptor):

        def __init__(self):
            self._j_csv = _jvm.org.apache.flink.table.descriptors.Csv()
            super(Csv, self).__init__(self._j_csv)

        def field_delimiter(self, delimiter):
            self._j_csv.fieldDelimiter(delimiter)
            return self

        def line_delimiter(self, delimiter):
            self._j_csv.lineDelimiter(delimiter)
            return self

        def quote_character(self, quote_character):
            self._j_csv.quoteCharacter(quote_character)
            return self

        def allow_comments(self):
            self._j_csv.allowComments()
            return self

        def ignore_parse_errors(self):
            self._j_csv.ignoreParseErrors()
            return self

        def array_element_delimiter(self, delimiter):
            self._j_csv.arrayElementDelimiter(delimiter)
            return self

        def escape_character(self, escape_character):
            self._j_csv.escapeCharacter(escape_character)
            return self

        def null_literal(self, null_literal):
            self._j_csv.nullLiteral(null_literal)
            return self

    UDFs Interface

    UserDefinedFunction

    class UserDefinedFunction(object):
        """
        An interface for writing python user defined function
        """
        __metaclass__ = ABCMeta

        def open(self, *args):
            pass

        def close(self, *args):
            pass

        def is_deterministic(self):
            return True

    ScalarFunction

    class ScalarFunction(UserDefinedFunction):

        @abstractmethod
        def eval(self, *args):
            """Please define your implementation"""

    TableFunction

    class TableFunction(UserDefinedFunction):

        @abstractmethod
        def eval(self, *args):
            """Please define your implementation"""

        def collect(self, element):
            pass

    AggregateFunction

    class AggregateFunction(UserDefinedFunction):
        """
        optional methods:

        def accumulate(self, accumulator, *args):
            pass

        def retract(self, accumulator, *args):
            pass

        def merge(self, accumulator, other_accumulators):
            pass
        """

        @abstractmethod
        def create_accumulator(self):
            """Please define your implementation"""

        @abstractmethod
        def get_value(self, accumulator):
            """Please define your implementation"""

    Decorators

    • udf: used for ScalarFunction
    • udtf: used for TableFunction
    • udaf: used for AggregateFunction

    decorator arguments:

    • input: used in ScalarFunction, TableFunction and AggregateFunction, indicates the input type of the user-defined function.
    • output: used in ScalarFunction, TableFunction and AggregateFunction, indicates the result type of the user-defined function.
    • accumulator: used in AggregateFunction, indicates the accumulator type of the aggregation function.

    NOTE: The decorators may change according to FLIP-37

    DataView

    class DataView(object):
        """
        An interface for defining DataView
        """
        __metaclass__ = ABCMeta

        @abstractmethod
        def clear(self):
            pass

    MapView

    class MapView(DataView):

        def __init__(self, key_type, value_type):
            self.key_type = key_type
            self.value_type = value_type

        def get(self, key):
            return _NoValue

        def put(self, key, value):
            pass

        def put_all(self, map):
            pass

        def remove(self, key):
            pass

        def contains(self, key):
            return _NoValue

        def entries(self):
            return _NoValue

        def keys(self):
            return _NoValue

        def values(self):
            return _NoValue

        def iterator(self):
            return _NoValue

    ListView

    class ListView(DataView):

        def __init__(self, element_type):
            self.element_type = element_type

        def get(self):
            return _NoValue

        def add(self, value):
            pass

        def add_all(self, values):
            pass
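
    As a hedged sketch of how an aggregate function might declare a MapView inside its accumulator (mirroring how DataView is used in the Java Table API; the class name, the accumulator type and the exact Python form are assumptions), consider a distinct-count function:

    @udaf(input=DataTypes.STRING, output=DataTypes.INT, accumulator=DataTypes.ROW(DataTypes.ANY))
    class DistinctCountAggregateFunction(AggregateFunction):

        def create_accumulator(self):
            # The accumulator holds a MapView; its state operations are forwarded
            # to the Flink operator through the state service.
            return Row([MapView(DataTypes.STRING, DataTypes.INT)])

        def accumulate(self, acc, param):
            map_view = acc.get_value(0)
            if param is not None and not map_view.contains(param):
                map_view.put(param, 1)

        def get_value(self, acc):
            count = 0
            for _ in acc.get_value(0).keys():
                count += 1
            return count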

    Expression

    The Expression API will not be supported in the initial version as there is ongoing work on introducing a Table API Java Expression DSL. The Python Expression API will leverage that work and will be supported once it is done.

    DIST

    Create a python directory in the published opt directory. The contents of the python directory are as follows:

    opt/python
    ├── README.MD
    └── lib
        ├── py4j-x.y.z-src.zip
        ├── py4j-LICENSE.txt
        └── pyflink.zip
    └── ...

    The flink-python-table module will be packaged as pyflink.zip and put into the opt/python/lib directory together with py4j-LICENSE.txt and py4j-x.y.z-src.zip.

    • The pyflink shell will be added to the published bin directory.
    • The `flink` shell script should add some options for the Python Table API, such as:
      • -py --python
      • -pyfs --py-files
      • etc.

        The details can be found in the Job Submission section.

    Docs

    • Add the description of `flink run -py xx.py` in the CLI docs
    • Add documentation for the REST API for job submission
    • Add a Python REPL submenu under the Deployment & Operations directory to add documentation for the Python shell
    • Add a Python Table API doc in the current Table API doc
    • Add a common concepts doc for the Python Table API in the basic concepts doc
    • Add pythondocs at the same level as javadocs and scaladocs
    • Add Python UDFs documents
    • etc.

    Examples

    User-Defined Scalar Function

    • Scalar Function definition

    @udf(input=DataTypes.STRING, output=DataTypes.STRING)
    class TrimFunction(ScalarFunction):

        def eval(self, param):
            def trim(s):
                if len(s) == 0:
                    return ''
                if s[:1] == ' ':
                    return trim(s[1:])
                elif s[-1:] == ' ':
                    return trim(s[:-1])
                else:
                    return s
            return trim(param)


    • Usage

    my_udf = TrimFunction()
    t_env.register_function('trim', my_udf)
    source = ...
    result = source.select("trim(a) as a")

    User-Defined Table Function

    • Table Function definition

    @udtf(input=DataTypes.STRING, output=DataTypes.STRING)
    class SplitTableFunction(TableFunction):

        def eval(self, names):
            for name in names.split(' '):
                self.collect(name)


    • Usage

    my_udtf = SplitTableFunction()
    t_env.register_function('split', my_udtf)
    source = ...
    source.join_lateral('split(a) as (word)').select("word").group_by("word") \
        .select("word, word.count as word_count")

    User-Defined Aggregate Function

    • Aggregate Function definition

    @udaf(input=DataTypes.ANY, output=DataTypes.INT, accumulator=DataTypes.ROW(DataTypes.INT))
    class CountAggregateFunction(AggregateFunction):

        def accumulate(self, acc, param):
            if param is not None:
                acc.set_value(0, acc.get_value(0) + 1)

        def merge(self, acc, other_acc):
            for item in other_acc:
                acc.set_value(0, acc.get_value(0) + item.get_value(0))
            return acc

        def retract(self, acc, param):
            if param is not None:
                acc.set_value(0, acc.get_value(0) - 1)

        def create_accumulator(self):
            return Row([0])

        def get_value(self, acc):
            return acc.get_value(0)

    WordCount

    Let's take word count as a complete example. The Python Table API program will look like the following (wordCount.py):

    import os
    import tempfile

    from ... import TrimFunction, CountAggregateFunction, SplitTableFunction

    tmp_dir = tempfile.gettempdir()
    source_path = tmp_dir + '/streaming.csv'
    if os.path.isfile(source_path):
        os.remove(source_path)
    with open(source_path, 'w') as f:
        lines = 'hi guys\n' + 'hello guys\n'
        f.write(lines)
        print(lines)

    t_config = TableConfig.builder().as_streaming_execution().set_parallelism(1).build()
    t_env = TableEnvironment.create(t_config)

    field_names = ["a"]
    field_types = [DataTypes.STRING]

    # register the Source table in the table environment
    t_env.register_table_source(
        "Source",
        CsvTableSource(source_path, field_names, field_types))

    # register the Results table in the table environment
    # using the connector API
    schema = Schema() \
        .field("a", DataTypes.STRING) \
        .field("b", DataTypes.INT)

    format = Csv() \
        .field_delimiter(",") \
        .field("a", DataTypes.STRING) \
        .field("b", DataTypes.INT)

    tmp_csv = tmp_dir + '/streaming2.csv'
    if os.path.isfile(tmp_csv):
        os.remove(tmp_csv)

    connector = FileSystem() \
        .path(tmp_csv)

    t_env.connect(connector) \
        .with_format(format) \
        .with_schema(schema) \
        .in_append_mode() \
        .register_table_sink("Results")

    t = t_env.scan("Source")

    my_udf = TrimFunction()
    t_env.register_function('trim', my_udf)

    my_udtf = SplitTableFunction()
    t_env.register_function('split', my_udtf)

    my_udaf = CountAggregateFunction()
    t_env.register_function('my_count', my_udaf)

    t.join_lateral("split(a) as (word)").select("trim(word) as word") \
        .group_by("word").select("word, my_count(word)") \
        .insert_into("Results")

    t_env.execute()

    with open(tmp_csv, 'r') as f:
        lines = f.read()
        print(lines)

    Job Submission

    Flink Run

    Support submitting Python Table API jobs in CliFrontend, i.e. `flink run` can be used to submit Python Table API jobs. The current `flink` command line syntax is as follows:

    flink <ACTION> [OPTIONS] [ARGUMENTS]

    On the basis of the current `run` ACTION, we add Python Table API support with the following OPTIONS:

    • -py --python <python-file-name>

      Python script with the program entry point. We can configure dependent resources with the `--py-files` option.

    • -pyfs --py-files <python-files>

      Attach custom python files to the job. A comma can be used as the separator to specify multiple files. The standard python resource file suffixes such as .py/.egg/.zip are all supported.

    • -pym --py-module <python-module>

      Python module with the program entry point. This option must be used in conjunction with `--py-files`.

    • -py-exec --py-exec <python-binary-path>

      The Python binary to be used.

    • -py-env --py-env <process/docker>

      The execution environment: process or docker.

    • -py-docker-image --py-docker-image <docker image>

      The docker image to be used to run the python worker. Only valid when the "-py-env" option is docker.

    NOTE: When the `py` and `pym` options appear at the same time, the `py` option takes precedence.

    We can submit the Python Table API jobs as follows:

    // Submit a simple job without any dependencies or parameters
    FLINK_HOME/bin/flink run -py example.py

    // Submit a job with dependencies and parallelism 16
    FLINK_HOME/bin/flink run -p 16 -py example.py -pyfs resources1.egg,resources2.zip,resource3.py

    // Submit a job by configuring the python entry module
    FLINK_HOME/bin/flink run -pyfs resources1.egg,resources2.zip,resource3.py -pym org.apache.pyflink.example

    // Submit a job with the python binary path and execution environment configured
    FLINK_HOME/bin/flink run -pyfs resources1.egg,resources2.zip,resource3.py -py-exec ./my_env/env/bin/python -py-env process -pym org.apache.pyflink.example

    // Submit a simple job in which the Python worker runs in a docker container
    FLINK_HOME/bin/flink run --py-env docker --py-docker-image xxx -py example.py

    Regarding the [ARGUMENTS] section of syntax, we reuse existing features without any extensions.

    Python Shell

    The main goal of the Flink Python Shell is to provide an interactive way for users to write and execute Flink Python Table API jobs.

    By default, jobs are executed in a mini cluster (for beginner users to learn and research). If users want to submit jobs to a cluster, they need to configure additional parameters, such as the execution mode or the JobMaster address (standalone cluster), just like Flink Scala Shell.

    The Python shell is responsible for confirming the location of the flink files, loading required dependencies and then executing the shell initialization logic.

    The initialization procedure will import all the Flink Python Table API classes which should be exposed to the user.

    Example:

    ...
    >>> t_config = TableConfig.builder().as_streaming_execution().set_parallelism(1).build()
    >>> t_env = TableEnvironment.create(t_config)
    >>> data = [(1, "Sunny"), (2, "Eden")]
    >>> source = t_env.from_collection(data, "id, name")
    >>> source.select("id, name").insert_into("print")
    >>> t_env.execute()
    1,Sunny
    2,Eden

    REST API

    Currently we have a complete set of REST APIs that can be used to submit Java jobs (https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/rest_api.html). We need to make some changes to the current REST API to support submitting Python Table API jobs:

    Current REST API         | New REST API                    | Remarks
    -------------------------|---------------------------------|------------------------------------------------------------------
    GET /jars                | GET /artifacts                  | Now it lists both jar files and python package files.
    POST /jars/upload        | POST /artifacts/upload          | Allows users to upload both jar files and python package files. Use "Content-Type" in the request header to mark the file type.
    DELETE /jars/:jarid      | DELETE /artifacts/:artifactid   | Deletes files according to "artifactid".
    GET /jars/:jarid/plan    | GET /artifacts/:artifactid/plan | Introduces a new optional parameter, pythonEntryModule, which allows specifying a python entry point module other than __main__.py.
    POST /jars/:jarid/run    | POST /artifacts/:artifactid/run | Also introduces the optional pythonEntryModule parameter, same as above.

    We can first deprecate the old REST APIs and remove them in one or two releases.
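
    As an illustration of how a client might use the new endpoints (a sketch only; the endpoint paths follow the table above, but the request field names, header values and response fields shown here are hypothetical):

    import requests

    BASE_URL = "http://localhost:8081"  # hypothetical JobManager REST address

    # Upload a python package file; the form field name and content type are assumptions.
    with open("word_count.zip", "rb") as f:
        resp = requests.post(
            BASE_URL + "/artifacts/upload",
            files={"artifact": ("word_count.zip", f, "application/zip")})
    artifact_id = resp.json().get("filename")  # response field name is an assumption

    # Run the uploaded artifact, specifying a python entry point module other than __main__.py.
    requests.post(
        BASE_URL + "/artifacts/%s/run" % artifact_id,
        json={"pythonEntryModule": "word_count"})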

    Proposed Changes

    This FLIP proposes adding Python language support to the Flink Table API. We will add a Python-related module, introduce a set of Python Table APIs that mirror the Java/Scala Table API, add job submission support, etc. We will also add checks for all of the changes.

    Compatibility, Deprecation, and Migration Plan

    This FLIP proposes new functionality and operators for the Python Table API. The behavior of existing operators is not modified.

    Test Plan

    The changes proposed in this FLIP can be verified by both IT test cases and validation test cases.

    Implementation plan

    1. Add the flink-python module and a submodule flink-python-table, configure the Py4J dependency, and support the Scan, Projection and Filter operators of the Python Table API so that they can be run in the IDE (with simple tests).
    2. Add a basic test framework, just like the existing Java Table API, and abstract some TestBase classes.
    3. Add basic support for submitting Python Table API jobs in CliFrontend, i.e. `flink run -py wordcount.py` works (with simple tests).
    4. Add all the TypeInformation support in the Python Table API.
    5. Add all of the existing interfaces in the Java Table API, such as Table, TableEnvironment, TableConfig, Descriptor, etc.
    6. Add all the Source/Sink Descriptor support in the Python Table API.
    7. Add pip support so that the Flink Python Table API can be installed from the python repo.
    8. Add a Python Table API doc in the current Table API doc.
    9. Add a common concepts doc for the Python Table API in the basic concepts doc.
    10. Add pythondocs at the same level as javadocs and scaladocs.
    11. Add convenience interfaces to the Python TableEnvironment, such as `from_collection`, `set_parallelism`, etc., which come from `StreamExecutionEnvironment`.
    12. Add the description of `flink run -py xx.py` in the CLI docs.
    13. Add a Python shell, expose the TableEnvironment to the user, and automatically import the common dependencies.
    14. Add a Python REPL submenu under the Deployment & Operations directory to add documentation for the Python shell.
    15. Add support for a single job containing multiple python scripts, i.e. add `-pyfs --py-files <py-files>` (attach custom python files to the job; use ',' to separate multiple files) to `flink run`.
    16. Other JIRAs for continuous improvements and enhancements, such as Catalog, DDL, Expression DSL, etc.
    1. ScalarFunction support
      1. Python ScalarFunction definition
      2. Python ScalarFunction translated to Java ScalarFunction
      3. data service support
      4. optimizations for multiple scalar functions in the same operator
    2. TableFunction support
      1. Python TableFunction definition
    3. AggregateFunction support
      1. Python AggregateFunction definition
      2. Python AggregateFunction translated to Java AggregateFunction
      3. DataView support
      4. state service support
      5. optimizations for multiple AggregateFunctions in the same operator
    4. Pandas API (fromPandas/toPandas) support
    5. Pandas scalar function support
    6. Pandas aggregate function support
    7. Pluggable execution environment support
    8. Logging service support
    9. Metrics service support
    10. etc...

    Future or next step

    Batch execution optimization, e.g. for user-defined scalar functions:

    Suppose we have a job as follows:

    # udf_1 and udf_2 are Python UDFs and udf_3 is a Java UDF
    t_env.register_function("udf_1", PythonScalarFunction1())
    t_env.register_function("udf_2", PythonScalarFunction2())
    t_env.register_function("udf_3", JavaScalarFunction3())

    t = t_env.scan("Orders")  # schema (a, b, c, d)
    t.select("a, udf_1(a, b), udf_2(b, c), udf_3(c, d)")


    The two Python UDFs "udf_1" and "udf_2" will be optimized to run in the same Python worker.

    The physical plan will look as follows:

    DataStreamCalc(select=[a, udf_1(a, b) AS EXPR$2, udf_2(b, c) AS EXPR$3, EXPR$1])
      +- DataStreamCalc(select=[a, b, c, udf_3(c, d) AS EXPR$1])
        +- DataStreamScan(table=[[_DataStreamTable_0]])


    This optimization applies to both common Python scalar functions and pandas functions.

    Batch execution optimization is not only needed by scalar functions but can also be applied to table functions and aggregate functions.

    Rejected Alternatives

    Regarding the API, we could adopt a scheme similar to that of the Beam community, i.e. create a Python API that follows the same structure as Flink's Table API and produces a language-independent DAG. This approach requires a lot of cooperation with the Beam community and is regarded as a solution for the long-term goal.