...
Let's use word count as a complete example. A Python Table API program for it looks like the following (word_count.py):
...
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment, TableConfig
...

# Input text; it is split on single spaces below to obtain the words.
content = "..."

# Build a batch table environment on top of the DataSet execution environment.
t_config = TableConfig()
env = ExecutionEnvironment.get_execution_environment()
t_env = BatchTableEnvironment.create(env, t_config)

# register Results table in table environment
tmp_dir = tempfile.gettempdir()
result_path = tmp_dir + '/result'
if os.path.exists(result_path):
    # A previous output may be a single file or a directory; choose the
    # matching remover before attempting the deletion.
    remove_stale = os.remove if os.path.isfile(result_path) else shutil.rmtree
    try:
        remove_stale(result_path)
    except OSError as err:
        # Best effort: a failed cleanup is logged, not raised.
        logging.error(
            "Error removing directory: %s - %s.",
            err.filename,
            err.strerror,
        )

# Describe the sink step by step: file-system connector, comma-delimited
# CSV format, and a (word, count) schema, registered under "Results".
result_sink = t_env.connect(FileSystem().path(result_path))
csv_format = (
    OldCsv()
    .field_delimiter(',')
    .field("word", DataTypes.STRING())
    .field("count", DataTypes.BIGINT())
)
result_sink = result_sink.with_format(csv_format)
sink_schema = (
    Schema()
    .field("word", DataTypes.STRING())
    .field("count", DataTypes.BIGINT())
)
result_sink.with_schema(sink_schema).register_table_sink("Results")

# Pair every word with an initial count of 1, then group by word and
# count the rows of each group.
words = content.split(" ")
elements = [(token, 1) for token in words]
word_table = t_env.from_elements(elements, ["word", "count"])
word_counts = word_table.group_by("word").select("word, count(1) as count")
word_counts.insert_into("Results")

t_env.execute("word_count")
...
...