Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

To give a complete example, take word count. With the Python Table API it looks like the following (word_count.py):

import logging
import os
import shutil
import tempfile

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment, TableConfig
from pyflink.table.descriptors import FileSystem, OldCsv, Schema
from pyflink.table.types import DataTypes

# Text whose words are counted; "..." is a placeholder for the real input.
content = "..."

t_config = TableConfig()
env = ExecutionEnvironment.get_execution_environment()
t_env = BatchTableEnvironment.create(env, t_config)

# register Results table in table environment
tmp_dir = tempfile.gettempdir()
result_path = os.path.join(tmp_dir, 'result')
# A previous run may have left either a single file or a directory at
# result_path, so handle both before writing new results.
if os.path.exists(result_path):
    try:
        if os.path.isfile(result_path):
            os.remove(result_path)
        else:
            shutil.rmtree(result_path)
    except OSError as e:
        logging.error("Error removing directory: %s - %s.", e.filename, e.strerror)

t_env.connect(FileSystem().path(result_path)) \
    .with_format(OldCsv()
                 .field_delimiter(',')
                 .field("word", DataTypes.STRING())
                 .field("count", DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field("word", DataTypes.STRING())
                 .field("count", DataTypes.BIGINT())) \
    .register_table_sink("Results")

# One row per word occurrence; split() on any whitespace so repeated spaces or
# newlines in the input do not produce empty "words".
elements = [(word, 1) for word in content.split()]
# Group by word and count the rows in each group, i.e. the occurrences.
t_env.from_elements(elements, ["word", "count"]) \
    .group_by("word") \
    .select("word, count(1) as count") \
    .insert_into("Results")

t_env.execute("word_count")

Job Submission

Flink Run

Support submitting Python Table API jobs through CliFrontend, i.e. using `flink run` to submit a Python Table API job. The current `flink` command-line syntax is as follows:

...

The initialization procedure will import all the Flink Python Table API classes which should be exposed to the user.

Example:

...

>>> exec_env = ExecutionEnvironment.get_execution_environment()
>>> t_config = TableConfig()
>>> t_env = BatchTableEnvironment.create(exec_env, t_config)
>>> data = [(1, "Sunny"), (2, "Eden")]
>>> source = t_env.from_elements(data, ["id", "name"])
>>> # NOTE: assumes a table sink named "print" has already been registered
>>> source.select("id, name").insert_into("print")
>>> t_env.execute("print_job")
1,Sunny
2,Eden

...