...

In order to guarantee atomicity, the implementation details will differ between batch and streaming execution.


Steps:

  1. Create the sink table in the catalog based on the schema of the query result.
  2. Start the job and write the result to the target table.
  3. If the job executes successfully, make the data visible.
  4. If the job execution fails, drop the sink table or delete the written data. (This capability requires runtime-module support, such as a hook; the SQL layer passes the relevant parameters to the runtime module. See the sketch after this list.)
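
To make step 4 concrete, here is a minimal sketch of such a cleanup hook, assuming the runtime invokes an onFinished/onFailed callback pair after the job terminates. The class name and callback signatures are illustrative assumptions, not an existing Flink API.

import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;

// Illustrative sketch only: the hook interface shape is an assumption.
public class CtasCleanupHook {

    private final Catalog catalog;          // catalog in which the sink table was created (step 1)
    private final ObjectPath sinkTablePath; // identifier of the sink table

    public CtasCleanupHook(Catalog catalog, ObjectPath sinkTablePath) {
        this.catalog = catalog;
        this.sinkTablePath = sinkTablePath;
    }

    // Job finished successfully (step 3): keep the table, data stays visible.
    public void onFinished() {
        // nothing to roll back
    }

    // Job failed (step 4): drop the sink table that was created eagerly.
    public void onFailed(Throwable cause) {
        try {
            catalog.dropTable(sinkTablePath, true); // ignoreIfNotExists = true
        } catch (TableNotExistException e) {
            // table already gone; nothing to clean up
        }
    }
}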

Streaming

Since streaming jobs are long-running, the table needs to be created first.

...

We will refer to the Spark DataSource v1 implementation.


Steps:

...


...


Dropping the table when the job fails requires some additional support (both Streaming and Batch):

...

  • FileSystem sink: data should be written to a temporary directory first and becomes visible only after the job finally succeeds (see the sketch after this list).
  • Sinks that support transactions or two-phase commit: data becomes visible only after the job finally succeeds.
  • Other sinks: data is visible immediately after writing.
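
As a minimal sketch of the temporary-directory approach for a FileSystem sink; the directory layout and method names are illustrative assumptions, not Flink's actual sink implementation:

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.stream.Stream;

// Illustrative sketch only: write to a staging directory, publish on success, discard on failure.
public class StagingDirectoryCommitter {

    private final Path stagingDir; // e.g. "<target>/.staging"; all job output lands here first
    private final Path targetDir;  // final, user-visible location

    public StagingDirectoryCommitter(Path stagingDir, Path targetDir) {
        this.stagingDir = stagingDir;
        this.targetDir = targetDir;
    }

    // Job succeeded: publish the staged files with a single atomic rename.
    public void commit() throws Exception {
        Files.move(stagingDir, targetDir, StandardCopyOption.ATOMIC_MOVE);
    }

    // Job failed: delete the staged files; the target location was never touched.
    public void abort() throws Exception {
        try (Stream<Path> paths = Files.walk(stagingDir)) {
            paths.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        }
    }
}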


Catalog

We can think of catalogs in Flink as falling into two types, in-memory catalogs and external catalogs:

In-memory catalog:

  1. Metadata is a copy of the external system's metadata, and the user must ensure that the entity exists in the external system and that the metadata is consistent; otherwise an exception is thrown at runtime. CTAS needs to create the table first, so it is hard to guarantee that the entity exists in the external system and that the metadata is consistent.
  2. The user needs to configure the parameters of the external system through the WITH clause; Flink cannot obtain them from the in-memory catalog.

For example, for a Kafka table we need the user to tell us the address of the Kafka server, the name of the topic, and the data serialization format; otherwise the Flink job will fail, as illustrated by the descriptor sketch below.
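
For illustration, with the Table API these parameters would have to be supplied explicitly through the descriptor's options; the topic name and server address below are placeholders:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;

// Every connector option must come from the user; an in-memory catalog cannot derive them.
TableDescriptor kafkaDescriptor =
        TableDescriptor.forConnector("kafka")
                .schema(Schema.newBuilder()
                        .column("id", DataTypes.BIGINT())
                        .column("name", DataTypes.STRING())
                        .build())
                .option("topic", "my_topic")                               // placeholder topic
                .option("properties.bootstrap.servers", "localhost:9092") // placeholder address
                .format("json")                                           // serialization format
                .build();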

External catalog:

  1. Metadata directly refers to the external system, so there is no consistency problem. Create table also calls the external system directly, so it is naturally guaranteed that the entity exists in the external system.
  2. The WITH clause parameters are optional; Flink can obtain the necessary parameters through the external catalog.

...

Both in-memory catalogs and external catalogs will support CTAS. If the CTAS command is executed against an in-memory catalog and the target storage does not exist in the external system, the Flink job will fail, which is consistent with the current Flink behavior.

With an in-memory catalog, the options of the table are completely dependent on user input, so we should check the table's options to avoid users omitting required configuration parameters; a possible check is sketched below.
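
A minimal sketch of such a check, assuming the required option keys for a connector are known; the helper and its required-keys parameter are hypothetical:

import java.util.List;
import java.util.Map;

import org.apache.flink.table.api.ValidationException;

// Hypothetical helper: reject the CTAS statement early instead of failing at runtime.
static void checkRequiredOptions(Map<String, String> tableOptions, List<String> requiredKeys) {
    for (String key : requiredKeys) {
        if (!tableOptions.containsKey(key)) {
            throw new ValidationException(
                    "CTAS sink table is missing required option '" + key + "'.");
        }
    }
}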

Public API Changes

Table Environment

Providing a method that is used to execute CTAS for Table API users.

@PublicEvolving
public interface TableEnvironment {

    /**
     * Registers the given {@link Table}'s result as a catalog table with {@link TableDescriptor}'s
     * options.
     *
     * <p>CTAS for Table API.
     *
     * <p>Examples:
     *
     * <pre>{@code
     * tEnv.createTable("MyTable", TableDescriptor.forConnector("hive")
     *     .build(), query);
     * }</pre>
     *
     * @param path The path under which the table will be registered. See also the {@link
     *     TableEnvironment} class description for the format of the path.
     * @param descriptor Template for creating a {@link CatalogTable} instance.
     * @param query The {@link Table} object describing the pipeline for further transformations.
     */
    void createTable(String path, TableDescriptor descriptor, Table query);
}
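
For example, assuming the proposed method above is available, a CTAS pipeline could be expressed like this; the connector choice and query are illustrative:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// The query result supplies the schema of the new table; the descriptor supplies the options.
Table query = tEnv.sqlQuery("SELECT id, UPPER(name) AS name FROM Orders");
tEnv.createTable("MyCtasTable", TableDescriptor.forConnector("hive").build(), query);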


Compatibility, Deprecation, and Migration Plan

...