...

Page properties

Current state: "Under Discussion",

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-64-Support-for-Temporary-Objects-in-Table-module-td32684.html

JIRA:

...


Discussion thread
Vote thread
JIRA: FLINK-14485

Release: 1.11


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

In other words, TableSource & TableSink are too physical to be exposed for inline declaration. They are meant for predefined connectors. If a user wants to read from some inline source, it can be done with from/toDataStream.

...

As a result of the rework of the type system in UDFs, we will be able to merge the three methods into a single one. Moreover, for permanent functions we need users to register a class instead of an instance. To keep this method in sync with SQL DDL, we should encourage users to use a class name for temporary functions as well.

We use the “create” prefix rather than “register” to be closer to SQL DDL.

Suggested methods:

void createTemporaryFunction(String path, Class<? extends UserDefinedFunction> functionClass);

Concerns were raised about whether we need a variant that registers instances. This FLIP makes no assumption about whether that method should be introduced. We will revisit this after FLIP-65.

The method to be discussed:

void createTemporaryFunction(String path, UserDefinedFunction function);

ConnectTableDescriptor#registerTableSink, registerTableSource, registerTableSourceAndSink

...

Temporary objects can shadow permanent objects. It is therefore vital to be able to drop them in order to switch from temporary objects (usually used for experimentation) to permanent ones. We suggest introducing separate methods for temporary objects to make it clear which objects are dropped. The dropTemporary* methods would remove only temporary objects and would not take permanent objects into consideration. The same should apply to the regular drop methods: they should only apply to permanent tables, but should throw an exception if a temporary object with the same identifier exists. The methods would return true if an object existed under the given path and was removed.

...

boolean dropTemporaryView(String path);
boolean dropTemporaryTable(String path);
boolean dropTemporaryFunction(String path);
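
The drop semantics described above can be illustrated with a minimal sketch. It is a hypothetical in-memory model, not Flink's CatalogManager: the class `DropSemanticsSketch`, the string-valued definitions, and the choice of `IllegalStateException` are assumptions made for illustration, and paths are assumed to be fully qualified already.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of the proposed drop semantics.
class DropSemanticsSketch {
    private final Map<String, String> temporaryTables = new HashMap<>();
    private final Set<String> permanentTables = new HashSet<>();

    void createTemporaryTable(String path, String definition) {
        temporaryTables.put(path, definition);
    }

    void createTable(String path) {
        permanentTables.add(path);
    }

    // Removes only the temporary object; permanent objects are never touched.
    // Returns true if a temporary object existed under the path and was removed.
    boolean dropTemporaryTable(String path) {
        return temporaryTables.remove(path) != null;
    }

    // Applies only to permanent objects, but refuses to run while a temporary
    // object with the same identifier exists (assumed exception type).
    boolean dropTable(String path) {
        if (temporaryTables.containsKey(path)) {
            throw new IllegalStateException(
                    "Temporary table " + path + " exists; drop it with dropTemporaryTable first.");
        }
        return permanentTables.remove(path);
    }

    boolean isPermanent(String path) {
        return permanentTables.contains(path);
    }

    public static void main(String[] args) {
        DropSemanticsSketch env = new DropSemanticsSketch();
        env.createTable("cat.db.t");
        env.createTemporaryTable("cat.db.t", "experimental definition");
        System.out.println(env.dropTemporaryTable("cat.db.t"));
        System.out.println(env.isPermanent("cat.db.t"));
    }
}
```

In this model, a user who experimented with a temporary table first calls dropTemporaryTable and only then can drop or keep using the permanent table under the same path.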

Listing temporary objects

As discussed in FLIP-57, we need a method for listing temporary functions in order to list temporary system functions. To have consistent behavior for all temporary objects, I suggest introducing similar methods for all other temporary objects:

String[] listTemporaryTables();

String[] listTemporaryViews();

String[] listTemporaryFunctions();

The current methods such as listTables/listFunctions would not list any of the temporary objects, but only the persistent objects.
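
As a rough illustration of the separation between the two kinds of listing, the sketch below keeps temporary and persistent tables in separate maps, so each list method sees only its own kind. The class `ListingSketch` and its string-valued definitions are hypothetical; sorted maps are used only to make the output order deterministic.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model: temporary and persistent tables live in separate maps.
class ListingSketch {
    private final Map<String, String> temporaryTables = new TreeMap<>();
    private final Map<String, String> persistentTables = new TreeMap<>();

    void createTemporaryTable(String path, String definition) {
        temporaryTables.put(path, definition);
    }

    void createTable(String path, String definition) {
        persistentTables.put(path, definition);
    }

    // Lists only temporary tables.
    String[] listTemporaryTables() {
        return temporaryTables.keySet().toArray(new String[0]);
    }

    // Lists only persistent tables; temporary ones are not included.
    String[] listTables() {
        return persistentTables.keySet().toArray(new String[0]);
    }

    public static void main(String[] args) {
        ListingSketch env = new ListingSketch();
        env.createTable("orders", "persistent");
        env.createTemporaryTable("scratch", "temporary");
        System.out.println(String.join(",", env.listTables()));
        System.out.println(String.join(",", env.listTemporaryTables()));
    }
}
```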

Summary:

Methods of TableEnvironment

| Current call | Replacement | Comment |
|---|---|---|
| registerTable | createTemporaryView | For the non temporary part we need to make `QueryOperation` string serializable. |
| registerTableSink | (Deprecate) → to be removed | |
| registerTableSource | (Deprecate) → to be removed | |
| registerDataStream | createTemporaryView | |
| registerScalarFunction/registerAggregateFunction/registerTableFunction | createTemporaryFunction | We can unify the 3 methods into one once we rework type inference for UDFs |

...

| Current call | Comment |
|---|---|
| dropTemporaryTable | |
| dropTemporaryView | |
| dropTemporaryFunction | |
| listTemporaryTables, listTemporaryViews, listTemporaryFunctions | |


Methods of ConnectTableDescriptor

| Current call | Replacement | SQL equivalent | Comment |
|---|---|---|---|
| registerTableSource | (Deprecate) → to be removed | - | |
| registerTableSink | (Deprecate) → to be removed | - | |
| registerTableSourceAndSink | createTemporaryTable | CREATE TEMPORARY TABLE | We should not support CREATE TEMPORARY TABLE AS SELECT = CREATE TEMPORARY VIEW |

...

| Call to add | SQL equivalent | Comment |
|---|---|---|
| createView(Table) | CREATE VIEW | We need to make `QueryOperation`s string serializable. |
| createTable(TableDescriptor) | CREATE TABLE | We need to rework TableDescriptor. We can temporarily use ConnectTableDescriptor#connect |
| createFunction | CREATE FUNCTION | We need a serializable function representation |

...

  • TableEnvironment
    • Table from(String tablePath)
    • void insertInto(String sinkPath, Table table);
  • Table
    • void insertInto(String sinkPath)
      • we need to immediately drop "void insertInto(String tablePath, String... tablePathContinued);" for this to work. Otherwise this call would be ambiguous:

        Table t = …
        t.insertInto("db.sink")

Parsing logic

Parsing logic should follow the SQL standard logic for identifiers.

...

Tables & views are always identified with a 3-part path. Because this is not always the case for functions, function resolution is discussed separately.

If a user provides an identifier that is not fully qualified, it is first expanded to a 3-part identifier, which is then looked up:

  1. first in the temporary map
  2. then in the catalog & database

In case of functions as discussed in FLIP-57 they can have 1 or 3 part identifiers.

  • 1 part for built-in functions
  • 3 part for catalog functions

We suggest always treating temporary functions as belonging to the 3-part category.

Therefore the resolution logic would be as follows:

  1. built-in functions (1-part name)
  2. temporary functions (3-part path, expanded if provided with fewer than 3 parts)
  3. catalog functions (3-part path, expanded if provided with fewer than 3 parts)

For the write path, all temporary objects would always be expanded to 3-part identifiers, using the current catalog & current database if needed.

We suggest introducing in-memory maps in CatalogManager:

The user provided path is always (both for registering & looking up an object) first expanded to a full 3-part path.

createTemporaryView("cat.db.temp", ...) → registers view with an identifier `cat`.`db`.`temp`
createTemporaryView("db.temp", ...) → registers view with an identifier `current_cat`.`db`.`temp`
createTemporaryView("temp", ...) → registers view with an identifier `current_cat`.`current_db`.`temp`

The same logic applies for looking up objects:
tEnv.from("cat.db.temp") → scans a view/table with an identifier `cat`.`db`.`temp`
tEnv.from("db.temp") → scans a view/table with an identifier `current_cat`.`db`.`temp`
tEnv.from("temp") → scans a view/table with an identifier `current_cat`.`current_db`.`temp`

The resolution order between temporary & persistent objects is as follows:

  1. Temporary tables/views
  2. Persistent Catalog tables/views

Temporary function identifier resolution:

Temporary function identifiers were discussed as part of FLIP-57. To summarize the outcome of the discussion, temporary functions can shadow both system and catalog functions. This implies the following resolution order for the read path:

  1. Temporary system functions
  2. System functions
  3. Temporary catalog functions, in the current catalog and current database of the session
  4. Catalog functions, in the current catalog and current database of the session

For the write path, if a user registers a temporary function with just a name, it will be registered as a temporary system function:

createTemporaryFunction("temp", Function.class) → registers function with an identifier `temp`

If a user uses either a 3-part or a 2-part path, the function is registered as a temporary catalog function, with the path expanded with the current catalog if needed:

createTemporaryFunction("cat.db.temp", Function.class) → registers function with an identifier `cat`.`db`.`temp`
createTemporaryFunction("db.temp", Function.class) → registers function with an identifier `current_cat`.`db`.`temp`
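
The read and write paths for functions can be sketched in the same spirit. The class `FunctionResolutionSketch` and its helper methods are hypothetical, and functions are represented by plain strings. One further assumption that the text above does not spell out: a lookup with a 2- or 3-part path skips the two system-function steps and goes straight to the catalog steps.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of the function resolution order.
class FunctionResolutionSketch {
    private final String currentCatalog;
    private final String currentDatabase;
    private final Map<String, String> temporarySystemFunctions = new HashMap<>();
    private final Map<String, String> systemFunctions = new HashMap<>();
    private final Map<String, String> temporaryCatalogFunctions = new HashMap<>();
    private final Map<String, String> catalogFunctions = new HashMap<>();

    FunctionResolutionSketch(String currentCatalog, String currentDatabase) {
        this.currentCatalog = currentCatalog;
        this.currentDatabase = currentDatabase;
    }

    // Naive expansion to a 3-part key; SQL quoting rules are ignored.
    private String qualify(String[] parts) {
        switch (parts.length) {
            case 1:
                return currentCatalog + "." + currentDatabase + "." + parts[0];
            case 2:
                return currentCatalog + "." + parts[0] + "." + parts[1];
            case 3:
                return parts[0] + "." + parts[1] + "." + parts[2];
            default:
                throw new IllegalArgumentException("Expected 1 to 3 parts");
        }
    }

    void addSystemFunction(String name, String function) {
        systemFunctions.put(name, function);
    }

    void addCatalogFunction(String path, String function) {
        catalogFunctions.put(qualify(path.split("\\.")), function);
    }

    // Write path: a bare name becomes a temporary system function,
    // a 2- or 3-part path becomes a temporary catalog function.
    void createTemporaryFunction(String path, String function) {
        String[] parts = path.split("\\.");
        if (parts.length == 1) {
            temporarySystemFunctions.put(parts[0], function);
        } else {
            temporaryCatalogFunctions.put(qualify(parts), function);
        }
    }

    // Read path: temporary system, system, temporary catalog, catalog.
    Optional<String> lookup(String path) {
        String[] parts = path.split("\\.");
        if (parts.length == 1) {
            if (temporarySystemFunctions.containsKey(parts[0])) {
                return Optional.of(temporarySystemFunctions.get(parts[0]));
            }
            if (systemFunctions.containsKey(parts[0])) {
                return Optional.of(systemFunctions.get(parts[0]));
            }
        }
        String key = qualify(parts);
        if (temporaryCatalogFunctions.containsKey(key)) {
            return Optional.of(temporaryCatalogFunctions.get(key));
        }
        return Optional.ofNullable(catalogFunctions.get(key));
    }

    public static void main(String[] args) {
        FunctionResolutionSketch env = new FunctionResolutionSketch("current_cat", "current_db");
        env.addSystemFunction("f", "system");
        env.createTemporaryFunction("f", "temporary system");
        System.out.println(env.lookup("f").orElse("not found"));
    }
}
```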

Temporary objects should be stored in memory in a CatalogManager/FunctionCatalog:

private final Map<ObjectIdentifier, CatalogTable> temporaryTables;
private final Map<ObjectIdentifier, CatalogFunction> temporaryFunctions;

Rejected alternatives:

  1. 1-part path
    1. no other system has such semantics, all systems assign temporary tables & views to some schema (either with the same rules as regular objects or special temporary schema)
  2. Require special names for temporary objects, e.g. #name as in SQL Server, or PTT_name as in Oracle
  3. Register temporary objects in a special DB (as in SQL Server, Oracle, Postgres)
  4. Always assign temporary functions to some namespace (see FLIP-57).

Compatibility, Deprecation, and Migration Plan

  • Methods of TableEnvironment to be deprecated:
    • void registerTable(String name, Table table)
    • void registerTableSource(String name, TableSource<?> tableSource);
    • void registerTableSink(String name, TableSink<?> configuredSink);
    • Table scan(String... tablePath)
    • void registerFunction(String name, ScalarFunction function)
    • <T> void registerFunction(String name, TableFunction<T> tableFunction)
    • <T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction)
    • <T, ACC> void registerFunction(String name, TableAggregateFunction<T, ACC> tableAggregateFunction)
    • <T> void registerDataStream(String name, DataStream<T> dataStream)
    • <T> void registerDataStream(String name, DataStream<T> dataStream, String fields)
    • <T> void registerDataSet(String name, DataSet<T> dataSet)
    • <T> void registerDataSet(String name, DataSet<T> dataSet, String fields)
  • Methods of ConnectTableDescriptor to be deprecated:
    • public void registerTableSource(String name) 
    • public void registerTableSink(String name) 
    • public void registerTableSourceAndSink(String name)
  • Methods of TableEnvironment to be dropped:
    • void insertInto(Table table, StreamQueryConfig queryConfig, String sinkPath, String... sinkPathContinued)
    • void insertInto(Table table, BatchQueryConfig queryConfig, String sinkPath, String... sinkPathContinued)
    • void insertInto(Table table, String sinkPath, String... sinkPathContinued)
  • Methods of Table to be dropped
    • void insertInto(QueryConfig conf, String tablePath, String... tablePathContinued)
    • void insertInto(String tablePath, String... tablePathContinued)

Implementation plan

The implementation of the function-related changes described in this FLIP has to be postponed until type inference is exposed for UserDefinedFunctions.

References:

How other systems handle temporary objects:

...

Postgres allows overriding permanent tables with temporary objects, but does not allow arbitrary schemas. Object identifiers consist of only a single object name.

https://www.postgresql.org/docs/9.3/sql-createtable.html

Compatibility, Deprecation, and Migration Plan

  • We deprecate all methods mentioned above and remove them in the release following the one in which they were deprecated.

Test Plan

Describe in a few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.