Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.


...

Page properties

...


Discussion thread

...


Vote thread
JIRA

...

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-12308

...

Release1.10


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

class TableConfig(object):

   def     def __init__(self, j_table_config=None):

            gateway = get_gateway()

            if            self._ j_table_config = j_table_config

   class Builder(object):

is None:

                  self._        def __init__(self, j_table_config _builder):= gateway.jvm.TableConfig()

            else:

                 self            self._j_table_config _builder = j_table_config_builder

       def set   def get_timelocal_zonetimezone(self, time_zone):

           self      return self._j_table_config_builder.setTimeZone(time_zone)

           return self

       def set_null_check(self, null_check):

           self._j_table_config_builder.setNullCheck(null_check)

           return self

       …

       …

       # We don't want to expose StreamExecutionEnvironment

       def as_streaming_execution(self):

           self._j_table_config_builder.asStreamingExecution()

           return self

       # We don't want to expose ExecutionEnvironment

       def as_batch_execution(self):

           self._j_table_config_builder.asBatchExecution()

           return self

       def build(self):

.getLocalTimeZone().getId()


  def set_local_timezone(self, timezone_id):

   ...

def get_configuration(self):
       return Configuration(j_configuration=            return TableConfig(self._j_table_config_builder.buildgetConfiguration())

   @classmethod

   def builder(cls def add_configuration(self, configuration):  
      self._j_table_config.addConfiguration(configuration._j_configuration        return cls.Builder(_jvm.org.apache.flink.table.api.TableConfig.builder())

TableEnvironment

class TableEnvironment(object):
    """
    Wrap and extend for org.apache.flink.table.api.TableEnvironment

    Every method delegates to the Java object stored in ``self._j_tenv``;
    the methods below the "Extension methods" marker are additions on the
    Python side.
    """

    table_config = None

    def __init__(self, j_tenv):
        """
        :param j_tenv: the Java TableEnvironment object to delegate to.
        """
        self._j_tenv = j_tenv

    def register_table(self, name, table):
        """
        Register ``table`` under ``name`` by passing its wrapped Java table
        (``table._j_table``) to ``registerTable``.
        """
        self._j_tenv.registerTable(name, table._j_table)

    def register_table_source(self, name, table_source):
        """
        Register ``table_source`` under ``name`` by passing its
        ``j_table_source`` attribute to ``registerTableSource``.
        """
        self._j_tenv.registerTableSource(name, table_source.j_table_source)

    def register_table_sink(self, name, table_sink):
        """
        Register ``table_sink`` under ``name`` by passing its
        ``j_table_sink`` attribute to ``registerTableSink``.
        """
        self._j_tenv.registerTableSink(name, table_sink.j_table_sink)

    def scan(self, *table_path):
        """
        Look up a table by its path components.

        :param table_path: path components, converted to a Java
                           ``java.lang.String`` array before the call.
        :return: a :class:`Table` wrapping the Java result of ``scan``.
        """
        j_paths = TypesUtil.convert_py_list_to_j_array("java.lang.String", table_path)
        j_table = self._j_tenv.scan(j_paths)
        return Table(j_table)

    def connect(self, connector_descriptor):
        """
        Pass the wrapped Java connector descriptor to ``connect``.

        :return: a :class:`TableDescriptor` wrapping the Java result.
        """
        return TableDescriptor(self._j_tenv.connect(connector_descriptor._j_connector_descriptor))

    # ... (further methods elided in the design document)

    # ... (further methods elided in the design document)

    def sql_query(self, query):
        """
        Run ``query`` through ``sqlQuery``.

        :return: a :class:`Table` wrapping the Java result.
        """
        j_table = self._j_tenv.sqlQuery(query)
        return Table(j_table)

    def sql_update(self, stmt, config=None):
        """
        Run ``stmt`` through ``sqlUpdate``.

        :param stmt: the statement text.
        :param config: optional second argument for ``sqlUpdate``; when it is
                       ``None`` the single-argument overload is called.
        :return: ``None``; the result of ``sqlUpdate`` is not used.
        """
        # The result used to be bound to an unused local named ``j_table``,
        # which wrongly suggested that a table comes back from this call.
        if config is not None:
            self._j_tenv.sqlUpdate(stmt, config)
        else:
            self._j_tenv.sqlUpdate(stmt)

    # Extension methods

    def from_collection(self, data):
        """
        Build a :class:`Table` from the Python collection ``data``.
        """
        ...  # conversion of ``data`` is elided in the design document
        return Table(...)

    def execute(self):
        """
        Call ``execute()`` on the Java execution environment returned by
        ``execEnv()``.
        """
        self._j_tenv.execEnv().execute()

    def set_parallelism(self, parallelism):
        """
        Pass ``parallelism`` to ``setParallelism`` on the Java execution
        environment returned by ``execEnv()``.
        """
        self._j_tenv.execEnv().setParallelism(parallelism)

    # ... (further methods elided in the design document)

    # ... (further methods elided in the design document)

    @classmethod
    def create(cls, table_config):
        """
        Create a TableEnvironment for ``table_config``.

        :return: a :class:`TableEnvironment` wrapping the Java environment.
        """
        j_tenv = ...  # creation of the Java environment is elided in the design document
        return TableEnvironment(j_tenv)

...

   └── pyflink.zip

└── ...

The flink-python module will be packaged as pyflink.zip and put into the opt/python/lib directory together with PY4J_LICENSE.txt and py4j-xxx-src.zip.

...

Let's use word count as a complete example. The Python Table API program will look like the following (word_count.py):

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment, TableConfig
...

content = "..."

# Create a batch table environment on top of the execution environment.
t_config = TableConfig()
env = ExecutionEnvironment.get_execution_environment()
t_env = BatchTableEnvironment.create(env, t_config)

# register Results table in table environment
tmp_dir = tempfile.gettempdir()
result_path = tmp_dir + '/result'
# Remove any existing file or directory at result_path. The cleanup is
# nested under the existence check so that it only runs when there is
# something to delete.
if os.path.exists(result_path):
    try:
        if os.path.isfile(result_path):
            os.remove(result_path)
        else:
            shutil.rmtree(result_path)
    except OSError as e:
        logging.error("Error removing directory: %s - %s.", e.filename, e.strerror)

# Describe the sink at result_path: comma-delimited, with a (word, count)
# format and schema, registered under the name "Results".
t_env.connect(FileSystem().path(result_path)) \
    .with_format(OldCsv()
                 .field_delimiter(',')
                 .field("word", DataTypes.STRING())
                 .field("count", DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field("word", DataTypes.STRING())
                 .field("count", DataTypes.BIGINT())) \
    .register_table_sink("Results")

# One (word, 1) row per space-separated token, then count per word and
# write the result into the registered sink.
elements = [(word, 1) for word in content.split(" ")]
t_env.from_elements(elements, ["word", "count"]) \
    .group_by("word") \
    .select("word, count(1) as count") \
    .insert_into("Results")

t_env.execute("word_count")

...

...