...

Page properties

...


Discussion thread

...


Vote thread

...

JIRA: FLINK-12308

Release: 1.10


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

class Table(object):

    """
    Wrapper of org.apache.flink.table.api.Table
    """

    def __init__(self, j_table):
        self._j_table = j_table  # The reference to the Table object on the JVM.

    def select(self, col_list):
        return Table(self._j_table.select(col_list))  # Call the Java Table API method directly.

    def where(self, condition):
        return Table(self._j_table.where(condition))

    …

    …

    def write_to_sink(self, j_sink):
        self._j_table.writeToSink(j_sink._j_table_sink)

    def to_pandas(self):
        …
        return pandas.DataFrame(..)

    …
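For illustration, a minimal usage sketch of these thin wrappers (the table name "Source" and the sink object are hypothetical; the point is the delegation pattern):

# Usage sketch: each call delegates to the JVM Table and re-wraps the result,
# so Python-side chaining mirrors the Java Table API one-to-one.
t = t_env.scan("Source")                     # hypothetical registered table
result = t.select("a, b").where("b > 10")    # pure delegation; no data moves to Python
result.write_to_sink(csv_sink)               # csv_sink: an assumed Python TableSink wrapper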

GroupedTable

...
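The GroupedTable wrapper presumably follows the same delegation pattern as Table; a minimal sketch under that assumption:

class GroupedTable(object):

    """
    Wrapper of org.apache.flink.table.api.GroupedTable (sketch, assuming the
    same delegation pattern as Table above).
    """

    def __init__(self, j_grouped_table):
        self._j_grouped_table = j_grouped_table  # Reference to the GroupedTable on the JVM.

    def select(self, col_list):
        # Delegate to the Java API and wrap the result back into a Python Table.
        return Table(self._j_grouped_table.select(col_list))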

class TableConfig(object):

    def __init__(self, j_table_config=None):
        gateway = get_gateway()
        if j_table_config is None:
            self._j_table_config = gateway.jvm.TableConfig()
        else:
            self._j_table_config = j_table_config

    def get_local_timezone(self):
        return self._j_table_config.getLocalTimeZone().getId()

    def set_local_timezone(self, timezone_id):
        ...

    def get_configuration(self):
        return Configuration(j_configuration=self._j_table_config.getConfiguration())

    def add_configuration(self, configuration):
        self._j_table_config.addConfiguration(configuration._j_configuration)
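A short usage sketch of this wrapper (the timezone id is just an example value):

# Usage sketch: TableConfig creates its JVM counterpart via the Py4J gateway.
t_config = TableConfig()
t_config.set_local_timezone("Europe/Berlin")   # example timezone id
print(t_config.get_local_timezone())           # reads the id back from the JVM object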

TableEnvironment

class TableEnvironment(object):

    """
    Wrap and extend org.apache.flink.table.api.TableEnvironment
    """

    table_config = None

    def __init__(self, j_tenv):
        self._j_tenv = j_tenv

    def register_table(self, name, table):
        self._j_tenv.registerTable(name, table._j_table)

    def register_table_source(self, name, table_source):
        self._j_tenv.registerTableSource(name, table_source.j_table_source)

    def register_table_sink(self, name, table_sink):
        self._j_tenv.registerTableSink(name, table_sink._j_table_sink)

    def scan(self, *table_path):
        j_paths = TypesUtil.convert_py_list_to_j_array("java.lang.String", table_path)
        j_table = self._j_tenv.scan(j_paths)
        return Table(j_table)

    def connect(self, connector_descriptor):
        return TableDescriptor(self._j_tenv.connect(connector_descriptor._j_connector_descriptor))

    …

    …

    def sql_query(self, query):
        j_table = self._j_tenv.sqlQuery(query)
        return Table(j_table)

    def sql_update(self, stmt, config=None):
        if config is not None:
            self._j_tenv.sqlUpdate(stmt, config)
        else:
            self._j_tenv.sqlUpdate(stmt)

    # Extension methods

    def from_collection(self, data):
        …
        return Table(...)

    def from_pandas(self, data, schema=None):
        …
        return Table(...)

    def execute(self):
        self._j_tenv.execEnv().execute()

    def set_parallelism(self, parallelism):
        self._j_tenv.execEnv().setParallelism(parallelism)

    …

    ...

    @classmethod
    def create(cls, table_config):
        j_tenv = ...
        return TableEnvironment(j_tenv)
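For illustration, a minimal end-to-end sketch using this wrapper (the table name, source, and sink objects are hypothetical):

# Usage sketch: register a source, run a query, and execute the job.
t_env = TableEnvironment.create(TableConfig())
t_env.register_table_source("Orders", csv_source)        # csv_source: assumed TableSource wrapper
result = t_env.sql_query("SELECT id, name FROM Orders")  # returns a Python Table wrapper
result.write_to_sink(csv_sink)                           # csv_sink: assumed TableSink wrapper
t_env.execute()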

...

   └── pyflink.zip

└── ...

The flink-python module will be packaged as pyflink.zip and put into the opt/python/lib directory together with PY4J_LICENSE.txt and py4j-xxx-src.zip.

...

Let's take word count as a complete example. The Python Table API program will look like the following (wordCount.py):

import logging
import os
import shutil
import tempfile

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment, TableConfig
...

content = "..."

t_config = TableConfig()
env = ExecutionEnvironment.get_execution_environment()
t_env = BatchTableEnvironment.create(env, t_config)

# register Results table in table environment
tmp_dir = tempfile.gettempdir()
result_path = tmp_dir + '/result'
if os.path.exists(result_path):
    try:
        if os.path.isfile(result_path):
            os.remove(result_path)
        else:
            shutil.rmtree(result_path)
    except OSError as e:
        logging.error("Error removing directory: %s - %s.", e.filename, e.strerror)

t_env.connect(FileSystem().path(result_path)) \
    .with_format(OldCsv()
        .field_delimiter(',')
        .field("word", DataTypes.STRING())
        .field("count", DataTypes.BIGINT())) \
    .with_schema(Schema()
        .field("word", DataTypes.STRING())
        .field("count", DataTypes.BIGINT())) \
    .register_table_sink("Results")

elements = [(word, 1) for word in content.split(" ")]
t_env.from_elements(elements, ["word", "count"]) \
    .group_by("word") \
    .select("word, count(1) as count") \
    .insert_into("Results")

t_env.execute("word_count")

...

Job Submission

Flink Run

Support submitting Python Table API jobs through CliFrontend, i.e., using `flink run` to submit a Python Table API job. The current `flink` command line syntax is as follows:

...

The initialization procedure will import all of the Flink Python Table API classes that should be exposed to the user.
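A minimal sketch of what such an initialization script might import (for illustration only; the exact set of exposed classes is defined by the pyflink package):

# Hypothetical shell-initialization imports exposing the Python Table API:
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment, TableConfig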

Example:

...

>>> exec_env = ExecutionEnvironment.get_execution_environment()
>>> t_config = TableConfig()
>>> t_env = BatchTableEnvironment.create(exec_env, t_config)
>>> data = [(1, "Sunny"), (2, "Eden")]
>>> source = t_env.from_collection(data, "id, name")
>>> source.select("id, name").insert_into("print")
>>> t_env.execute()

1,Sunny

2,Eden

...