...
We propose a public interface, CreateOrReplaceTable, which is used by CTAS (CREATE TABLE AS SELECT) for Table API users.
/**
 * The table used to implement the {@code Create Table As Select} syntax at the Table API level. This interface
 * is created by the {@code Table#saveAs} method and provides methods such as {@code option} and {@code create}
 * that help fill in the table options and produce a {@link TablePipeline}. Currently only the
 * Create Table As Select (CTAS) feature is supported. The {@code REPLACE TABLE AS SELECT} and
 * {@code CREATE OR REPLACE TABLE AS SELECT} syntax may be supported in the future, which is why the
 * interface is named CreateOrReplaceTable.
 */
@PublicEvolving
public interface CreateOrReplaceTable {

    /**
     * Adds a table option to this CreateOrReplaceTable.
     *
     * <p>Example:
     *
     * <pre>{@code
     * tab.option("connector", "filesystem");
     * }</pre>
     */
    CreateOrReplaceTable option(String key, String value);

    /**
     * Creates the table at the specified path and writes the pipeline data to this table.
     *
     * <p>Example:
     *
     * <pre>{@code
     * table.saveAs("my_ctas_table")
     *     .option("connector", "filesystem")
     *     .option("format", "testcsv")
     *     .option("path", "/tmp/my_ctas_table/")
     *     .create();
     * }</pre>
     */
    TablePipeline create();

    /**
     * Creates the table at the specified path if it does not exist, and writes the pipeline data to this table.
     *
     * <p>Example:
     *
     * <pre>{@code
     * table.saveAs("my_ctas_table")
     *     .option("connector", "filesystem")
     *     .option("format", "testcsv")
     *     .option("path", "/tmp/my_ctas_table/")
     *     .createIfNotExist();
     * }</pre>
     */
    TablePipeline createIfNotExist();
}
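The builder contract above (chained option calls followed by a terminal create or createIfNotExist) can be illustrated with a small self-contained sketch that does not depend on Flink. SimpleCreateOrReplaceTable and PipelineSpec are hypothetical stand-ins for CreateOrReplaceTable and TablePipeline, and a plain Set of table names stands in for the catalog. That create fails when the table already exists (mirroring a plain CREATE TABLE) is an assumption of this sketch; the proposal only states that createIfNotExist creates the table if it does not exist and still writes the pipeline data to it.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for TablePipeline: only records what would be executed. */
final class PipelineSpec {
    final String tablePath;
    final Map<String, String> options;
    final boolean tableCreated; // false if the table already existed

    PipelineSpec(String tablePath, Map<String, String> options, boolean tableCreated) {
        this.tablePath = tablePath;
        this.options = Collections.unmodifiableMap(new LinkedHashMap<>(options));
        this.tableCreated = tableCreated;
    }
}

/** Hypothetical builder mirroring the proposed CreateOrReplaceTable contract. */
final class SimpleCreateOrReplaceTable {
    private final String tablePath;
    private final Set<String> existingTables; // stands in for the catalog
    private final Map<String, String> options = new LinkedHashMap<>();

    SimpleCreateOrReplaceTable(String tablePath, Set<String> existingTables) {
        this.tablePath = tablePath;
        this.existingTables = existingTables;
    }

    /** Collects one table option and returns this builder for chaining. */
    SimpleCreateOrReplaceTable option(String key, String value) {
        options.put(key, value);
        return this;
    }

    /** Assumed semantics: fails if the table already exists, like CREATE TABLE. */
    PipelineSpec create() {
        if (existingTables.contains(tablePath)) {
            throw new IllegalStateException("Table already exists: " + tablePath);
        }
        existingTables.add(tablePath);
        return new PipelineSpec(tablePath, options, true);
    }

    /** Creates the table only if it is missing; the data is written either way. */
    PipelineSpec createIfNotExist() {
        boolean created = existingTables.add(tablePath); // false if already present
        return new PipelineSpec(tablePath, options, created);
    }
}
```

Keeping the options in a LinkedHashMap preserves the order in which the user set them, which makes the collected options easy to inspect; the terminal methods copy them so later builder calls cannot mutate an already produced pipeline.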
...
We provide methods that are used to serialize/deserialize the Catalog and to infer the options of a CatalogBaseTable; these options are needed to compile the SQL into a JobGraph successfully.
@PublicEvolving public interface Catalog {

    /**
     * When using the Create Table As Select (CTAS) feature, the planner uses this API to infer the default
     * options for the table, so that the SQL can be compiled and the TableSink generated successfully.
     *
     * <p>For example, {@link JdbcCatalog} can add connector = 'jdbc', username, password, url and other
     * options to the {@link CatalogBaseTable}, and {@link HiveCatalog} can add connector = 'hive', which
     * helps generate the TableSink. The tables in {@link GenericInMemoryCatalog} already exist externally,
     * so their options must be filled in manually by the user and cannot be inferred automatically by
     * the catalog.
     *
     * <p>Example:
     *
     * <pre>{@code
     * // If the user does not set any table options,
     * // JdbcCatalog#inferTableOptions can be implemented like this to avoid execution failure.
     * public void inferTableOptions(ObjectPath tablePath, CatalogBaseTable table) {
     *     Map<String, String> tableOptions = table.getOptions();
     *     tableOptions.put("connector", "jdbc");
     *     tableOptions.put("url", getBaseUrl());
     *     tableOptions.put("table-name", tablePath.getObjectName());
     *     tableOptions.put("username", getUsername());
     *     tableOptions.put("password", getPassword());
     * }
     * }</pre>
     */
    default void inferTableOptions(ObjectPath tablePath, CatalogBaseTable table) {
        throw new UnsupportedOperationException();
    }
}
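The inference contract can be sketched without Flink, under stated assumptions: OptionInferringCatalog, JdbcLikeCatalog, InMemoryLikeCatalog and TablePath are hypothetical stand-ins for Catalog, JdbcCatalog, GenericInMemoryCatalog and ObjectPath, and the options are passed as a plain mutable Map instead of a CatalogBaseTable. Unlike the example above, which uses put, this sketch uses putIfAbsent so that options the user set explicitly are not overwritten; that is a design choice of the sketch, not something the proposal specifies.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for ObjectPath (database name + object name). */
final class TablePath {
    final String databaseName;
    final String objectName;

    TablePath(String databaseName, String objectName) {
        this.databaseName = databaseName;
        this.objectName = objectName;
    }
}

/** Hypothetical, trimmed-down catalog carrying only the proposed default method. */
interface OptionInferringCatalog {
    /** As in the proposal, catalogs that cannot infer options simply throw. */
    default void inferTableOptions(TablePath tablePath, Map<String, String> tableOptions) {
        throw new UnsupportedOperationException();
    }
}

/** Tables live externally, so options cannot be inferred: keeps the throwing default. */
final class InMemoryLikeCatalog implements OptionInferringCatalog {}

/** JDBC-like catalog that fills in the connection options it already knows. */
final class JdbcLikeCatalog implements OptionInferringCatalog {
    private final String baseUrl;
    private final String username;
    private final String password;

    JdbcLikeCatalog(String baseUrl, String username, String password) {
        this.baseUrl = baseUrl;
        this.username = username;
        this.password = password;
    }

    @Override
    public void inferTableOptions(TablePath tablePath, Map<String, String> tableOptions) {
        // putIfAbsent: explicitly set user options win over inferred defaults.
        tableOptions.putIfAbsent("connector", "jdbc");
        tableOptions.putIfAbsent("url", baseUrl);
        tableOptions.putIfAbsent("table-name", tablePath.objectName);
        tableOptions.putIfAbsent("username", username);
        tableOptions.putIfAbsent("password", password);
    }
}
```

A planner following this contract would call inferTableOptions before translating the CTAS statement and treat UnsupportedOperationException as "the user must supply the options".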
...
/** Hooks provided by users that are invoked when the job status changes. */
@Internal
public interface JobStatusHook extends Serializable {

    /** Called when the job enters the CREATED status. It is called only once. */
    default void onCreated(JobID jobId) {}

    /** Called when the job finishes successfully. */
    default void onFinished(JobID jobId) {}

    /** Called when the job fails terminally. */
    default void onFailed(JobID jobId, Throwable throwable) {}

    /** Called when the job is canceled by the user. */
    default void onCanceled(JobID jobId) {}
}
Flink's current hook design cannot meet the needs of CTAS. The JobListener runs on the client side, while the JobStatusListener runs on the JobManager (JM) side but cannot be serialized. We therefore propose a new interface, JobStatusHook, which can be attached to the JobGraph and executed in the JobMaster. The interface will be marked as @Internal.
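To make the serialization requirement concrete, the following sketch ships a hook through Java serialization (a stand-in for attaching it to the JobGraph) and then, on the receiving side (a stand-in for the JobMaster), invokes the callback that matches a terminal status. SimpleJobStatusHook, RecordingCtasHook, TerminalStatus and HookDispatcher are hypothetical; a String replaces Flink's JobID, and the idea that a CTAS hook reacts to failure (for example, by cleaning up the newly created table) is only an illustration, not part of the proposal text.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Simplified copy of the proposed hook; a String stands in for JobID. */
interface SimpleJobStatusHook extends Serializable {
    default void onCreated(String jobId) {}
    default void onFinished(String jobId) {}
    default void onFailed(String jobId, Throwable throwable) {}
    default void onCanceled(String jobId) {}
}

/** Hypothetical terminal statuses the receiving side reacts to. */
enum TerminalStatus { FINISHED, FAILED, CANCELED }

/** Hypothetical CTAS hook that just records which callbacks fired. */
final class RecordingCtasHook implements SimpleJobStatusHook {
    private static final long serialVersionUID = 1L;
    final List<String> events = new ArrayList<>();

    @Override public void onCreated(String jobId) { events.add("created " + jobId); }
    @Override public void onFinished(String jobId) { events.add("finished " + jobId); }
    @Override public void onFailed(String jobId, Throwable t) { events.add("failed " + jobId); }
    @Override public void onCanceled(String jobId) { events.add("canceled " + jobId); }
}

final class HookDispatcher {
    /** Serializes and deserializes the hook, as shipping it inside a job graph would. */
    static SimpleJobStatusHook roundTrip(SimpleJobStatusHook hook)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(hook);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SimpleJobStatusHook) in.readObject();
        }
    }

    /** Invokes the callback matching the job's terminal status. */
    static void dispatch(SimpleJobStatusHook hook, String jobId, TerminalStatus status, Throwable cause) {
        switch (status) {
            case FINISHED: hook.onFinished(jobId); break;
            case FAILED: hook.onFailed(jobId, cause); break;
            case CANCELED: hook.onCanceled(jobId); break;
            default: throw new IllegalArgumentException("Unknown status: " + status);
        }
    }
}
```

The round trip shows why the hook must be Serializable: the instance that runs on the receiving side is a copy, so the client-side original never observes the callbacks.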
...