
...

Discussion thread

...

Vote thread

...

JIRA: FLINK-12308

Release: 1.10


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

class TableConfig(object):

    def __init__(self, j_table_config=None):
        gateway = get_gateway()
        if j_table_config is None:
            self._j_table_config = gateway.jvm.TableConfig()
        else:
            self._j_table_config = j_table_config

    def get_local_timezone(self):
        return self._j_table_config.getLocalTimeZone().getId()

    def set_local_timezone(self, timezone_id):
        ...

    def get_configuration(self):
        return Configuration(j_configuration=self._j_table_config.getConfiguration())

    def add_configuration(self, configuration):
        self._j_table_config.addConfiguration(configuration._j_configuration)
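For illustration, a minimal usage sketch of the wrapper above, assuming set_local_timezone delegates to the Java setLocalTimeZone (its body is elided above); the timezone id is just an example value:

config = TableConfig()
config.set_local_timezone("Europe/Berlin")   # delegates to the wrapped Java TableConfig
print(config.get_local_timezone())           # expected: "Europe/Berlin"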

TableEnvironment

class TableEnvironment(object):

  """
  Wrap and extend for org.apache.flink.table.api.TableEnvironment
  """

  table_config = None

  def __init__(self, j_tenv):
      self._j_tenv = j_tenv

  def register_table(self, name, table):
      self._j_tenv.registerTable(name, table._j_table)

  def register_table_source(self, name, table_source):
      self._j_tenv.registerTableSource(name, table_source.j_table_source)

  def register_table_sink(self, name, table_sink):
      self._j_tenv.registerTableSink(name, table_sink.j_table_sink)

  def scan(self, *table_path):
      j_paths = TypesUtil.convert_py_list_to_j_array("java.lang.String", table_path)
      j_table = self._j_tenv.scan(j_paths)
      return Table(j_table)

  def connect(self, connector_descriptor):
      return TableDescriptor(self._j_tenv.connect(connector_descriptor._j_connector_descriptor))

   …

  def sql_query(self, query):
      j_table = self._j_tenv.sqlQuery(query)
      return Table(j_table)

  def sql_update(self, stmt, config=None):
      if config is not None:
          self._j_tenv.sqlUpdate(stmt, config)
      else:
          self._j_tenv.sqlUpdate(stmt)

  # Extension methods

  def from_collection(self, data):
      …
      return Table(...)

  def execute(self):
      self._j_tenv.execEnv().execute()

  def set_parallelism(self, parallelism):
      self._j_tenv.execEnv().setParallelism(parallelism)

  …

  @classmethod
  def create(cls, table_config):
      j_tenv = ...
      return TableEnvironment(j_tenv)
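A minimal usage sketch of the wrapper above; csv_source and the table names are illustrative placeholders, not part of the proposal:

t_config = TableConfig()
t_env = TableEnvironment.create(t_config)
t_env.register_table_source("Source", csv_source)   # e.g. a CsvTableSource wrapper
result = t_env.sql_query("SELECT * FROM Source")    # returns a Python Table wrapping the Java Table
t_env.register_table("Result", result)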

Descriptor

There are a lot of Descriptor-related classes; we will take Csv as an example:

class Csv(FormatDescriptor):

  def __init__(self):
      self._j_csv = _jvm.org.apache.flink.table.descriptors.Csv()
      super(Csv, self).__init__(self._j_csv)

  def field_delimiter(self, delimiter):
      self._j_csv.fieldDelimiter(delimiter)
      return self

  def line_delimiter(self, delimiter):
      self._j_csv.lineDelimiter(delimiter)
      return self

  def quote_character(self, quote_character):
      self._j_csv.quoteCharacter(quote_character)
      return self

  def allow_comments(self):
      self._j_csv.allowComments()
      return self

  def ignore_parse_errors(self):
      self._j_csv.ignoreParseErrors()
      return self

  def array_element_delimiter(self, delimiter):
      self._j_csv.arrayElementDelimiter(delimiter)
      return self

  def escape_character(self, escape_character):
      self._j_csv.escapeCharacter(escape_character)
      return self

  def null_literal(self, null_literal):
      self._j_csv.nullLiteral(null_literal)
      return self
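Since every setter returns self, a Csv descriptor can be configured fluently; a small sketch (the delimiter values are just examples):

format = Csv() \
    .field_delimiter(",") \
    .line_delimiter("\n") \
    .ignore_parse_errors()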

Expression

The Expression API will not be supported in the initial version, as there is ongoing work on introducing a Table API Java Expression DSL. The Python Expression API will leverage that and will be supported once that work is done.

DIST

Create a python directory in the published opt directory. The contents of python are as follows:

opt/python

├── README.MD

└── lib

   ├── py4j-x.y.z-src.zip

   ├── py4j-LICENSE.txt

   └── pyflink.zip

└── ...

The flink-python module will be packaged as pyflink.zip and put into the opt/python/lib directory together with py4j-LICENSE.txt and py4j-x.y.z-src.zip (see the path-setup sketch after the list below).

  • The pyflink shell will be added to the published bin directory.
  • The `flink` shell script should add some options for the Python Table API, such as:
    • -py --python
    • -pyfs --py-files
    • etc ...

    The details can be found in the Job Submission section.
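For illustration only, a sketch of how a launcher might put these artifacts on the Python path; FLINK_OPT_DIR and the literal archive names are assumptions for the sketch, not part of the proposal:

import os
import sys

# Assumed location of the distribution's opt directory.
opt_dir = os.environ.get("FLINK_OPT_DIR", "/opt/flink/opt")

# Put pyflink.zip and the bundled py4j sources on the interpreter path
# so that `import pyflink` works without a separate installation
# ("x.y.z" stands for the bundled py4j version).
for archive in ("pyflink.zip", "py4j-x.y.z-src.zip"):
    sys.path.insert(0, os.path.join(opt_dir, "python", "lib", archive))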

Docs

  • Add the description of `flink run -py xx.py` in the CLI docs.
  • Add a REST service API for job submission.
  • Add a Python REPL submenu under the Deployment & Operations directory to add documentation for the Python shell.
  • Add a Python Table API doc to the current Table API doc.
  • Add a common concepts doc for the Python Table API to the basic concepts doc.
  • Add pythondocs at the same level as javadocs and scaladocs.
  • etc.

Examples

WordCount

Let's use word count as a complete example. The Python Table API program will look like the following (wordCount.py):

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment, TableConfig, DataTypes
from pyflink.table.descriptors import FileSystem, OldCsv, Schema

import logging
import os
import shutil
import tempfile
...

content = "..."

t_config = TableConfig()
env = ExecutionEnvironment.get_execution_environment()
t_env = BatchTableEnvironment.create(env, t_config)

# register Results table in table environment
tmp_dir = tempfile.gettempdir()
result_path = tmp_dir + '/result'
if os.path.exists(result_path):
    try:
        if os.path.isfile(result_path):
            os.remove(result_path)
        else:
            shutil.rmtree(result_path)
    except OSError as e:
        logging.error("Error removing directory: %s - %s.", e.filename, e.strerror)

t_env.connect(FileSystem().path(result_path)) \
    .with_format(OldCsv()
                 .field_delimiter(',')
                 .field("word", DataTypes.STRING())
                 .field("count", DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field("word", DataTypes.STRING())
                 .field("count", DataTypes.BIGINT())) \
    .register_table_sink("Results")

elements = [(word, 1) for word in content.split(" ")]
t_env.from_elements(elements, ["word", "count"]) \
     .group_by("word") \
     .select("word, count(1) as count") \
     .insert_into("Results")

t_env.execute("word_count")

Job Submission

Flink Run

Support submitting Python Table API jobs in CliFrontend, i.e., using `flink run` to submit a Python Table API job. The current `flink` command line syntax is as follows:

...
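For example, using the `-py` option listed in the DIST section, the WordCount job above could be submitted as `flink run -py wordCount.py` (the exact flag spelling is subject to the final CLI design).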

The initialization procedure will import all the Flink Python Table API classes which should be exposed to the user.
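A minimal sketch of what this initialization might import; the module layout follows the examples in this document, and the file name shell_init.py is hypothetical:

# shell_init.py (hypothetical): pre-import the user-facing Python Table API classes
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment, TableConfig, DataTypes
from pyflink.table.descriptors import Csv, FileSystem, OldCsv, Schema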

Example:

...

>>>exec_env = ExecutionEnvironment.get_execution_environment()

>>>t_config = TableConfig()

>>>t_env = BatchTableEnvironment.create(exec_env, t_config)

>>>data = [(1L, "Sunny"), (2L, "Eden")]

>>>source = t_env.from_collection(data, "id, name")

>>>source.select("id, name").insert_into("print")

>>>t_env.execute()

1,Sunny

2,Eden

...