...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
```python
class Table(object):
    """
    Wrapper of org.apache.flink.table.api.Table
    """

    def __init__(self, j_table):
        self._j_table = j_table  # The reference to the Table object on the JVM.

    def select(self, col_list):
        return Table(self._j_table.select(col_list))  # Call the Java Table API method directly.

    def where(self, condition):
        return Table(self._j_table.where(condition))

    ...

    def write_to_sink(self, j_sink):
        self._j_table.writeToSink(j_sink._j_table_sink)

    def to_pandas(self):
        ...
        return pandas.DataFrame(...)

    ...
```
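To make the delegation pattern concrete, here is a small self-contained sketch (not part of the proposal) in which a plain Python object stands in for the Py4J proxy of the Java Table. It only illustrates that every Python call is forwarded to the wrapped JVM object and the result is wrapped again:

```python
# Illustrative only: "FakeJavaTable" stands in for the Py4J proxy of
# org.apache.flink.table.api.Table. It records the calls it receives so the
# forwarding behaviour of the Python wrapper is easy to see.
class FakeJavaTable(object):
    def __init__(self, ops=()):
        self.ops = list(ops)

    def select(self, col_list):
        return FakeJavaTable(self.ops + ["select(%s)" % col_list])

    def where(self, condition):
        return FakeJavaTable(self.ops + ["where(%s)" % condition])


class PyTable(object):
    """Same shape as the proposed Table wrapper: each call is forwarded and re-wrapped."""
    def __init__(self, j_table):
        self._j_table = j_table

    def select(self, col_list):
        return PyTable(self._j_table.select(col_list))

    def where(self, condition):
        return PyTable(self._j_table.where(condition))


t = PyTable(FakeJavaTable())
result = t.where("a > 10").select("a, b")
print(result._j_table.ops)  # ['where(a > 10)', 'select(a, b)']
```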
GroupedTable
...
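The GroupedTable wrapper is elided above. A minimal sketch of what it could look like, assuming it simply follows the same delegation pattern as Table (the method set shown here is an assumption):

```python
class GroupedTable(object):
    """
    Hypothetical wrapper of org.apache.flink.table.api.GroupedTable.
    """

    def __init__(self, j_grouped_table):
        self._j_grouped_table = j_grouped_table  # Reference to the GroupedTable object on the JVM.

    def select(self, col_list):
        # Delegate to the Java GroupedTable#select and wrap the result back into a Python Table.
        return Table(self._j_grouped_table.select(col_list))
```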
```python
class TableConfig(object):

    def __init__(self, j_table_config=None):
        gateway = get_gateway()
        if j_table_config is None:
            self._j_table_config = gateway.jvm.TableConfig()
        else:
            self._j_table_config = j_table_config

    class Builder(object):

        def __init__(self, j_table_config_builder):
            self._j_table_config_builder = j_table_config_builder

        def set_time_zone(self, time_zone):
            self._j_table_config_builder.setTimeZone(time_zone)
            return self

        def set_null_check(self, null_check):
            self._j_table_config_builder.setNullCheck(null_check)
            return self

        ...

        # We don't want to expose StreamExecutionEnvironment
        def as_streaming_execution(self):
            self._j_table_config_builder.asStreamingExecution()
            return self

        # We don't want to expose ExecutionEnvironment
        def as_batch_execution(self):
            self._j_table_config_builder.asBatchExecution()
            return self

        def build(self):
            return TableConfig(self._j_table_config_builder.build())

    @classmethod
    def builder(cls):
        return cls.Builder(_jvm.org.apache.flink.table.api.TableConfig.builder())
```
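A short usage sketch of the builder above, assuming the pyflink package exists as proposed; `set_parallelism` is used here exactly as in the word count example later in this document:

```python
# Hypothetical usage of the proposed TableConfig builder.
t_config = TableConfig.builder() \
    .as_streaming_execution() \
    .set_parallelism(1) \
    .set_time_zone("UTC") \
    .build()
```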
TableEnvironment
```python
class TableEnvironment(object):
    """
    Wrap and extend org.apache.flink.table.api.TableEnvironment
    """
    table_config = None

    def __init__(self, j_tenv):
        self._j_tenv = j_tenv

    def register_table(self, name, table):
        self._j_tenv.registerTable(name, table._j_table)

    def register_table_source(self, name, table_source):
        self._j_tenv.registerTableSource(name, table_source.j_table_source)

    def register_table_sink(self, name, table_sink):
        self._j_tenv.registerTableSink(name, table_sink._j_table_sink)

    def scan(self, *table_path):
        j_paths = TypesUtil.convert_py_list_to_j_array("java.lang.String", table_path)
        j_table = self._j_tenv.scan(j_paths)
        return Table(j_table)

    def connect(self, connector_descriptor):
        return TableDescriptor(
            self._j_tenv.connect(connector_descriptor._j_connector_descriptor))

    ...

    def sql_query(self, query):
        j_table = self._j_tenv.sqlQuery(query)
        return Table(j_table)

    def sql_update(self, stmt, config=None):
        if config is not None:
            self._j_tenv.sqlUpdate(stmt, config)
        else:
            self._j_tenv.sqlUpdate(stmt)

    # Extension methods
    def from_collection(self, data):
        ...
        return Table(...)

    def from_pandas(self, data, schema=None):
        ...
        return Table(...)

    def execute(self):
        self._j_tenv.execEnv().execute()

    def set_parallelism(self, parallelism):
        self._j_tenv.execEnv().setParallelism(parallelism)

    ...

    @classmethod
    def create(cls, table_config):
        j_tenv = ...
        return TableEnvironment(j_tenv)
```
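For illustration, a minimal program against this wrapper could look as follows. This is a sketch under the assumption that the proposed pyflink package exists; `from_collection` and `create` are the extension methods shown above, and the table name and SQL text are purely illustrative:

```python
# Hypothetical usage of the proposed TableEnvironment wrapper.
t_config = TableConfig.builder().as_batch_execution().build()
t_env = TableEnvironment.create(t_config)

source = t_env.from_collection([("hello",), ("world",)])  # extension method shown above
t_env.register_table("words", source)                     # register it under an illustrative name
result = t_env.sql_query("SELECT COUNT(*) FROM words")    # run SQL against the registered table
t_env.execute()
```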
...
└── pyflink.zip
└── ...
The flink-python-table module will be packaged as pyflink.zip and placed into the opt/python/lib directory together with PY4J_LICENSE.txt and py4j-xxx-src.zip.
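As an illustration of how a launcher script could make the packaged API importable, here is a sketch; the exact mechanism and paths are implementation details and the layout below is an assumption:

```python
import glob
import os
import sys

# Assumed layout: $FLINK_HOME/opt/python/lib/{pyflink.zip, py4j-xxx-src.zip}
lib_dir = os.path.join(os.environ["FLINK_HOME"], "opt", "python", "lib")

# Python can import packages directly from zip archives on sys.path.
sys.path.insert(0, os.path.join(lib_dir, "pyflink.zip"))
sys.path.insert(0, glob.glob(os.path.join(lib_dir, "py4j-*-src.zip"))[0])

import pyflink  # noqa: E402  (now importable from the zip archives)
```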
...
Let's take word count as an example to provide a complete end-to-end program. The Python Table API version will look like the following (wordCount.py):
```python
import os
import tempfile

from pyflink... import TrimFunction, CountAggregateFunction, SplitTableFunction

tmp_dir = tempfile.gettempdir()
source_path = tmp_dir + '/streaming.csv'
result_path = tmp_dir + '/result.csv'
if os.path.isfile(source_path):
    os.remove(source_path)
if os.path.isfile(result_path):
    os.remove(result_path)

with open(source_path, 'w') as f:
    lines = 'hi guys\n' + 'hello guys\n'
    f.write(lines)
    print(lines)

t_config = TableConfig.builder().as_streaming_execution().set_parallelism(1).build()
t_env = TableEnvironment.create(t_config)

field_names = ["a"]
field_types = [DataTypes.STRING]

# Register the "Source" table in the table environment.
t_env.register_table_source(
    "Source",
    CsvTableSource(source_path, field_names, field_types))

# Register the "Results" table in the table environment using the connector API.
schema = Schema() \
    .field("a", DataTypes.STRING) \
    .field("b", DataTypes.INT)

format = Csv() \
    .field_delimiter(",") \
    .field("a", DataTypes.STRING) \
    .field("b", DataTypes.INT)

t_env.connect(FileSystem().path(result_path)) \
    .with_format(format) \
    .with_schema(schema) \
    .register_table_sink("Results")

t = t_env.scan("Source")

# Register the user-defined functions.
my_udf = TrimFunction()
t_env.register_function('trim', my_udf)

my_udtf = SplitTableFunction()
t_env.register_function('split', my_udtf)

my_udaf = CountAggregateFunction()
t_env.register_function('my_count', my_udaf)

t.join_lateral("split(a) as (word)") \
    .select("trim(word) as word") \
    .group_by("word") \
    .select("word, my_count(word)") \
    .insert_into("Results")

t_env.execute()

with open(result_path, 'r') as f:
    lines = f.read()
    print(lines)
```
Job Submission
Flink Run
We will add support for submitting Python Table API jobs in CliFrontend, so that they can be submitted with `flink run`. The current `flink` command line syntax is as follows:
...
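For example, a submission could look like the following; the `-py` option name is an assumption here, since the exact syntax is part of this proposal:

```
flink run -py wordCount.py
```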
The initialization procedure will import all the Flink Python Table API classes which should be exposed to the user.
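A sketch of what such an initialization script might do, simply importing the public classes into the interactive namespace; the module paths below are assumptions, since the package layout is not fixed in this document:

```python
# Hypothetical shell initialization: expose the Python Table API classes to the user.
from pyflink.table import Table, TableConfig, TableEnvironment  # assumed module path
from pyflink.table import DataTypes                             # assumed module path
```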
Example:
```
...
>>> t_config = TableConfig.builder().as_streaming_execution().set_parallelism(1).build()
>>> t_env = TableEnvironment.create(t_config)
>>> data = [(1L, "Sunny"), (2L, "Eden")]
>>> source = t_env.from_collection(data, "id, name")
>>> source.select("id, name").insert_into("print")
>>> t_env.execute()
1,Sunny
2,Eden
```
...