...
Page properties |
---|
...
|
...
|
...
|
...
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
class TableConfig(object): def def __init__(self, j_table_config=None): gateway = get_gateway() if self._ j_table_config = j_table_config class Builder(object): is None: self._ def __init__(self, j_table_config _builder):= gateway.jvm.TableConfig() else: self self._j_table_config _builder = j_table_config_builder def set def get_timelocal_zonetimezone(self, time_zone): self return self._j_table_config_builder.setTimeZone(time_zone) return self def set_null_check(self, null_check): self._j_table_config_builder.setNullCheck(null_check) return self … … # We don't want to expose StreamExecutionEnvironment def as_streaming_execution(self): self._j_table_config_builder.asStreamingExecution() return self # We don't want to expose ExecutionEnvironment def as_batch_execution(self): self._j_table_config_builder.asBatchExecution() return self def build(self): .getLocalTimeZone().getId()
... def get_configuration(self): @classmethod def builder(cls def add_configuration(self, configuration): |
TableEnvironment
class TableEnvironment(object): """ Wrap and extend for org.apache.flink.table.api.TableEnvironment """ table_config = None def __init__(self, j_tenv): self._j_tenv = j_tenv def register_table(self, name, table): self._j_tenv.registerTable(name, table._j_table) def register_table_source(self, name, table_source): self._j_tenv.registerTableSource(name, table_source.j_table_source) def register_table_sink(self, name, table_sink): self._j_tenv.registerTableSink(name, table_sink.j_table_sink) def scan(self, *table_path): j_paths = TypesUtil.convert_py_list_to_j_array("java.lang.String", table_path) j_table = self._j_tenv.scan(j_paths) return Table(j_table) def connect(self, connector_descriptor): return TableDescriptor(self._j_tenv.connect(connector_descriptor._j_connector_descriptor)) … … def sql_query(self, query): j_table = self._j_tenv.sqlQuery(query) return Table(j_table) def sql_update(self, stmt, config=None): if config is not None: j_table = self._j_tenv.sqlUpdate(stmt, config) else: j_table = self._j_tenv.sqlUpdate(stmt) # Extension methods def from_collection(self, data): … return Table(...) def execute(self): self._j_tenv.execEnv().execute() def set_parallelism(self, parallelism): self._j_tenv.execEnv().setParallelism(parallelism) … ... @classmethod def create(cls, table_config): j_tenv = ... return TableEnvironment(j_tenv) |
...
└── pyflink.zip
└── ...
The flink-python -table module will be packaged as pyflink.zip, And put it in to opt/python/lib directory with PY4J_LICENSE.txt py4j-xxx-src.zip.
...