...

Let's take word count as a complete example. With the Python Table API, it looks like the following (word_count.py):

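import logging
import os
import shutil
import tempfile

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment, DataTypes, TableConfig
from pyflink.table.descriptors import FileSystem, OldCsv, Schema
...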

content = "..."

t_config = TableConfig()
env = ExecutionEnvironment.get_execution_environment()
t_env = BatchTableEnvironment.create(env, t_config)

# remove output left over from a previous run; the result path may be
# either a single file or a directory of files
tmp_dir = tempfile.gettempdir()
result_path = os.path.join(tmp_dir, 'result')
if os.path.exists(result_path):
    try:
        if os.path.isfile(result_path):
            os.remove(result_path)
        else:
            shutil.rmtree(result_path)
    except OSError as e:
        logging.error("Error removing %s: %s.", e.filename, e.strerror)

# register Results table in table environment
t_env.connect(FileSystem().path(result_path)) \
    .with_format(OldCsv()
                 .field_delimiter(',')
                 .field("word", DataTypes.STRING())
                 .field("count", DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field("word", DataTypes.STRING())
                 .field("count", DataTypes.BIGINT())) \
    .register_table_sink("Results")

elements = [(word, 1) for word in content.split(" ")]
t_env.from_elements(elements, ["word", "count"]) \
    .group_by("word") \
    .select("word, count(1) as count") \
    .insert_into("Results")

t_env.execute("word_count")
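To make the expected result concrete, here is a minimal plain-Python reference model of what the job above computes. It is not PyFlink API: `word_count`, `write_csv` and `read_results` are hypothetical helpers. It assumes the sink writes one `word,count` line per row (the `field_delimiter(',')` setting) into either a single file or a directory of files, the two cases the cleanup code above handles. The file name `"1"` used in the usage example is likewise only an illustration.

```python
import os
import tempfile
from collections import Counter
from typing import Dict, Iterable, List, Tuple


def word_count(content: str) -> List[Tuple[str, int]]:
    """Hypothetical reference model of the Table API job: split on single
    spaces, emit (word, 1) pairs, then group by word and count the rows."""
    elements = [(word, 1) for word in content.split(" ")]
    counts = Counter(word for word, _ in elements)
    # sort so the output is deterministic; the Flink job makes no ordering promise
    return sorted(counts.items())


def write_csv(rows: Iterable[Tuple[str, int]], path: str) -> None:
    """Hypothetical stand-in for the CSV sink: one 'word,count' line per row."""
    with open(path, "w") as f:
        for word, count in rows:
            f.write(f"{word},{count}\n")


def read_results(path: str) -> Dict[str, int]:
    """Read 'word,count' lines from a single file, or from every file
    inside a directory, into a dict."""
    if os.path.isfile(path):
        files = [path]
    else:
        files = [os.path.join(path, name) for name in sorted(os.listdir(path))]
    result: Dict[str, int] = {}
    for file_name in files:
        with open(file_name) as f:
            for line in f:
                line = line.strip()
                if line:
                    # split on the last comma so the count is always the final field
                    word, count = line.rsplit(",", 1)
                    result[word] = int(count)
    return result


if __name__ == "__main__":
    rows = word_count("to be or not to be")
    print(rows)  # [('be', 2), ('not', 1), ('or', 1), ('to', 2)]

    out_dir = tempfile.mkdtemp()
    write_csv(rows, os.path.join(out_dir, "1"))
    print(read_results(out_dir))  # {'be': 2, 'not': 1, 'or': 1, 'to': 2}
```

Note that `content.split(" ")` turns consecutive spaces into empty "words"; the model mirrors the job's behavior here rather than correcting it.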

...

...