...
In order to guarantee atomicity, there will be differences in implementation details.
Steps:
- Create the sink table in the catalog based on the schema of the query result.
- Start the job and write the result to the target table.
- If the job executes successfully, make the data visible.
- If the job execution fails, drop the sink table or delete the written data. (This capability requires runtime module support, such as a hook, and SQL passes the relevant parameters to the runtime module.)
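The batch steps can be illustrated with a small, self-contained Java sketch. It is not Flink's implementation. `SimpleCatalog`, `CtasJob`, and `CtasExecutor.executeCtas` are hypothetical names. The catalog is a plain in-memory map, and the job is modeled as a callback that either returns normally or throws. In Flink, the cleanup in the failure step would depend on the runtime hook described above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a catalog: table name -> schema description. */
class SimpleCatalog {
    private final Map<String, String> tables = new HashMap<>();

    void createTable(String name, String schema) {
        if (tables.containsKey(name)) {
            throw new IllegalStateException("Table already exists: " + name);
        }
        tables.put(name, schema);
    }

    void dropTable(String name) {
        tables.remove(name);
    }

    boolean tableExists(String name) {
        return tables.containsKey(name);
    }
}

/** Hypothetical job abstraction: writes the query result into the sink table. */
interface CtasJob {
    void run() throws Exception;
}

class CtasExecutor {
    /**
     * Batch CTAS sketch: create the table from the query schema, run the job,
     * and drop the table again if the job fails, so that a failed statement
     * leaves no half-created table behind. Returns true if the job succeeded.
     */
    static boolean executeCtas(SimpleCatalog catalog, String table, String querySchema, CtasJob job) {
        // Step 1: create the sink table based on the schema of the query result.
        catalog.createTable(table, querySchema);
        try {
            // Step 2: start the job and write the result to the target table.
            job.run();
            // Step 3: success; a real system would now make the data visible.
            return true;
        } catch (Exception e) {
            // Step 4: failure; roll back by dropping the sink table.
            catalog.dropTable(table);
            return false;
        }
    }
}
```

The table is created before the job runs so that the job has a target to write to. Dropping it in the failure branch makes the statement appear atomic to the user.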
Streaming
Since streaming jobs are long-running, the table needs to be created first.
...
We will refer to the Spark DataSource v1 implementation.
Steps:
...
...
Dropping the table if the job fails requires some additional support (both Streaming and Batch):
...
- FileSystem Sink: Data is first written to a temporary directory and becomes visible only after the job succeeds.
- Sinks that support transactions or two-phase commit: Data becomes visible only after the job succeeds.
- Other Sinks: Data is visible immediately after it is written.
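For the FileSystem case, the pattern of writing to a temporary directory and then publishing the result can be sketched with `java.nio.file`. The class `StagedFileSink` and the staging directory naming are assumptions for illustration only. A real Flink file sink has its own commit protocol. The sketch publishes all staged files with a single directory rename. `Files.move` with `ATOMIC_MOVE` throws `AtomicMoveNotSupportedException` when the file system cannot perform that rename atomically.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Hypothetical file sink: output is staged in a hidden temporary directory and
 * only moved to the target directory when the job succeeds.
 */
class StagedFileSink {
    private final Path staging;
    private final Path target;

    StagedFileSink(Path baseDir, String tableName) throws IOException {
        this.target = baseDir.resolve(tableName);
        // Keep the staging directory next to the target so the final move is a same-file-system rename.
        this.staging = Files.createDirectories(baseDir.resolve("." + tableName + "_staging"));
    }

    /** Writes one part file into the staging directory; nothing is visible under the target yet. */
    void write(String partName, List<String> rows) throws IOException {
        Files.write(staging.resolve(partName), rows, StandardCharsets.UTF_8);
    }

    /** Job succeeded: publish all staged files at once by renaming the staging directory. */
    void commit() throws IOException {
        // Throws AtomicMoveNotSupportedException if the file system cannot rename atomically.
        Files.move(staging, target, StandardCopyOption.ATOMIC_MOVE);
    }

    /** Job failed: delete the staged files so that no partial data becomes visible. */
    void abort() throws IOException {
        if (!Files.exists(staging)) {
            return;
        }
        List<Path> toDelete;
        try (Stream<Path> paths = Files.walk(staging)) {
            // Reverse order deletes files before the directories that contain them.
            toDelete = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : toDelete) {
            Files.delete(p);
        }
    }
}
```

Readers of the target path see either nothing or the complete output. This is the property the FileSystem bullet relies on.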
Public API Changes
Table Environment
Provide methods for Table API users to execute CTAS.
@PublicEvolving
public interface TableEnvironment {
...
...
Catalog
There are two types of catalogs in Flink: in-memory catalogs and external catalogs.
In-memory catalog:
- The metadata is a copy of the external system's metadata. The user must ensure that the entity exists in the external system and that the metadata is consistent; otherwise, an exception is thrown at runtime. Because CTAS has to create the table first, it is hard to ensure that the entity exists in the external system and that the metadata is consistent.
- The user needs to configure the parameters of the external system through the WITH clause, because Flink cannot obtain them through the in-memory catalog.
For example, for a Kafka table the user must provide the address of the Kafka server, the topic name, and the data serialization format; otherwise, the Flink job will fail.
External catalog:
- The metadata refers directly to the external system, so there is no consistency problem. Creating a table also calls the external system directly, so the entity is naturally guaranteed to exist in the external system.
- Parameters in the WITH clause are optional, because Flink can obtain the necessary parameters through the external catalog.
...
Both in-memory catalogs and external catalogs will support CTAS. If the CTAS command is executed in an in-memory catalog and the target store does not exist in the external system, the Flink job will fail, which is consistent with the current Flink behavior.
For an in-memory catalog, the table's options depend entirely on user input, so the catalog should check the table's options to prevent users from omitting required configuration parameters.
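A minimal sketch of the options check for an in-memory catalog is shown below. `TableOptionsChecker` is hypothetical. The required keys for `kafka` are an assumption that mirrors the Kafka example above (server address, topic, format) and uses the option names of Flink's Kafka connector. A real implementation would more likely derive the required options from the connector factory, for example from `Factory#requiredOptions()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical check an in-memory catalog could run before creating a CTAS sink table. */
class TableOptionsChecker {
    // Assumed required keys per connector; a real implementation would ask the connector factory.
    private static final Map<String, List<String>> REQUIRED = new HashMap<>();

    static {
        REQUIRED.put("kafka", Arrays.asList("topic", "properties.bootstrap.servers", "format"));
    }

    /** Returns the required option keys that are missing (an empty list if none are). */
    static List<String> missingOptions(Map<String, String> options) {
        String connector = options.get("connector");
        if (connector == null) {
            return Collections.singletonList("connector");
        }
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED.getOrDefault(connector, Collections.emptyList())) {
            if (!options.containsKey(key)) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** Fails when the table is created instead of letting the job fail at runtime. */
    static void check(Map<String, String> options) {
        List<String> missing = missingOptions(options);
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Missing required table options: " + missing);
        }
    }
}
```

Reporting every missing key at once lets the user fix the WITH clause in one pass. A bare job failure would only surface the first problem.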
Provide a method in Catalog that creates the table for CTAS and checks the table's options.
@PublicEvolving
public interface Catalog {

    /**
     * Creates a new table for CTAS.
     *
     * <p>The framework will make sure to call this method with a fully validated {@link CatalogTable}.
     * Those instances are easy to serialize for a durable catalog implementation.
     *
     * @param tablePath path of the table to be created
     * @param table the table definition
     * @param dropIfExists flag to specify behavior when a table already exists at the given path:
     *     if set to false, it throws a TableAlreadyExistException; if set to true, the existing
     *     table is dropped first and then the table is created.
     * @param checkTableOptions flag to specify whether to check the table's options
     * @throws TableAlreadyExistException if the table already exists and dropIfExists is false
     * @throws DatabaseNotExistException if the database in tablePath doesn't exist
     * @throws CatalogException in case of any runtime exception
     */
    void createTable(
            ObjectPath tablePath,
            CatalogTable table,
            boolean dropIfExists,
            boolean checkTableOptions)
            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException;
}
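The semantics of the two new flags can be illustrated with a toy catalog. `ToyCatalog` is hypothetical and deliberately simplified. Table paths are strings, and a table is just its option map. The option check only requires a `connector` key. Standard Java exceptions stand in for `TableAlreadyExistException` and for option validation errors.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy in-memory catalog illustrating the proposed
 * createTable(tablePath, table, dropIfExists, checkTableOptions) semantics.
 * Paths are plain strings and tables are option maps; these are simplifications, not Flink types.
 */
class ToyCatalog {
    private final Map<String, Map<String, String>> tables = new HashMap<>();

    void createTable(String tablePath, Map<String, String> tableOptions,
                     boolean dropIfExists, boolean checkTableOptions) {
        if (checkTableOptions && !tableOptions.containsKey("connector")) {
            // Assumed minimal check; a real catalog would validate connector-specific options.
            throw new IllegalArgumentException("Table options must contain 'connector'");
        }
        if (tables.containsKey(tablePath)) {
            if (!dropIfExists) {
                // Plays the role of TableAlreadyExistException in the proposed Javadoc.
                throw new IllegalStateException("Table already exists: " + tablePath);
            }
            // dropIfExists = true: drop the existing table first, then create the new one.
            tables.remove(tablePath);
        }
        tables.put(tablePath, new HashMap<>(tableOptions));
    }

    Map<String, String> getTable(String tablePath) {
        return tables.get(tablePath);
    }
}
```

The options are checked before anything is dropped, so a failed check with `dropIfExists = true` leaves the existing table untouched. This ordering is a design choice of the sketch, not something the proposal specifies.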
Compatibility, Deprecation, and Migration Plan
...