Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagesql
titlesyntax
CREATE TABLE ctas_hudi
 (
 	id BIGINT,
 	name STRING,
 	age INT
 )
 WITH ('connector.type' = 'hudi');

INSERT INTO ctas_hudi SELECT id, name, age FROM hive_catalog.default.test WHERE mod(id, 10) = 0;

Table

...

Providing method that are used to execute CTAS for Table API user.

@PublicEvolving
public interface
TableEnvironment
 Table extends Explainable<Table>, Executable {

    /**
*
Registers
Declare the 
given
pipeline 
{@link Table}'s result as a catalog table with
defined by the given {@link 
TableDescriptor
Table}
's
 
options.
object
*

 to create the table 
*
at 
<p>
the 
CTAS for Table API
specified path.
*/
* <p>Examples:
*
* <pre>{
CreatingTable saveAs(String tablePath);
}

CreatingTable

Add CreatingTable to use CTAS for table API.

@code
/** A table that need to be created for CTAS. */
@PublicEvolving
public interface CreatingTable {


/**
Map<String,
 
String>
 
options
 
=
 
new HashMap<String, String>();
* add table option.
*
options.put
 such as option("connector
.type
", "
hudi
filesystem");
*
tEnv.createTable("MyTable", options, tEnv.sqlQuery("select id, name from user_table"))
/
CreatingTable option(String key, String value);

* }</pre>
/**
*

 Create the table under 
*
the 
@param
specified path
The
 
path
 
under
 
which
 
the
* 
table
and 
will
write 
be
the 
registered.
pipeline 
See
data 
also
to 
the
this 
{@link * @param options Table options.
table.
*/
TableEnvironment} class description for the format of the path.
TablePipeline create();

/**
* Create the table under the specified path if not exist

*
@param query The {@link Table} object describing
and write the pipeline 
for
data to 
further
this 
transformationscreateTable(String path, Map<String, String> options, Table query);
table.
*/
TablePipeline
void
createIfNotExist();
}

Catalog

Providing method that are used to create stage table.

@PublicEvolving
public interface Catalog {

   /**
    * 
Creates a new stage table for
When the CTAS
.
* and will be created when the Job status changes to Created;
* will be dropped when the job status changes to FAILED or CANCELED.
*
 function 
* This table will be recorded in the Catalog's memory
was used, the Catalog combined {@link Catalog#supportsManagedTable}

* 
<p>The
to 
framework will make sure
infer whether to 
call
add 
this
some 
method
options 
with
to 
fully validated
the {@link

* ResolvedCatalogTable
 CatalogBaseTable}.
Those
 
instances
 
are
 
easy to serialize
*
    * 
for a durable catalog implementation.
For example:
*

 {@link JdbcCatalog} can add 
*
user, 
@param
password, 
tablePath
url 
path
and 
of
other 
the table
options to 
be created
{@link CatalogBaseTable}.
*/
@param
 
table
 
the
 
table
default 
definition
void inferTableOptions(CatalogBaseTable 
* @param ignoreIfExists flag to specify behavior when a table already exists at the
* given path: if set to false, it throws a TableAlreadyExistException, if set to true, do
table) {}

   /**
* Assist in the serialization of different catalogs.
* Catalog decides for itself which information needs to be serialized.
*/
default
*
void serialize(OutputStream output) throws IOException 
nothing.
{}

/**
@throws
 
TableAlreadyExistException
 
if
 
table
* 
already
Assist 
exists
in 
and
deserialization 
ignoreIfExists
of 
is
different 
false
catalogs.
*
*
Determine 
@throws
whether 
DatabaseNotExistException
to 
if
create 
the
a 
database
new 
in
Catalog 
tablePath
instance 
doesn't
based 
exist
on
* the
*
deserialized 
@throws
data 
CatalogException
and 
in
the 
case
implementation of 
any runtime exception
the catalog.
*/
default
void
Catalog 
createStageTable
deserialize(
ObjectPath
InputStream 
tablePath,
input) 
CatalogBaseTable
throws 
table
ClassNotFoundException, 
boolean
IOException 
ignoreIfExists)
{
return this;
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException;
}
}


Proposed Changes

First of all, make it clear, CTAS command create table must go through catalog.

...

public class StreamGraph implements Pipeline {

... ...

    /** Registers the JobStatusHook. */
void setJobStatusHookaddJobStatusHook(JobStatusHook hook) {
...
}
}

...