Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

class Table(object):

   """

   Wrapper of org.apache.flink.table.api.Table

   """

   def __init__(self, j_table):

       self._j_table = j_table  # The references to table object on JVM.

   def select(self, col_list):

       return Table(self._j_table.select(col_list))  # Call the java table api method directly

   def where(self, condition):

       return Table(self._j_table.where(condition))

   …

   …

   def write_to_sink(self, j_sink):

       self._j_table.writeToSink(j_sink._j_table_sink)

   def to_pandas(self):

       …

   return pandas.DataFrame(..)

    …

   ....

GroupedTable

class GroupedTable(object):

  """

  Wrapper of org.apache.flink.table.api.GroupedTable

  """

  def __init__(self, j_grouped_table):

      # The references to group table object on JVM.

      self._j_grouped_table = j_grouped_table

  def select(self, col_list):  

      # Call the java table api method directly

      return Table(self._j_grouped_table.select(col_list))

...

class TableEnvironment(object):

  """

  Wrap and extend for org.apache.flink.table.api.TableEnvironment

  """

  table_config = None

  def __init__(self, j_tenv):

      self._j_tenv = j_tenv

  def register_table(self, name, table):

      self._j_tenv.registerTable(name, table._j_table)

  def register_table_source(self, name, table_source):

      self._j_tenv.registerTableSource(name, table_source.j_table_source)

  def register_table_sink(self, name, table_sink):

      self._j_tenv.registerTableSink(name, table_sink.j_table_sink)

  def scan(self, *table_path):

      j_paths = TypesUtil.convert_py_list_to_j_array("java.lang.String", table_path)

      j_table = self._j_tenv.scan(j_paths)

      return Table(j_table)

  def connect(self, connector_descriptor):

      return TableDescriptor(self._j_tenv.connect(connector_descriptor._j_connector_descriptor))

   …

   …

  def sql_query(self, query):

      j_table = self._j_tenv.sqlQuery(query)

      return Table(j_table)

  def sql_update(self, stmt, config=None):

      if config is not None:

          j_table = self._j_tenv.sqlUpdate(stmt, config)

      else:

          j_table = self._j_tenv.sqlUpdate(stmt)

  # Extension methods

  def from_collection(self, data):

      …

      return Table(...)

  def from_pandas(self, data, schema=None):

      …

     return Table(...)


  def execute(self):

      self._j_tenv.execEnv().execute()

  def set_parallelism(self, parallelism):

      self._j_tenv.execEnv().setParallelism(parallelism)

  …

  ...

  @classmethod

  def create(cls, table_config):

      j_tenv = ...

      return TableEnvironment(j_tenv)

...