...
class TableConfig(object): def def __init__(self, j_table_config=None): self._j_table_config = j_table_config class Builder(object): gateway = get_gateway() if def __init__(self, j_table_config _builder)is None: self self._j_table_config _builder = j_table_config_builder def set_time_zone(self, time_zone): = gateway.jvm.TableConfig() else: self self._j_table_config _builder.setTimeZone(time_zone) return self def set_null_check(self, null_check): = j_table_config def get_local_timezone(self): return self self._j_table_config_builder.setNullCheckgetLocalTimeZone(null_check) return self … … # We don't want to expose StreamExecutionEnvironment def as_streaming_execution(self): self._j_table_config_builder.asStreamingExecution() return self # We don't want to expose ExecutionEnvironment def as_batch_execution(self): self._j_table_config_builder.asBatchExecution() return self def build(self): ).getId()
... def get_configuration(self): @classmethod def builder(cls def add_configuration(self, configuration): |
TableEnvironment
class TableEnvironment(object):
    """
    Wrap and extend org.apache.flink.table.api.TableEnvironment.

    Every method delegates to the wrapped Java object held in ``_j_tenv``.
    Parts of this listing are elided in the original; those places are
    marked with ``...`` placeholders and are not implemented here.
    """

    table_config = None

    def __init__(self, j_tenv):
        """Store the Java table environment that all methods delegate to."""
        self._j_tenv = j_tenv

    def register_table(self, name, table):
        """Register ``table`` under ``name`` using its wrapped ``_j_table``."""
        self._j_tenv.registerTable(name, table._j_table)

    def register_table_source(self, name, table_source):
        """Register ``table_source.j_table_source`` under ``name``."""
        self._j_tenv.registerTableSource(name, table_source.j_table_source)

    def register_table_sink(self, name, table_sink):
        """Register ``table_sink.j_table_sink`` under ``name``."""
        self._j_tenv.registerTableSink(name, table_sink.j_table_sink)

    def scan(self, *table_path):
        """
        Scan the table addressed by the given path components.

        The components are converted to a ``java.lang.String`` array before
        being handed to the Java ``scan`` call; the result is wrapped in a
        ``Table``.
        """
        j_paths = TypesUtil.convert_py_list_to_j_array("java.lang.String", table_path)
        return Table(self._j_tenv.scan(j_paths))

    def connect(self, connector_descriptor):
        """
        Pass the descriptor's ``_j_connector_descriptor`` to the Java
        ``connect`` call and wrap the result in a ``TableDescriptor``.
        """
        return TableDescriptor(
            self._j_tenv.connect(connector_descriptor._j_connector_descriptor))

    # ... (further methods are elided in the original listing)

    def sql_query(self, query):
        """Run ``query`` through the Java ``sqlQuery`` call and wrap the result in a ``Table``."""
        return Table(self._j_tenv.sqlQuery(query))

    def sql_update(self, stmt, config=None):
        """
        Forward ``stmt`` to the Java ``sqlUpdate`` call.

        ``config`` is passed as a second argument only when it is not None.
        Nothing is returned, so the call result is not bound to a local.
        """
        if config is not None:
            self._j_tenv.sqlUpdate(stmt, config)
        else:
            self._j_tenv.sqlUpdate(stmt)

    # Extension methods

    def from_collection(self, data):
        """Build a ``Table`` from ``data``; the conversion is elided in the original listing."""
        ...  # conversion of `data` is not shown in the original listing
        return Table(...)

    def execute(self):
        """Call ``execute()`` on the Java execution environment of ``_j_tenv``."""
        self._j_tenv.execEnv().execute()

    def set_parallelism(self, parallelism):
        """Call ``setParallelism(parallelism)`` on the Java execution environment."""
        self._j_tenv.execEnv().setParallelism(parallelism)

    # ... (further methods are elided in the original listing)

    @classmethod
    def create(cls, table_config):
        """
        Create a ``TableEnvironment`` for ``table_config``.

        How the Java environment is built is elided in the original listing.
        """
        j_tenv = ...  # construction from `table_config` is not shown in the original
        return TableEnvironment(j_tenv)
...
└── pyflink.zip
└── ...
The flink-python module will be packaged as pyflink.zip and placed in the opt/python/lib directory, together with PY4J_LICENSE.txt and py4j-xxx-src.zip.
...