...
Let's take word count as a complete example. The Python Table API will look like the following (word_count.py):
```python
import os
import tempfile

from pyflink... import TrimFunction, CountAggregateFunction, SplitTableFunction

tmp_dir = tempfile.gettempdir()
source_path = tmp_dir + '/streaming.csv'
if os.path.isfile(source_path):
    os.remove(source_path)
with open(source_path, 'w') as f:
    lines = 'hi guys\n' + 'hello guys\n'
    f.write(lines)
    print(lines)

t_config = TableConfig.Builder().as_streaming_execution().set_parallelism(1).build()
t_env = TableEnvironment.get_table_environment(t_config)

field_names = ["a"]
field_types = [DataTypes.STRING]

# register Source table in table environment
t_env.register_table_source(
    "Source",
    CsvTableSource(source_path, field_names, field_types))

# register Results table in table environment using connector API
schema = Schema() \
    .field("a", DataTypes.STRING) \
    .field("b", DataTypes.INT)

format = Csv() \
    .schema(schema) \
    .field_delimiter(",")

result_path = tmp_dir + '/result.csv'
if os.path.isfile(result_path):
    os.remove(result_path)

t_env.connect(FileSystem().path(result_path)) \
    .with_format(format) \
    .with_schema(schema) \
    .register_table_sink("Results")

t = t_env.scan("Source")

my_udf = TrimFunction()
t_env.register_function('trim', my_udf)

my_udtf = SplitTableFunction()
t_env.register_function('split', my_udtf)

my_udaf = CountAggregateFunction()
t_env.register_function('my_count', my_udaf)

t.join_lateral("split(a) as (word)") \
    .select("trim(word) as word") \
    .group_by("word") \
    .select("word, my_count(word)") \
    .insert_into("Results")

t_env.execute("word_count")

with open(result_path, 'r') as f:
    lines = f.read()
    print(lines)
```
Job Submission
Flink Run
Support submitting Python Table API jobs in CliFrontend, so that `flink run` can be used to submit a Python Table API job. The current `flink` command-line syntax is as follows:
...
The initialization procedure will import all the Flink Python Table API classes which should be exposed to the user.
Example:
```
>>> t_config = TableConfig()
>>> t_env = BatchTableEnvironment.create(t_config)
>>> data = [(1, "Sunny"), (2, "Eden")]
>>> source = t_env.from_collection(data, "id, name")
>>> source.select("id, name").insert_into("print")
>>> t_env.execute()
1,Sunny
2,Eden
```
...