Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

At the Flink API level we have the DataStream API, the DataSet API, and the Table API & SQL, and the Table API will become a first-class citizen. The Table API is declarative and can be automatically optimized, as mentioned by Stephan in the Flink mid-term roadmap. We therefore first consider supporting Python at the Table API level to cater to the current large number of analytics users. This proposal covers the following items:

...

  • Pandas Support
    • Add toPandas and fromPandas interfaces in the Table API as conversions between Table and pandas (a usage sketch follows this list).
    • Support using pandas UDFs directly in the Python Table API.
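
A minimal usage sketch of the proposed conversions, assuming a TableEnvironment instance `t_env` already exists; the method names follow the Python definitions later in this proposal (`from_pandas`, `to_pandas`):

import pandas as pd

# Convert a pandas.DataFrame into a Table, apply Table API operations, and convert back.
pdf = pd.DataFrame({'word': ['hello', 'flink'], 'count': [1, 2]})
table = t_env.from_pandas(pdf)      # pandas.DataFrame -> Table
result = table.select("word, count")
result_pdf = result.to_pandas()     # Table -> pandas.DataFrame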

Modules

  • flink-python
    • flink-python-table - The place for all Python interface definitions and implementations, such as Table, Window, TableEnvironment, TableConfig, ConnectorDescriptor, DataType, TableSchema, TableSource, TableSink, etc., i.e., all the user interfaces in `flink-table-common` and `flink-table-api-java` should be there.
    • flink-python-streaming (in the future)

...

          Support for submitting Python Table API jobs in CliFrontend, such as `flink run -py wordcount.py`.

Architecture

We will not develop Python operators the way the legacy `flink-python` and `flink-streaming-python` modules do. To get the most out of the existing Java/Scala stack (the Calcite-based optimizer), the Python Table API only needs to define the Python-facing interfaces and call into the existing Java Table API implementation, meeting the needs of Python users with minimal effort. So our main job is to implement communication between the Python VM and the Java VM, as shown below:

...

Currently, we have two options for the implementation of RPC.

Approach 1

For the Python API, a very mature solution is Py4J, as shown below:

...

In this approach, Py4J encapsulates all serialization and deserialization processes.
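
A minimal sketch of this approach, assuming a Py4J gateway server has already been started on the Java side and exposes a hypothetical `getTableEnvironment()` entry point; only the gateway wiring is real Py4J API, the entry-point and Table methods are illustrative:

from py4j.java_gateway import JavaGateway

gateway = JavaGateway()                              # connect to the gateway server running in the JVM
j_tenv = gateway.entry_point.getTableEnvironment()   # hypothetical entry point exposed by the Java side
j_table = j_tenv.scan("Source")                      # forwarded to the Java Table API via Py4J
j_table = j_table.select("a, b")                     # arguments/results are (de)serialized by Py4J transparently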

Approach 2

Currently, we want to add the Python Table API, and in the future we may want to support other popular non-JVM-language-based Table APIs, such as R, Go, etc. So we can also have a more scalable approach, as follows:

...

      We may need to create a Python API that follows the same structure as Flink's Table API and produces a language-independent DAG (as Stephan already mentioned on the mailing thread).

UDF Architecture (Overall)

The main design to support UDF is the communication between JVM and Python VM, and the management of the Python environment. The overall architecture is as follows:

...

  • Env Service - Responsible for launching and destroying the Python worker execution environment. The Python UDFs may have custom dependencies which are not installed in the cluster, and the Python version may also differ from the Python installed in the cluster. Two solutions will be provided to solve this problem:
    • Docker - The Python worker runs in a docker container. Users should specify a docker image, which is a self-contained environment.
    • Process - The Python worker runs as a separate process. Users can create a virtual environment such as virtualenv or conda locally and install third-party dependencies into this environment.
  • Data Service - Responsible for transferring the input data and the UDF execution results between the Flink operator and the Python worker.
  • State Service - It allows the aggregate function to access state using DataView. It is responsible for reading/writing state from/to the Flink operator.
  • Logging Service - It provides logging support for UDFs. It is responsible for transferring log entries produced by UDFs to the Flink operator and integrates with Flink's logging system.
  • Metrics Service - It allows UDFs to access Flink's metrics system.

Stateless User-defined Function (Scalar/Table Function)

The most important thing for stateless functions is the data service. Considering the performance loss of communication between the JVM and the Python VM, we need to consider how to optimize the execution performance of stateless functions from the following data-processing perspectives:

...

Besides, sibling Python UDFs or parent-child Python UDFs in the same operator will be fused to run as one Python UDF to eliminate the data transmission/serialization overhead. The input columns will be combined and sent to the Python worker together, and each UDF will be executed with its required columns, as sketched below.
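
As an illustration only (the names and structure below are assumptions, not the proposed implementation), two sibling scalar UDFs used in the same operator could be wrapped into a single fused callable so their input columns cross the VM boundary together:

# Hypothetical sketch: fuse sibling scalar UDFs so one round trip evaluates all of them.
def make_fused_udf(udfs, arg_indexes):
    # udfs: list of callables; arg_indexes: per-UDF column indexes into the combined input row.
    def fused(combined_row):
        # Each UDF only sees the columns it requires.
        return tuple(udf(*[combined_row[i] for i in indexes]) for udf, indexes in zip(udfs, arg_indexes))
    return fused

# f needs column 0, g needs column 1 of the combined row.
fused = make_fused_udf([lambda a: a.strip(), lambda b: b * 2], [[0], [1]])
print(fused(("  hello ", 21)))  # ('hello', 42)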

Pandas Function Support

Pandas functions require pandas.Series or pandas.DataFrame as inputs. To use pandas functions in the Table API, we need to convert data between Table and pandas. Apache Arrow provides Java and Python facilities to convert between the Apache Arrow format and pandas. If the Python UDFs are pandas functions, the Flink operator will serialize the data using the Apache Arrow serializer before sending it to the Python worker. At the other end, the Python worker will deserialize the data using the Apache Arrow Python facilities before executing the pandas functions. The workflow is as follows:

...

NOTE - As pandas aggregate functions don't support retraction and state, they will only be used in batch scenarios.
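
For reference, a minimal sketch of the Python-side conversions, assuming `pyarrow` and `pandas` are available in the Python worker (the batch below is created locally as a stand-in for data received from the Flink operator):

import pandas as pd
import pyarrow as pa

# Stand-in for an Arrow record batch received from the Flink operator.
batch = pa.RecordBatch.from_pandas(pd.DataFrame({'a': [1.0, 2.0, 3.0]}))
df = batch.to_pandas()                      # Arrow -> pandas before invoking the pandas UDF
result = df['a'] * 2                        # the pandas UDF operates on pandas.Series/DataFrame
result_batch = pa.RecordBatch.from_pandas(result.to_frame('b'))  # pandas -> Arrow for the results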

Stateful User-defined Functions (Aggregate Function)

The most complicated user-defined function is the user-defined aggregate function. It involves the use of Flink state, so this is where the state service comes in. The workflow is as follows:

...

  1. Input Buffer - The input elements are put into the input buffer (the columns are projected and only the required columns are transferred to the Python worker), so that the inputs can be processed asynchronously.
  2. Group the inputs - The inputs are grouped according to the group key, and inputs belonging to the same group key are contiguous.
  3. State Service - The aggregate function can use DataView to access states. The accessed states can be buffered in the memory of the Python worker, with an LRU strategy used for eviction. Buffered states are sent back to the operator when a checkpoint is triggered or when a state is evicted from the buffer (a sketch follows this list).
  4. Waiting Buffer - The original inputs sent to the Python worker are buffered, as they will be needed when calculating the result row.
  5. Result Buffer - The results in the Result Buffer must be kept in order, and each result corresponds to one input element in the Waiting Buffer.
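
A simplified, purely illustrative sketch of the LRU-style state buffer described in step 3 (not the proposed implementation):

from collections import OrderedDict

class LruStateBuffer(object):
    """Illustrative LRU buffer for states accessed via DataView in the Python worker."""
    def __init__(self, max_size):
        self._max_size = max_size
        self._cache = OrderedDict()
        self._evicted = {}  # evicted entries waiting to be sent back to the Flink operator

    def get(self, key):
        if key in self._cache:
            self._cache.move_to_end(key)  # mark as most recently used
        return self._cache.get(key)

    def put(self, key, value):
        self._cache[key] = value
        self._cache.move_to_end(key)
        if len(self._cache) > self._max_size:
            evicted_key, evicted_value = self._cache.popitem(last=False)  # evict least recently used
            self._evicted[evicted_key] = evicted_value

    def flush(self):
        """On checkpoint, hand all buffered states back to the operator."""
        pending = dict(self._cache)
        pending.update(self._evicted)
        self._evicted.clear()
        return pending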

Public Interfaces

The Python Table API classes are just wrappers of the corresponding Java Table API classes. All the user interfaces in `flink-table-common` and `flink-table-api-java` should be defined in the Python Table API. The main interfaces/implementations are as follows:

  • Table
  • TableEnvironment
  • TableConfig
  • Descriptor
  • TypeInformation(May change after FLIP-37)
  • etc

Table

class Table(object):

   """

   Wrapper of org.apache.flink.table.api.Table

   """

   def __init__(self, j_table):

       self._j_table = j_table  # The reference to the Table object on the JVM.

   def select(self, col_list):

       return Table(self._j_table.select(col_list))  # Call the java table api method directly

   def where(self, condition):

       return Table(self._j_table.where(condition))

   …

   …

   def write_to_sink(self, j_sink):

       self._j_table.writeToSink(j_sink._j_table_sink)

   def to_pandas(self):

       …

       return pandas.DataFrame(...)

   …

GroupedTable

class GroupedTable(object):

  """

  Wrapper of org.apache.flink.table.api.GroupedTable

  """

  def __init__(self, j_grouped_table):

      # The reference to the grouped table object on the JVM.

      self._j_grouped_table = j_grouped_table

  def select(self, col_list):  

      # Call the java table api method directly

      return Table(self._j_grouped_table.select(col_list))

GroupWindowedTable, WindowGroupedTable, and OverWindowedTable are defined similarly to GroupedTable.

TableConfig

class TableConfig(object):

   def __init__(self, j_table_config):

       self._j_table_config = j_table_config

   class Builder(object):

       def __init__(self, j_table_config_builder):

           self._j_table_config_builder = j_table_config_builder

       def set_time_zone(self, time_zone):

           self._j_table_config_builder.setTimeZone(time_zone)

           return self

       def set_null_check(self, null_check):

           self._j_table_config_builder.setNullCheck(null_check)

           return self

       …

       …

       # We don't want to expose StreamExecutionEnvironment

       def as_streaming_execution(self):

           self._j_table_config_builder.asStreamingExecution()

           return self

       # We don't want to expose ExecutionEnvironment

       def as_batch_execution(self):

           self._j_table_config_builder.asBatchExecution()

           return self

       def build(self):

           return TableConfig(self._j_table_config_builder.build())

   @classmethod

   def builder(cls):  

       return cls.Builder(_jvm.org.apache.flink.table.api.TableConfig.builder())
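
Usage would follow the standard builder pattern; a small sketch using only the methods defined above (the argument values are illustrative):

t_config = TableConfig.builder() \
    .as_streaming_execution() \
    .set_time_zone("UTC") \
    .set_null_check(True) \
    .build()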

TableEnvironment

class TableEnvironment(object):

  """

  Wrapper of and extensions to org.apache.flink.table.api.TableEnvironment

  """

  table_config = None

  def __init__(self, j_tenv):

      self._j_tenv = j_tenv

  def register_table(self, name, table):

      self._j_tenv.registerTable(name, table._j_table)

  def register_table_source(self, name, table_source):

      self._j_tenv.registerTableSource(name, table_source.j_table_source)

  def register_table_sink(self, name, table_sink):

      self._j_tenv.registerTableSink(name, table_sink.j_table_sink)

  def scan(self, *table_path):

      j_paths = TypesUtil.convert_py_list_to_j_array("java.lang.String", table_path)

      j_table = self._j_tenv.scan(j_paths)

      return Table(j_table)

  def connect(self, connector_descriptor):

      return TableDescriptor(self._j_tenv.connect(connector_descriptor._j_connector_descriptor))

   …

   …

  def sql_query(self, query):

      j_table = self._j_tenv.sqlQuery(query)

      return Table(j_table)

  def sql_update(self, stmt, config=None):

      if config is not None:

          self._j_tenv.sqlUpdate(stmt, config)

      else:

          self._j_tenv.sqlUpdate(stmt)

  # Extension methods

  def from_collection(self, data):

      …

      return Table(...)

  def from_pandas(self, data, schema=None):

      …

     return Table(...)

  def execute(self):

      self._j_tenv.execEnv().execute()

  def set_parallelism(self, parallelism):

      self._j_tenv.execEnv().setParallelism(parallelism)

  …

  ...

  @classmethod

  def create(cls, table_config):

      j_tenv = ...

      return TableEnvironment(j_tenv)

Descriptor

There are a lot of Descriptor-related classes; we will take Csv as an example:

class Csv(FormatDescriptor):

  def __init__(self):

      self._j_csv = _jvm.org.apache.flink.table.descriptors.Csv()

      super(Csv, self).__init__(self._j_csv)

  def field_delimiter(self, delimiter):

      self._j_csv.fieldDelimiter(delimiter)

      return self

  def line_delimiter(self, delimiter):

      self._j_csv.lineDelimiter(delimiter)

      return self

  def quote_character(self, quoteCharacter):

      self._j_csv.quoteCharacter(quoteCharacter)

      return self

  def allow_comments(self):

      self._j_csv.allowComments()

      return self

  def ignore_parse_errors(self):

      self._j_csv.ignoreParseErrors()

      return self

  def array_element_delimiter(self, delimiter):

      self._j_csv.arrayElementDelimiter(delimiter)

      return self

  def escape_character(self, escape_character):

      self._j_csv.escapeCharacter(escape_character)

      return self

  def null_literal(self, null_literal):

      self._j_csv.nullLiteral(null_literal)

      return self

UDFs Interface

UserDefinedFunction

from abc import ABCMeta, abstractmethod

class UserDefinedFunction(object):

  """

  An interface for writing python user defined function

  """

  __metaclass__ = ABCMeta

  def open(self, *args):

      pass

  def close(self, *args):

      pass

  def is_deterministic(self):

      return True

ScalarFunction

class ScalarFunction(UserDefinedFunction):

  @abstractmethod

  def eval(self, *args):

      """Please define your implementation"""

TableFunction

class TableFunction(UserDefinedFunction):

  @abstractmethod

  def eval(self, *args):

      """Please define your implementation"""

  def collect(self, element):

      pass

AggregateFunction

class AggregateFunction(UserDefinedFunction):

  """

  optional methods:

  def accumulate(self, accumulator, *args):

      pass

  def retract(self, accumulator, *args):

      pass

  def merge(self, accumulator, other_accumulators):

      pass

  """

  @abstractmethod

  def create_accumulator(self):

      """Please define your implementation"""

  @abstractmethod

  def get_value(self, accumulator):

      """Please define your implementation"""

Decorators

  • udf: used for ScalarFunction
  • udtf: used for TableFunction
  • udaf: used for AggregateFunction

...

NOTE: The decorators may change according to FLIP-37
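
A small usage sketch, assuming decorator signatures similar to the `@udaf` example shown later in this document; the class names and type arguments here are purely illustrative and may change with FLIP-37:

@udf(input=DataTypes.STRING, output=DataTypes.STRING)
class UpperFunction(ScalarFunction):
    def eval(self, s):
        return s.upper()

@udtf(input=DataTypes.STRING, output=DataTypes.STRING)
class SplitFunction(TableFunction):
    def eval(self, line):
        for word in line.split(" "):
            self.collect(word)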

DataView

class DataView(object):

  """

  An interface for defining DataView

  """

  __metaclass__ = ABCMeta

  @abstractmethod

  def clear(self):

      pass

MapView

class MapView(DataView):

  def __init__(self, key_type, value_type):

      self.key_type = key_type

      self.value_type = value_type

  def get(self, key):

      return _NoValue

  def put(self, key, value):

      pass

  def put_all(self, map):

      pass

  def remove(self, key):

      pass

  def contains(self, key):

      return _NoValue

  def entries(self):

      return _NoValue

  def keys(self):

      return _NoValue

  def values(self):

      return _NoValue

  def iterator(self):

      return _NoValue

ListView

class ListView(DataView):

  def __init__(self, element_type):

      self.element_type = element_type

  def get(self):

      return _NoValue

  def add(self, value):

      pass

  def add_all(self, values):

      pass
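
For illustration, a hypothetical accumulator could declare a MapView so that large state is backed by Flink state rather than fully materialized in the accumulator. This is a sketch under the assumption that the framework replaces the declared DataView fields with state-backed implementations at runtime:

class DistinctCountAccumulator(object):
    def __init__(self):
        self.distinct_map = MapView(DataTypes.STRING, DataTypes.INT)  # backed by Flink state at runtime
        self.count = 0

class DistinctCount(AggregateFunction):
    def create_accumulator(self):
        return DistinctCountAccumulator()

    def accumulate(self, acc, value):
        if value is not None and not acc.distinct_map.contains(value):
            acc.distinct_map.put(value, 1)
            acc.count += 1

    def get_value(self, acc):
        return acc.count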

Expression

The Expression API will not be supported in the initial version, as there is ongoing work on introducing a Table API Java Expression DSL. The Python Expression API will leverage that work and will be supported once it is done.

DIST

Create a `python` directory in the published opt directory. The contents of the `python` directory are as follows:

...

    Details can be found in the Job Submission section.

Docs

  • Add the description of `flink run -py xx.py` to the CLI documentation
  • Add a REST service API for submitting jobs
  • Add a Python REPL submenu under the Deployment & Operations directory to document the Python shell
  • Add Python Table API docs to the current Table API docs
  • Add common-concepts docs for the Python Table API to the basic-concepts docs
  • Add pythondocs at the same level as javadocs and scaladocs
  • Add Python UDF documentation
  • etc.

Examples

User-Defined Scalar Function

  • Scalar Function definition

...

my_udf = TrimFunction()

t_env.register_function('trim', my_udf)

source = ...

result = source.select("trim(a) as a")

User-Defined Table Function

  • Table Function definition

...

my_udtf = SplitTableFunction()

t_env.register_function('split', my_udtf)

source = ...

source.join_lateral('split(a) as (word)').select("word").group_by("word").select("word, word.count as word_count")

User-Defined Aggregate Function

  • Aggregate Function definition

@udaf(input=DataTypes.ANY, output=DataTypes.INT, accumulator=DataTypes.ROW(DataTypes.INT))

class CountAggregateFunction(AggregateFunction):

  def accumulate(self, acc, param):

      if param is not None:

          acc.set_value(0, acc.get_value(0) + 1)

  def merge(self, acc, other_acc):

      for item in other_acc:

         acc.set_value(0, acc.get_value(0) + item.get_value(0))

      return acc

  def retract(self, acc, param):

      if param is not None:

          acc.set_value(0, acc.get_value(0) - 1)

  def create_accumulator(self):

      return Row([0])

  def get_value(self, acc):

      return acc.get_value(0)

WordCount

Let's take word count as a complete example. The Python Table API will look like the following (wordCount.py):

import os

import tempfile

from ... import TrimFunction, CountAggregateFunction, SplitTableFunction

tmp_dir = tempfile.gettempdir()

source_path = tmp_dir + '/streaming.csv'

if os.path.isfile(source_path):

  os.remove(source_path)

with open(source_path, 'w') as f:

  lines = 'hi guys\n' + 'hello guys\n'

  f.write(lines)

  print(lines)

t_config = TableConfig.builder().as_streaming_execution().set_parallelism(1).build()

t_env = TableEnvironment.create(t_config)

field_names = ["a"]

field_types = [DataTypes.STRING]

# register Source table in table environment

t_env.register_table_source(

  "Source",

  CsvTableSource(source_path, field_names, field_types))

# register Results table in table environment

# using connector API

schema = Schema() \

  .field("a", DataTypes.STRING) \

  .field("b", DataTypes.INT)

format = Csv() \

  .field_delimiter(",") \

  .field("a", DataTypes.STRING) \

  .field("b", DataTypes.INT)

tmp_dir = tempfile.gettempdir()

tmp_csv = tmp_dir + '/streaming2.csv'

if os.path.isfile(tmp_csv):

  os.remove(tmp_csv)

connector = FileSystem() \

  .path(tmp_csv)

t_env.connect(connector) \

  .with_format(format) \

  .with_schema(schema) \

  .in_append_mode() \

  .register_table_sink("Results")

t = t_env.scan("Source")

my_udf = TrimFunction()

t_env.register_function('trim', my_udf)

my_udtf = SplitTableFunction()

t_env.register_function('split', my_udtf)

my_udaf = CountAggregateFunction()

t_env.register_function('my_count', my_udaf)

t.join_lateral("split(a) as (word)").select("trim(word) as word")\

  .group_by("word").select("word, my_count(word)")\

  .insert_into("Results")

t_env.execute()

with open(tmp_csv, 'r') as f:

  lines = f.read()

  print(lines)

Job Submission

Flink Run

Support for submitting Python Table API jobs will be added to CliFrontend, so that `flink run` can submit a Python Table API job. The current `flink` command line syntax is as follows:

...

Regarding the [ARGUMENTS] section of the syntax, we reuse existing features without any extensions.

Python Shell

The main goal of the Flink Python Shell is to provide an interactive way for users to write and execute Flink Python Table API jobs.

...

...

>>> t_config = TableConfig.builder().as_streaming_execution().set_parallelism(1).build()

>>> t_env = TableEnvironment.create(t_config)

>>> data = [(1, "Sunny"), (2, "Eden")]

>>> source = t_env.from_collection(data, "id, name")

>>> source.select("id, name").insert_into("print")

>>> t_env.execute()

1,Sunny

2,Eden

REST API

Currently, we have a complete set of REST APIs that can be used to submit Java jobs (https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/rest_api.html). We need to make some changes to the current REST API to support submitting Python Table API jobs:

...

We can first deprecate the old REST APIs and remove them in one or two releases.

Proposed Changes

This FLIP proposes new support for the Python language in Flink's Table API. We will add a Python-related module, introduce a set of Python Table API classes that mirror the Java/Scala Table API, add job submission support, etc. We will also add tests for all of the changes.

Compatibility, Deprecation, and Migration Plan

This FLIP proposes new functionality and operators for the Python Table API. The behavior of existing operators is not modified.

Test Plan

The changes proposed in this FLIP can be verified by both integration (IT) test cases and unit test cases.

Implementation plan

  1. Add the flink-python module and a submodule flink-python-table, set up the Py4J dependency configuration, and implement the Scan, Projection, and Filter operators of the Python Table API so that they can be run in the IDE (with simple tests).
  2. Add a basic test framework, just like the existing Java TableAPI, abstract some TestBase.
  3. Add basic support for submitting Python Table API jobs in CliFrontend, i.e. `flink run -py wordcount.py` can work (with simple tests).
  4. Add all the TypeInformation support in Python Table API.
  5. Add all of the existing interfaces in Java Table API such as Table, TableEnvironment, TableConfig, and Descriptor etc.
  6. Add all the Source/Sink Descriptor support in Python Table API.
  7. Add pip support, so the Flink Python Table API can be installed from the Python package repository.
  8. Add Python Table API doc in current TableAPI doc
  9. Add Common concepts doc for Python Table API, in Basic concepts doc
  10. Add pythondocs at the same level as javadocs and scaladocs
  11. Add convenience interfaces to the Python TableEnvironment, such as `from_collection`, `set_parallelism`, etc., which come from `StreamExecutionEnvironment`.
  12. Add the description of `flink run -py xx.py` in CLI
  13. Add a Python shell, expose the TableEnvironment to the user, and automatically import the common dependencies.
  14. Add a Python REPL submenu under the Deployment & Operations directory to add documentation for the python shell.
  15. Add support for a single job containing multiple Python scripts, i.e. add `-pyfs --py-files <py-files>` (attach custom Python files for the job; use ',' to separate multiple files) to `flink run`.
  16. Other JIRAs that are constantly improving and enhanced, such as Catalog, DDL, Expression DSL, etc.
  1. ScalarFunction support
    1. Python ScalarFunction definition
    2. Python ScalarFunction translated to Java ScalarFunction
    3. data service support
    4. optimizations for multiple scalar functions in the same operator
  2. TableFunction support
    1. Python TableFunction definition
  3. AggregateFunction support
    1. Python AggregateFunction definition
    2. Python AggregateFunction translated to Java AggregateFunction
    3. DataView support
    4. state service support
    5. optimizations for multiple AggregateFunctions in the same operator
  4. Pandas API (fromPandas/toPandas) support
  5. Pandas scalar function support
  6. Pandas aggregate function support
  7. Pluggable execution environment support
  8. Logging service support
  9. Metrics service support
  10. etc...

Future or next step

Batch Execution Optimization, e.g. for a user-defined scalar function:

...

Batch Execution Optimization is not only needed by scalar functions but can also be applied to table functions and aggregate functions.
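
For illustration, a hypothetical sketch (not the proposed API) of what batch execution means for a scalar function: instead of one Python invocation, and potentially one VM round trip, per row, the framework calls the function once per batch of rows:

# Hypothetical scalar UDF and input rows, for illustration only.
class Upper(object):
    def eval(self, s):
        return s.upper()

my_scalar_udf = Upper()
rows = ["hello", "flink"]

# Per-row execution: one call (and potentially one VM round trip) per element.
per_row_results = [my_scalar_udf.eval(row) for row in rows]

# Batched execution: one call handles a whole batch, so data crosses the VM boundary once per batch.
def eval_batch(udf, batch):
    return [udf.eval(row) for row in batch]

batched_results = eval_batch(my_scalar_udf, rows)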

Rejected Alternatives

Regarding the API, we could adopt a scheme similar to the Beam community's, that is, create a Python API that follows the same structure as Flink's Table API and produces a language-independent DAG. This approach requires a lot of cooperation with the Beam community, and we see it as a solution for the long-term goals.

...