...

There will be Python wrappers for all of the API classes, such as TableEnvironment, Table, TableSink, TableSource, Catalog, etc. For example:

class Table(object):
    """
    Wrapper of org.apache.flink.table.api.Table
    """

    def __init__(self, j_table):
        self._j_table = j_table  # The reference to the Table object in the JVM.

    def select(self, col_list):
        return Table(self._j_table.select(col_list))  # Call the Java Table API method directly.

...

In this approach, Py4J encapsulates all serialization/deserialization between Python and the JVM.
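As a sketch of how the wrappers obtain their JVM references (assuming a Py4J GatewayServer has already been started on the Java side; the names gateway and _jvm below are illustrative, not part of the proposed API):

from py4j.java_gateway import JavaGateway

gateway = JavaGateway()  # connect to the GatewayServer started on the Java side
_jvm = gateway.jvm       # entry point for accessing Java classes

# A wrapper simply holds a reference to the corresponding Java object and forwards calls:
j_table_config = _jvm.org.apache.flink.table.api.TableConfig.builder().build()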

...

  • Table
  • TableEnvironment
  • TableConfig
  • Descriptor
  • TypeInformation (may change after FLIP-37)
  • etc.

Table

class Table(object):
    """
    Wrapper of org.apache.flink.table.api.Table
    """

    def __init__(self, j_table):
        self._j_table = j_table  # The reference to the Table object in the JVM.

    def select(self, col_list):
        return Table(self._j_table.select(col_list))  # Call the Java Table API method directly.

    def where(self, condition):
        return Table(self._j_table.where(condition))

    …

    def write_to_sink(self, j_sink):
        self._j_table.writeToSink(j_sink._j_table_sink)

    def to_pandas(self):
        …
        return pandas.DataFrame(..)

    …

GroupedTable

class GroupedTable(object):
    """
    Wrapper of org.apache.flink.table.api.GroupedTable
    """

    def __init__(self, j_grouped_table):
        # The reference to the GroupedTable object in the JVM.
        self._j_grouped_table = j_grouped_table

    def select(self, col_list):
        # Call the Java Table API method directly.
        return Table(self._j_grouped_table.select(col_list))

GroupWindowedTable, WindowGroupedTable and OverWindowedTable are defined similarly to GroupedTable.
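For illustration, a minimal sketch of one of them (the Java method names are assumed to mirror org.apache.flink.table.api.OverWindowedTable):

class OverWindowedTable(object):
    """
    Wrapper of org.apache.flink.table.api.OverWindowedTable
    """

    def __init__(self, j_over_windowed_table):
        # The reference to the OverWindowedTable object in the JVM.
        self._j_over_windowed_table = j_over_windowed_table

    def select(self, col_list):
        # Call the Java Table API method directly.
        return Table(self._j_over_windowed_table.select(col_list))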

TableConfig

class TableConfig(object):

    def __init__(self, j_table_config):
        self._j_table_config = j_table_config

    class Builder(object):

        def __init__(self, j_table_config_builder):
            self._j_table_config_builder = j_table_config_builder

        def set_time_zone(self, time_zone):
            self._j_table_config_builder.setTimeZone(time_zone)
            return self

        def set_null_check(self, null_check):
            self._j_table_config_builder.setNullCheck(null_check)
            return self

        …

        # We don't want to expose StreamExecutionEnvironment
        def as_streaming_execution(self):
            self._j_table_config_builder.asStreamingExecution()
            return self

        # We don't want to expose ExecutionEnvironment
        def as_batch_execution(self):
            self._j_table_config_builder.asBatchExecution()
            return self

        def build(self):
            return TableConfig(self._j_table_config_builder.build())

    @classmethod
    def builder(cls):
        return cls.Builder(_jvm.org.apache.flink.table.api.TableConfig.builder())
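A hypothetical usage of the builder, mirroring the WordCount example further below:

t_config = TableConfig.builder() \
    .as_streaming_execution() \
    .set_null_check(True) \
    .build()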

TableEnvironment

class TableEnvironment(object):
    """
    Wrapper of and extension to org.apache.flink.table.api.TableEnvironment
    """

    table_config = None

    def __init__(self, j_tenv):
        self._j_tenv = j_tenv

    def register_table(self, name, table):
        self._j_tenv.registerTable(name, table._j_table)

    def register_table_source(self, name, table_source):
        self._j_tenv.registerTableSource(name, table_source._j_table_source)

    def register_table_sink(self, name, table_sink):
        self._j_tenv.registerTableSink(name, table_sink._j_table_sink)

    def scan(self, *table_path):
        j_paths = TypesUtil.convert_py_list_to_j_array("java.lang.String", table_path)
        j_table = self._j_tenv.scan(j_paths)
        return Table(j_table)

    def connect(self, connector_descriptor):
        return TableDescriptor(self._j_tenv.connect(connector_descriptor._j_connector_descriptor))

    …

    def sql_query(self, query):
        j_table = self._j_tenv.sqlQuery(query)
        return Table(j_table)

    def sql_update(self, stmt, config=None):
        if config is not None:
            self._j_tenv.sqlUpdate(stmt, config)
        else:
            self._j_tenv.sqlUpdate(stmt)

    # Extension methods

    def from_collection(self, data):
        …
        return Table(...)

    def from_pandas(self, data, schema=None):
        …
        return Table(...)

    def execute(self):
        self._j_tenv.execEnv().execute()

    def set_parallelism(self, parallelism):
        self._j_tenv.execEnv().setParallelism(parallelism)

    …

    @classmethod
    def create(cls, table_config):
        j_tenv = ...
        return TableEnvironment(j_tenv)
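A hypothetical end-to-end usage of these wrappers, mirroring the interactive shell example later in this document (the table named "Source" is assumed to have been registered beforehand):

t_config = TableConfig.builder().as_streaming_execution().build()
t_env = TableEnvironment.create(t_config)
t = t_env.scan("Source")
result = t.select("a, b").where("b > 10")
t_env.register_table("Filtered", result)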

Descriptor

There are a lot of Descriptor-related classes; we will take Csv as an example:

class Csv(FormatDescriptor):

    def __init__(self):
        self._j_csv = _jvm.org.apache.flink.table.descriptors.Csv()
        super(Csv, self).__init__(self._j_csv)

    def field_delimiter(self, delimiter):
        self._j_csv.fieldDelimiter(delimiter)
        return self

    def line_delimiter(self, delimiter):
        self._j_csv.lineDelimiter(delimiter)
        return self

    def quote_character(self, quote_character):
        self._j_csv.quoteCharacter(quote_character)
        return self

    def allow_comments(self):
        self._j_csv.allowComments()
        return self

    def ignore_parse_errors(self):
        self._j_csv.ignoreParseErrors()
        return self

    def array_element_delimiter(self, delimiter):
        self._j_csv.arrayElementDelimiter(delimiter)
        return self

    def escape_character(self, escape_character):
        self._j_csv.escapeCharacter(escape_character)
        return self

    def null_literal(self, null_literal):
        self._j_csv.nullLiteral(null_literal)
        return self
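A hypothetical usage of the descriptor together with connect(), mirroring the WordCount example further below (FileSystem and Schema are the corresponding connector and schema descriptor wrappers):

format = Csv() \
    .field_delimiter(",") \
    .ignore_parse_errors()

t_env.connect(FileSystem().path("/tmp/output.csv")) \
    .with_format(format) \
    .with_schema(Schema().field("word", DataTypes.STRING)) \
    .in_append_mode() \
    .register_table_sink("Results")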

UDFs Interface

UserDefinedFunction

from abc import ABCMeta, abstractmethod


class UserDefinedFunction(object):
    """
    An interface for writing Python user-defined functions.
    """
    __metaclass__ = ABCMeta

    def open(self, *args):
        pass

    def close(self, *args):
        pass

    def is_deterministic(self):
        return True

ScalarFunction

class ScalarFunction(UserDefinedFunction):

    @abstractmethod
    def eval(self, *args):
        """Please define your implementation"""

TableFunction

class TableFunction(UserDefinedFunction):

    @abstractmethod
    def eval(self, *args):
        """Please define your implementation"""

    def collect(self, element):
        pass

AggregateFunction

class AggregateFunction(UserDefinedFunction):
    """
    Optional methods:

    def accumulate(self, accumulator, *args):
        pass

    def retract(self, accumulator, *args):
        pass

    def merge(self, accumulator, other_accumulators):
        pass
    """

    @abstractmethod
    def create_accumulator(self):
        """Please define your implementation"""

    @abstractmethod
    def get_value(self, accumulator):
        """Please define your implementation"""

Decorators

  • udf: used for ScalarFunction
  • udtf: used for TableFunction
  • udaf: used for AggregateFunction

...

NOTE: The decorators may change according to FLIP-37
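For illustration only, a minimal sketch of what the udf decorator could look like; the attribute names below are assumptions, and as noted above the final shape depends on FLIP-37:

def udf(input=None, output=None):
    # Hypothetical implementation: only attach the declared types to the
    # decorated ScalarFunction class so they can be read at registration time.
    def wrapper(cls):
        cls._input_types = input
        cls._output_types = output
        return cls
    return wrapper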

DataView

class DataView(object):
    """
    An interface for defining DataView
    """
    __metaclass__ = ABCMeta

    @abstractmethod
    def clear(self):
        pass

MapView

class MapView(DataView):

    def __init__(self, key_type, value_type):
        self.key_type = key_type
        self.value_type = value_type

    def get(self, key):
        return _NoValue

    def put(self, key, value):
        pass

    def put_all(self, map):
        pass

    def remove(self, key):
        pass

    def contains(self, key):
        return _NoValue

    def entries(self):
        return _NoValue

    def keys(self):
        return _NoValue

    def values(self):
        return _NoValue

    def iterator(self):
        return _NoValue

ListView

class ListView(DataView):

    def __init__(self, element_type):
        self.element_type = element_type

    def get(self):
        return _NoValue

    def add(self, value):
        pass

    def add_all(self, values):
        pass
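A hypothetical example of how a DataView could be declared inside the accumulator of an AggregateFunction; how the declared view is backed by state at runtime is left to the implementation, and the accumulator class below is purely illustrative:

class DistinctCountAccumulator(object):
    def __init__(self):
        # Expected to be replaced by a state-backed view at runtime.
        self.distinct_map = MapView(DataTypes.STRING, DataTypes.INT)
        self.count = 0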

Expression

The Expression API will not be supported in the initial version, as there is ongoing work on introducing a Java Expression DSL for the Table API. The Python Expression API will leverage that work and will be supported once it is done.

...

  • Scalar Function definition

@udf(input=DataTypes.STRING, output=DataTypes.STRING)
class TrimFunction(ScalarFunction):

    def eval(self, param):
        def trim(s):
            if len(s) == 0:
                return ''
            if s[:1] == ' ':
                return trim(s[1:])
            elif s[-1:] == ' ':
                return trim(s[:-1])
            else:
                return s
        return trim(param)


  • Usage

my_udf = TrimFunction()
t_env.register_function('trim', my_udf)
source = ...
result = source.select("trim(a) as a")

User-Defined Table Function

  • Table Function definition

@udtf(input=DataTypes.STRING, output=DataTypes.STRING)
class SplitTableFunction(TableFunction):

    def eval(self, names):
        for name in names.split(' '):
            self.collect(name)


  • Usage

my_udtf = SplitTableFunction()
t_env.register_function('split', my_udtf)
source = ...
source.join_lateral("split(a) as (word)").select("word") \
    .group_by("word").select("word, word.count as word_count")

User-Defined Aggregate Function

  • Aggregate Function definition

@udaf(input=DataTypes.ANY, output=DataTypes.INT, accumulator=DataTypes.ROW(DataTypes.INT))
class CountAggregateFunction(AggregateFunction):

    def accumulate(self, acc, param):
        if param is not None:
            acc.set_value(0, acc.get_value(0) + 1)

    def merge(self, acc, other_acc):
        for item in other_acc:
            acc.set_value(0, acc.get_value(0) + item.get_value(0))
        return acc

    def retract(self, acc, param):
        if param is not None:
            acc.set_value(0, acc.get_value(0) - 1)

    def create_accumulator(self):
        return Row([0])

    def get_value(self, acc):
        return acc.get_value(0)

WordCount

Let's take word count as a complete example. The Python Table API program would look like the following (wordCount.py):

from ... import TrimFunction, CountAggregateFunction, SplitTableFunction

import os
import tempfile

tmp_dir = tempfile.gettempdir()
source_path = tmp_dir + '/streaming.csv'
if os.path.isfile(source_path):
    os.remove(source_path)

with open(source_path, 'w') as f:
    lines = 'hi guys\n' + 'hello guys\n'
    f.write(lines)
    print(lines)

t_config = TableConfig.Builder().as_streaming_execution().set_parallelism(1).build()
t_env = TableEnvironment.get_table_environment(t_config)

field_names = ["a"]
field_types = [DataTypes.STRING]

# register the Source table in the table environment
t_env.register_table_source(
    "Source",
    CsvTableSource(source_path, field_names, field_types))

# register the Results table in the table environment
# using the connector API
schema = Schema() \
    .field("a", DataTypes.STRING) \
    .field("b", DataTypes.INT)

format = Csv() \
    .field_delimiter(",") \
    .field("a", DataTypes.STRING) \
    .field("b", DataTypes.INT)

tmp_csv = tmp_dir + '/streaming2.csv'
if os.path.isfile(tmp_csv):
    os.remove(tmp_csv)

connector = FileSystem() \
    .path(tmp_csv)

t_env.connect(connector) \
    .with_format(format) \
    .with_schema(schema) \
    .in_append_mode() \
    .register_table_sink("Results")

t = t_env.scan("Source")

my_udf = TrimFunction()
t_env.register_function('trim', my_udf)
my_udtf = SplitTableFunction()
t_env.register_function('split', my_udtf)
my_udaf = CountAggregateFunction()
t_env.register_function('my_count', my_udaf)

t.join_lateral("split(a) as (word)").select("trim(word) as word") \
    .group_by("word").select("word, my_count(word)") \
    .insert_into("Results")

t_env.execute()

with open(tmp_csv, 'r') as f:
    lines = f.read()
    print(lines)

Job Submission

Flink Run

We will add support for submitting Python Table API jobs in CliFrontend, so that `flink run` can be used to submit them. The current `flink` command line syntax is as follows:

flink <ACTION> [OPTIONS] [ARGUMENTS]

On the basis of the current `run` ACTION, we add Python Table API support; the specific OPTIONS are as follows:

...

We can submit the Python Table API jobs as follows:

// Submit a simple job without any dependencies and parameters
FLINK_HOME/bin/flink run -py example.py

// Submit a job with dependencies and parallelism 16
FLINK_HOME/bin/flink run -p 16 -py example.py -pyfs resources1.egg,resources2.zip,resource3.py

// Submit a job by configuring the python entry module
FLINK_HOME/bin/flink run -pyfs resources1.egg,resources2.zip,resource3.py -pym org.apache.pyflink.example

// Submit a job with the python binary path and execution environment config
FLINK_HOME/bin/flink run -pyfs resources1.egg,resources2.zip,resource3.py -py-exec ./my_env/env/bin/python -py-env process -pym org.apache.pyflink.example

// Submit a simple job in which the Python worker runs in a docker container
FLINK_HOME/bin/flink run --py-env docker --py-docker-image xxx -py example.py

Regarding the [ARGUMENTS] section of syntax, we reuse existing features without any extensions.
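For example, user parameters are simply appended after the program options and forwarded to the Python program (a hypothetical invocation; how example.py parses them is up to the user):

// Hypothetical: program arguments are passed through unchanged
FLINK_HOME/bin/flink run -py example.py --input /tmp/input.csv --output /tmp/output.csv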

...

The initialization procedure will import all the Flink Python Table API classes which should be exposed to the user.

Example:

...

>>> t_config = TableConfig.builder().as_streaming_execution().set_parallelism(1).build()
>>> t_env = TableEnvironment.create(t_config)
>>> data = [(1, "Sunny"), (2, "Eden")]
>>> source = t_env.from_collection(data, "id, name")
>>> source.select("id, name").insert_into("print")
>>> t_env.execute()
1,Sunny
2,Eden

REST API

Currently we have a complete set of REST APIs that can be used to submit Java jobs (https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/rest_api.html). We need to make some changes to the current REST API to support submitting Python Table API jobs:

  • GET /jars becomes GET /artifacts: now lists both jar files and Python package files.
  • POST /jars/upload becomes POST /artifacts/upload: allows users to upload both jar files and Python package files; the "Content-Type" request header marks the file type.
  • DELETE /jars/:jarid becomes DELETE /artifacts/:artifactid: deletes files according to "artifactid".
  • GET /jars/:jarid/plan becomes GET /artifacts/:artifactid/plan: introduces a new optional parameter, pythonEntryModule, which allows specifying a Python entry point module other than __main__.py.
  • POST /jars/:jarid/run becomes POST /artifacts/:artifactid/run: also accepts the optional pythonEntryModule parameter, as above.

We can first deprecate the old REST APIs and remove them after one or two releases.
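As an illustration of the proposed upload endpoint (a purely hypothetical request; the Content-Type value below is an assumption, not part of this proposal):

// Hypothetical: upload a Python package file instead of a jar
curl -X POST -H "Content-Type: application/python" \
  --data-binary @/path/to/example.zip \
  http://localhost:8081/artifacts/upload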

...

  1. Add the flink-python module and a submodule flink-python-table with the Py4J dependency configuration and the Scan, Projection, and Filter operators of the Python Table API, runnable in an IDE (with simple tests).
  2. Add a basic test framework, just like the existing Java Table API, and abstract some TestBase classes.
  3. Integrate Tox to ensure compatibility with both Python 2 and Python 3.
  4. Add basic support for submitting Python Table API jobs in CliFrontend, i.e. `flink run -py wordcount.py` works (with simple tests).
  5. Add all the TypeInformation support in the Python Table API.
  6. Add all of the existing interfaces in the Java Table API, such as Table, TableEnvironment, TableConfig, and Descriptor.
  7. Add all the Source/Sink Descriptor support in the Python Table API.
  8. Add pip support so the Flink Python Table API can be installed from a Python package repository.
  9. Add Python Table API documentation to the current Table API docs.
  10. Add common-concepts documentation for the Python Table API to the Basic Concepts docs.
  11. Add pythondocs at the same level as javadocs and scaladocs.
  12. Add convenience interfaces to the Python TableEnvironment, such as `from_collection`, `set_parallelism`, etc., which come from `StreamExecutionEnvironment`.
  13. Add the description of `flink run -py xx.py` to the CLI documentation.
  14. Add a Python shell that exposes the TableEnvironment to the user and automatically imports the common dependencies.
  15. Add a Python REPL submenu under the Deployment & Operations directory to hold documentation for the Python shell.
  16. Add support for a single job containing multiple Python scripts, i.e. add `-pyfs --py-files <py-files>` (attach custom Python files to the job; use ',' to separate multiple files) to `flink run`.
  17. Other JIRAs for continuous improvements and enhancements, such as Catalog, DDL, Expression DSL, etc.

...

Suppose we have a job as follows:

# udf_1 and udf_2 are Python UDFs and udf_3 is a Java UDF
t_env.register_function("udf_1", PythonScalarFunction1())
t_env.register_function("udf_2", PythonScalarFunction2())
t_env.register_function("udf_3", JavaScalarFunction3())

t = t_env.scan("Orders")  # schema (a, b, c, d)
t.select("a, udf_1(a, b), udf_2(b, c), udf_3(c, d)")


The two Python UDFs "udf_1" and "udf_2" will be optimized to run in the same Python worker.

The physical plan will look as follows:

DataStreamCalc(select=[a, udf_1(a, b) AS EXPR$2, udf_2(b, c) AS EXPR$3, EXPR$1])
 +- DataStreamCalc(select=[a, b, c, udf_3(c, d) AS EXPR$1])
   +- DataStreamScan(table=[[ _DataStreamTable_0]])


This optimization applies to both common Python scalar functions and pandas functions.

...