Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

class TableConfig(object):

   def     def __init__(self, j_table_config=None):

           self._j_table_config = j_table_config

   class Builder(object):

            gateway = get_gateway()

            if        def __init__(self, j_table_config _builder)is None:

           self                  self._j_table_config _builder = j_table_config_builder

       def set_time_zone(self, time_zone):

= gateway.jvm.TableConfig()

            else:

                 self            self._j_table_config _builder.setTimeZone(time_zone)

           return self

       def set_null_check(self, null_check):

= j_table_config

  def get_local_timezone(self):

      return self            self._j_table_config_builder.setNullCheckgetLocalTimeZone(null_check)

           return self

       …

       …

       # We don't want to expose StreamExecutionEnvironment

       def as_streaming_execution(self):

           self._j_table_config_builder.asStreamingExecution()

           return self

       # We don't want to expose ExecutionEnvironment

       def as_batch_execution(self):

           self._j_table_config_builder.asBatchExecution()

           return self

       def build(self):

).getId()


  def set_local_timezone(self, timezone_id):

   ...

def get_configuration(self):
       return Configuration(j_configuration=            return TableConfig(self._j_table_config_builder.buildgetConfiguration())

   @classmethod

   def builder(cls def add_configuration(self, configuration):  
      self._j_table_config.addConfiguration(configuration._j_configuration        return cls.Builder(_jvm.org.apache.flink.table.api.TableConfig.builder())

TableEnvironment

class TableEnvironment(object):
    """
    Python-side wrapper around a Java
    org.apache.flink.table.api.TableEnvironment.

    Every method delegates to the Java object held in ``self._j_tenv``;
    results that are Java tables are re-wrapped in the Python ``Table``
    class. Parts of the class are elided (``...``) in this excerpt.
    """

    # Declared on the class but never assigned by any method visible here.
    table_config = None

    def __init__(self, j_tenv):
        """Keep the Java TableEnvironment handle that all methods delegate to."""
        self._j_tenv = j_tenv

    def register_table(self, name, table):
        """Register ``table`` under ``name``, forwarding its ``_j_table`` handle."""
        self._j_tenv.registerTable(name, table._j_table)

    def register_table_source(self, name, table_source):
        """Register ``table_source`` under ``name`` on the Java environment."""
        # NOTE(review): the handle is read as ``j_table_source`` (no leading
        # underscore), unlike ``table._j_table`` above -- confirm the
        # attribute name against the table source wrapper.
        self._j_tenv.registerTableSource(name, table_source.j_table_source)

    def register_table_sink(self, name, table_sink):
        """Register ``table_sink`` under ``name`` on the Java environment."""
        # NOTE(review): same naming question as register_table_source.
        self._j_tenv.registerTableSink(name, table_sink.j_table_sink)

    def scan(self, *table_path):
        """
        Look up a table by its path components and return it as a ``Table``.

        The components are converted to a Java array with element type
        ``java.lang.String`` before being handed to the Java ``scan`` call.
        """
        j_paths = TypesUtil.convert_py_list_to_j_array("java.lang.String", table_path)
        j_table = self._j_tenv.scan(j_paths)
        return Table(j_table)

    def connect(self, connector_descriptor):
        """Call the Java ``connect`` and wrap its result in a ``TableDescriptor``."""
        return TableDescriptor(self._j_tenv.connect(connector_descriptor._j_connector_descriptor))

    ...  # elided in the original excerpt

    ...  # elided in the original excerpt

    def sql_query(self, query):
        """Run ``query`` through the Java ``sqlQuery`` and return a ``Table``."""
        j_table = self._j_tenv.sqlQuery(query)
        return Table(j_table)

    def sql_update(self, stmt, config=None):
        """
        Forward ``stmt`` to the Java ``sqlUpdate``; always returns ``None``.

        ``config`` is passed as a second argument only when it is not
        ``None``. The Java call's result is deliberately not bound to a
        local: it was never used or returned, and naming it ``j_table``
        wrongly suggested that a table comes back.
        """
        # NOTE(review): ``config`` is forwarded unchanged; presumably it must
        # already be a Java-side object (or be unwrapped first) -- confirm.
        if config is not None:
            self._j_tenv.sqlUpdate(stmt, config)
        else:
            self._j_tenv.sqlUpdate(stmt)

    # Extension methods

    def from_collection(self, data):
        """Build a ``Table`` from ``data``; the body is elided in this excerpt."""
        ...

        return Table(...)

    def execute(self):
        """Call ``execute()`` on the execution environment behind ``_j_tenv``."""
        self._j_tenv.execEnv().execute()

    def set_parallelism(self, parallelism):
        """Forward ``parallelism`` to the underlying execution environment."""
        self._j_tenv.execEnv().setParallelism(parallelism)

    ...  # elided in the original excerpt

    ...  # elided in the original excerpt

    @classmethod
    def create(cls, table_config):
        """
        Alternate constructor: build the Java environment and wrap it.

        How ``table_config`` is used to build the Java environment is elided
        in this excerpt. The result is instantiated through ``cls`` so that
        subclasses calling ``create`` get an instance of themselves.
        """
        j_tenv = ...

        return cls(j_tenv)

...

   └── pyflink.zip

└── ...

The flink-python -table module will be packaged as pyflink.zip and placed in the opt/python/lib directory, together with PY4J_LICENSE.txt and py4j-xxx-src.zip.

...