...

We propose a public interface, CreateOrReplaceTable, used by CTAS (CREATE TABLE AS SELECT) for Table API users.

/**
* The table is used for the {@code Create Table As Select} syntax at the Table API level. This interface
* is created by the {@code Table#saveAs} method, and provides methods such as {@code option} and {@code create}
* that help to fill in the table options and produce a {@link TablePipeline}. Currently we only support the
* Create Table As Select (CTAS) feature, but the {@code Replace Table AS SELECT} and
* {@code CreateOrReplace Table AS SELECT} syntax may be supported in the future,
* so it is named CreateOrReplaceTable.
*/
@PublicEvolving
public interface CreateOrReplaceTable {

/**
* Adds a table option to this CreateOrReplaceTable.
*
* <p>Example:
*
* <pre>{@code
* tab.option("connector", "filesystem");
* }</pre>
*/
CreateOrReplaceTable option(String key, String value);

/**
* Create the table in the specified path and write the pipeline data to this table.
*
* <p>Example:
*
* <pre>{@code
* table.saveAs("my_ctas_table")
* .option("connector", "filesystem")
* .option("format", "testcsv")
* .option("path", "/tmp/my_ctas_table/")
* .create();
* }</pre>
*/
TablePipeline create();

/**
* Create the table under the specified path if it does not exist and write the pipeline data to this table.
*
* <p>Example:
*
* <pre>{@code
* table.saveAs("my_ctas_table")
* .option("connector", "filesystem")
* .option("format", "testcsv")
* .option("path", "/tmp/my_ctas_table/")
* .createIfNotExist();
* }</pre>
*/
TablePipeline createIfNotExist();
}
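
The chained calls in the Javadoc examples rely on option returning the same builder. A minimal sketch of that pattern follows. CtasBuilderSketch, BuilderSketch and PipelineSketch are hypothetical stand-ins introduced only for illustration; they are not Flink classes, and the ignoreIfExists flag is an assumed way to distinguish create from createIfNotExist.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified stand-ins for illustration; not Flink's actual classes. */
final class CtasBuilderSketch {

    /** Stand-in for TablePipeline: only records what would be executed. */
    static final class PipelineSketch {
        final String tablePath;
        final Map<String, String> options;
        final boolean ignoreIfExists;

        PipelineSketch(String tablePath, Map<String, String> options, boolean ignoreIfExists) {
            this.tablePath = tablePath;
            // Defensive copy so later builder calls cannot change this pipeline.
            this.options = Collections.unmodifiableMap(new LinkedHashMap<>(options));
            this.ignoreIfExists = ignoreIfExists;
        }
    }

    /** Stand-in for CreateOrReplaceTable: collects options, then produces a pipeline. */
    static final class BuilderSketch {
        private final String tablePath;
        private final Map<String, String> options = new LinkedHashMap<>();

        BuilderSketch(String tablePath) {
            this.tablePath = tablePath;
        }

        /** Returns this builder so that calls can be chained. */
        BuilderSketch option(String key, String value) {
            options.put(key, value);
            return this;
        }

        PipelineSketch create() {
            return new PipelineSketch(tablePath, options, false);
        }

        PipelineSketch createIfNotExist() {
            return new PipelineSketch(tablePath, options, true);
        }
    }

    /** Plays the role of Table#saveAs in this sketch. */
    static BuilderSketch saveAs(String tablePath) {
        return new BuilderSketch(tablePath);
    }

    public static void main(String[] args) {
        PipelineSketch pipeline = saveAs("my_ctas_table")
                .option("connector", "filesystem")
                .option("format", "testcsv")
                .option("path", "/tmp/my_ctas_table/")
                .createIfNotExist();
        System.out.println(pipeline.tablePath + " " + pipeline.options);
    }
}
```

In this sketch, setting the same key twice keeps the last value, which is one plausible reading of option; the proposal itself does not specify that behavior.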

...

We provide methods that are used to serialize/deserialize the Catalog and to infer the options of a CatalogBaseTable; these options are needed to compile the SQL to a JobGraph successfully.

@PublicEvolving
public interface Catalog {

    /**
* When using the Create Table As Select (CTAS) feature,
* the planner will use this API to infer the default options for the table so that the SQL can be compiled successfully.
*
* For example:
* {@link JdbcCatalog} can add connector = 'jdbc', user, password, url and other options to {@link CatalogBaseTable},
* {@link HiveCatalog} can add connector = 'hive' to {@link CatalogBaseTable}, to help generate the TableSink.
* The tables in {@link GenericInMemoryCatalog} already exist externally,
* so their options must be filled in manually by the user; the Catalog cannot infer them automatically.
*
* <p>Example:
* <pre>{@code
* // If the user does not set any Table's options,
* // then the implementation of JdbcCatalog#inferTableOptions
* // can be like this to avoid the execution failure.
* public void inferTableOptions(ObjectPath tablePath, CatalogBaseTable table) {
* Map<String, String> tableOptions = table.getOptions();
* tableOptions.put("connector", "jdbc");
* tableOptions.put("url", getBaseUrl());
* tableOptions.put("table-name", tablePath.getObjectName());
* tableOptions.put("username", getUsername());
* tableOptions.put("password", getPassword());
* }
* }</pre>
*/
default void inferTableOptions(ObjectPath tablePath, CatalogBaseTable table) {
throw new UnsupportedOperationException();
}

}
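
Because the default implementation throws UnsupportedOperationException, a caller has to handle catalogs that cannot infer anything. The sketch below shows one way this might look. All types here (CatalogSketch, JdbcLikeCatalog, InMemoryLikeCatalog, resolveOptions) are hypothetical stand-ins, not Flink classes, and two choices are assumptions of this sketch rather than part of the proposal: user-supplied options take precedence over inferred ones, and a missing 'connector' option is treated as the failure condition.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-ins for illustration; not Flink's actual classes. */
final class InferOptionsSketch {

    /** Stand-in for Catalog, reduced to the proposed method. */
    interface CatalogSketch {
        default void inferTableOptions(String tableName, Map<String, String> tableOptions) {
            throw new UnsupportedOperationException();
        }
    }

    /** A JDBC-like catalog that can derive connector options on its own. */
    static final class JdbcLikeCatalog implements CatalogSketch {
        private final String baseUrl;

        JdbcLikeCatalog(String baseUrl) {
            this.baseUrl = baseUrl;
        }

        @Override
        public void inferTableOptions(String tableName, Map<String, String> tableOptions) {
            // Assumption of this sketch: options set by the user are never overwritten.
            tableOptions.putIfAbsent("connector", "jdbc");
            tableOptions.putIfAbsent("url", baseUrl);
            tableOptions.putIfAbsent("table-name", tableName);
        }
    }

    /** An in-memory-like catalog that keeps the throwing default. */
    static final class InMemoryLikeCatalog implements CatalogSketch {}

    /** What a planner-side caller might do before compiling the statement. */
    static Map<String, String> resolveOptions(
            CatalogSketch catalog, String tableName, Map<String, String> userOptions) {
        Map<String, String> resolved = new HashMap<>(userOptions);
        try {
            catalog.inferTableOptions(tableName, resolved);
        } catch (UnsupportedOperationException e) {
            // The catalog cannot infer options, so the user must have supplied them.
            if (!resolved.containsKey("connector")) {
                throw new IllegalStateException(
                        "Table '" + tableName + "' needs an explicit 'connector' option", e);
            }
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, String> options = resolveOptions(
                new JdbcLikeCatalog("jdbc:postgresql://localhost:5432/db"),
                "my_ctas_table",
                new HashMap<>());
        System.out.println(options);
    }
}
```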

...

/**
* Hooks provided by users that are invoked when the job status changes.
*/
@Internal
public interface JobStatusHook extends Serializable {

/** Called when the job enters the CREATED status. It is called only once. */
default void onCreated(JobID jobId) {}

/** Called when the job has finished successfully. */
default void onFinished(JobID jobId) {}

/** Called when the job has finally failed. */
default void onFailed(JobID jobId, Throwable throwable) {}

/** Called when the job is canceled by the user. */
default void onCanceled(JobID jobId) {}
}

Flink's existing hook mechanisms cannot meet the needs of CTAS. For example, JobListener runs on the client side; JobStatusListener runs on the JobManager (JM) side, but it cannot be serialized. We therefore propose a new interface, JobStatusHook, which can be attached to the JobGraph and executed in the JobMaster. The interface will also be marked as Internal.
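
The serializability requirement can be illustrated with a small sketch: a hook is built on the client, serialized (standing in for being attached to the JobGraph), and invoked on the deserialized copy. Everything here is hypothetical: HookSketch mirrors JobStatusHook with a String in place of JobID, CtasHook is an invented implementation, and the "create on CREATED, drop on failure or cancellation" behavior is an assumption made for illustration, not something this section specifies.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-ins for illustration; not Flink's actual classes. */
final class JobStatusHookSketch {

    /** Mirrors the shape of JobStatusHook, with a String in place of JobID. */
    interface HookSketch extends Serializable {
        default void onCreated(String jobId) {}
        default void onFinished(String jobId) {}
        default void onFailed(String jobId, Throwable throwable) {}
        default void onCanceled(String jobId) {}
    }

    /** Invented CTAS-style hook; it only records the actions it would take. */
    static final class CtasHook implements HookSketch {
        private static final long serialVersionUID = 1L;

        private final String tablePath;
        private final List<String> actions = new ArrayList<>();

        CtasHook(String tablePath) {
            this.tablePath = tablePath;
        }

        @Override
        public void onCreated(String jobId) {
            actions.add("create " + tablePath);
        }

        @Override
        public void onFailed(String jobId, Throwable throwable) {
            actions.add("drop " + tablePath);
        }

        @Override
        public void onCanceled(String jobId) {
            actions.add("drop " + tablePath);
        }

        List<String> actions() {
            return actions;
        }
    }

    /** Java-serializes and deserializes a value, standing in for shipping it with the job. */
    static <T extends Serializable> T roundTrip(T value)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        CtasHook shipped = roundTrip(new CtasHook("my_ctas_table"));
        shipped.onCreated("job-1");
        shipped.onFailed("job-1", new RuntimeException("simulated failure"));
        System.out.println(shipped.actions());
    }
}
```

The default no-op methods let an implementation override only the transitions it cares about; here onFinished is left untouched.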

...