Status

Current state: Under Discussion

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-57-Rework-FunctionCatalog-td32291.html#a32613

JIRA: FLINK-14090

Released: 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivations and Background

In Flink 1.9, we integrated Flink’s FunctionCatalog with the CatalogManager from the brand-new catalog API. This enabled Flink to resolve function references to catalog functions and to support rich Hive user-defined functions (of types UDF, GenericUDF, GenericUDTF, UDAF, GenericUDAFResolver) in Flink SQL. The feature has been a critical integration for Flink to embrace the rich Hive ecosystem and to attract more users to Flink batch.

Right now there are mainly 3 types of functions in Flink:

  1. Temporary functions: These are functions that are registered via FunctionCatalog#registerScalarFunction() / registerTableFunction() / registerAggregateFunction(), or created via the upcoming function DDL as “CREATE TEMP FUNCTION”. They are held in memory in FunctionCatalog and last only for the lifespan of a session
  2. Built-in functions: These are system functions in BuiltInFunctionDefinitions
  3. Catalog functions in the current catalog and current database of the session


Despite that big advancement, a few problems remain in how Flink Table handles functions.

1. Does not clarify the concept of temporary functions

By definition, functions that are created as temporary functions will only be visible to the current session, and will only last till the end of the session.

FunctionCatalog right now holds an in-memory map of FunctionDefinitions named ‘userFunctions’, which are registered from Table APIs (FunctionCatalog#registerScalarFunction()/registerTableFunction()/registerAggregateFunction()). These are actually temporary functions, though we haven’t yet explicitly categorized them as such in either the variable name or the API names. Not making the nature of these functions clear confuses users, and it also fails to distinguish them well from catalog functions.

2. Does not clarify the concept of built-in functions

Built-in functions are system information. Similar to temp functions, they don’t have namespaces and should not belong to any catalog or database.

However, in Flink 1.9, built-in functions are regarded as belonging to Flink’s default catalog and default database. Note that Flink’s default catalog (a.k.a. Flink’s “built-in” catalog) is not a system catalog. It is backed by a GenericInMemoryCatalog implementation, mainly for backward compatibility, and by design it is no different from any persistent catalog except that its storage is memory. We should be extra careful in deciding what belongs to the default catalog, instead of throwing everything into it.

3. Flink doesn’t support referencing functions with fully/partially qualified names

There should be two ways to reference functions in SQL:

  1. Via ambiguous identifier reference, a.k.a. only with the function name

E.g. “select myfunc(a) from mytable;” in Flink

  2. Via unambiguous/precise identifier reference, a.k.a. with a completely/partially qualified name, as ‘<catalog_name>.<db_name>.<function_name>’ or ‘<db_name>.<function_name>’.

E.g. “select mycatalog.mydb.myfunc(a) from mytable;” or “select mydb.myfunc(a) from mytable;” in Flink

Flink currently only supports ambiguous identifier references. This limitation means that if users want to use a function in a non-current catalog and/or db, they have to set that catalog and db as the current catalog and current db.

4. Function resolve order

When there are functions of different types but of the same name, the function name is currently resolved in the following order:

  1. Catalog function - functions from catalog APIs
  2. userFunctions - functions held in-memory in FunctionCatalog
  3. Flink built-in functions

This order is problematic in a few ways:

  1. It’s not deterministic. Depending on the catalog and/or user function names registered, the resolution result for an ambiguous function reference can vary. E.g. in a catalog/db where a catalog function is registered under the same name as a built-in function, the catalog function will be called; in a catalog/db where no such function is registered, the built-in function will be called
  2. When a catalog or user function has the same name as a built-in function, there’s no way for users to reference the built-in function unless they switch to a catalog and db that don’t have a catalog/user function of that name
  3. It doesn’t consider temp functions, because that concept is not yet clearly defined

We’d like to redefine that order.

Goals

1) Support precise functions reference

Add support for unambiguous function references, with a completely/partially qualified name, as ‘<catalog_name>.<db_name>.<function_name>’ or ‘<db_name>.<function_name>’.

E.g. “select mycatalog.mydb.myfunc(a) from mytable;” or “select mydb.myfunc(a) from mytable;” in Flink

With this FLIP, users will be able to reference user-defined catalog functions precisely, including across catalogs and databases, in SQL through the whole stack.

2) Remove namespaces from Flink built-in functions

We should remove the namespaces of Flink built-in functions throughout the Flink SQL stack. Flink built-in functions do not belong to any catalog or database.

3) Redefine function resolution order for ambiguous function reference

We will redefine the ambiguous function resolution order, taking all new types of functions into consideration and making it as user-friendly, flexible, and deterministic as possible.

4) Support temporary functions explicitly

This FLIP would add explicit temporary function support by renaming a few variables and potentially deprecating some APIs in favor of new APIs that reflect their nature of dealing with temporary functions.

Note:

The following work is out of scope of this FLIP. It will be taken care of by other teams/committers.

  1. Corresponding DDL. E.g. “CREATE [TEMP] FUNCTION xxx AS xxx”
  2. Some corresponding changes in SQL planner/parser (we’ll do what we can do :)
  3. Re-instantiating Flink functions after reading their class names from the catalog

However, they need to be done simultaneously to achieve the functionality in full stack.

Proposed Changes

1. Support Two Types of Temporary Functions

We aim at supporting two types of temporary functions:

  • temporary system function that has no namespace and overrides built-in functions
  • temporary function that has catalog and database namespaces and overrides catalog functions.


FunctionCatalog’s APIs need to change accordingly to accommodate registering temporary functions with/without namespaces optionally.

a) Temporary System Functions

We will have new APIs in FunctionCatalog as “registerTemporarySystemScalar/Table/AggregateFunction(String, UserDefinedFunction)”, and the functions will be stored in “Map<String, FunctionDefinition> tempSystemFunctions” in FunctionCatalog.

Their DDLs are “CREATE/DROP TEMPORARY SYSTEM FUNCTION”.

Existing “registerScalar/Table/AggregateFunctions()” will be deprecated in favor of the new APIs.

b) Temporary Functions

We will add a new member variable to FunctionCatalog as “Map<ObjectIdentifier, UserDefinedFunction> tempFunctions” to hold those temporary functions in a central place, and new APIs “registerTemporaryScalar/Table/AggregateFunction(ObjectIdentifier, UserDefinedFunction)”.

These functions have to reside in an available catalog and database - their lifespans are tied to catalog and database. 

  • Upon registration of such functions, we need to first confirm that the catalog and database exist
  • Upon lookup, we also need to first confirm that the assigned catalog and database exist. If so, return the function; otherwise, drop all the temporary functions in the missing catalog or database. This lazy dropping approach is necessary because these temporary functions don’t live in their assigned catalogs, and they are not aware of when databases or catalogs are dropped. (FYI, we had a discussion of whether catalogs should support temporary functions within themselves, in which case this unawareness problem would not occur, but decided not to do so at the moment)
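The lazy dropping described above can be pictured with a minimal sketch. This is not Flink code: `TempFunctionRegistry`, its methods, and the use of plain strings in place of function definitions are all hypothetical, and a dropped catalog is modeled simply as all of its (catalog, database) pairs being dropped.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Hypothetical stand-in for the namespaced temporary function store; not a Flink class. */
final class TempFunctionRegistry {
    // Stand-in for the catalog manager: the (catalog, database) pairs that currently exist.
    private final Set<List<String>> existingDatabases = new HashSet<>();
    // Temporary functions grouped by their (catalog, database) namespace.
    private final Map<List<String>, Map<String, String>> tempFunctions = new HashMap<>();

    void createDatabase(String catalog, String database) {
        existingDatabases.add(Arrays.asList(catalog, database));
    }

    void dropDatabase(String catalog, String database) {
        // Note: the temporary functions are NOT touched here; they are unaware of the drop.
        existingDatabases.remove(Arrays.asList(catalog, database));
    }

    void register(String catalog, String database, String name, String function) {
        List<String> namespace = Arrays.asList(catalog, database);
        // Registration first confirms that the catalog and database exist.
        if (!existingDatabases.contains(namespace)) {
            throw new IllegalArgumentException("Unknown catalog/database: " + namespace);
        }
        tempFunctions.computeIfAbsent(namespace, k -> new HashMap<>()).put(name, function);
    }

    Optional<String> lookup(String catalog, String database, String name) {
        List<String> namespace = Arrays.asList(catalog, database);
        if (!existingDatabases.contains(namespace)) {
            // Lazy drop: the namespace is gone, so discard everything registered under it.
            tempFunctions.remove(namespace);
            return Optional.empty();
        }
        Map<String, String> functions = tempFunctions.get(namespace);
        return functions == null ? Optional.empty() : Optional.ofNullable(functions.get(name));
    }

    int registeredCount() {
        return tempFunctions.values().stream().mapToInt(Map::size).sum();
    }
}
```

In this sketch, a stale function stays in memory after its database is dropped and is only cleaned up by the next lookup into that namespace.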

Their DDLs are “CREATE/DROP TEMPORARY FUNCTION”.

The lifespan of both types of temporary functions is limited to a session; they will be destroyed when the session ends.

Note: the corresponding DDLs are not part of this FLIP

2. Support Precise Function Reference

Because built-in functions don’t have namespaces, a precise function reference in Flink must resolve to either a temporary function with a namespace or a catalog function.

The resolution order will be 

  1. Temporary functions (those with catalog and database namespaces)
  2. Catalog functions
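The precise resolution order above can be sketched as follows. This is only an illustration under stated assumptions: `PreciseResolver` is a hypothetical class, functions are represented as plain strings, and both maps are keyed by a dot-joined fully qualified name for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical illustration of precise function resolution; not a Flink class. */
final class PreciseResolver {
    // Both maps are keyed by "catalog.database.function" (an assumption of this sketch).
    final Map<String, String> tempFunctions = new HashMap<>();
    final Map<String, String> catalogFunctions = new HashMap<>();

    Optional<String> resolve(String catalog, String database, String name) {
        String key = String.join(".", catalog, database, name);
        // 1. A namespaced temporary function overrides the catalog function of the same name.
        String temp = tempFunctions.get(key);
        if (temp != null) {
            return Optional.of(temp);
        }
        // 2. Otherwise fall back to the catalog function, if any.
        return Optional.ofNullable(catalogFunctions.get(key));
    }
}
```

Built-in and temporary system functions never appear here because they have no namespace and therefore cannot match a qualified name.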

3. Support Ambiguous Function Reference with a Redefined Resolution Order

For ambiguous function references, there are 4 types of functions to consider: temporary functions with and without namespaces, Flink built-in functions, and catalog functions.

The resolution order for ambiguous function references only matters when there are functions of different types but with the same name, e.g. when there are three functions all named “myfunc” among temp, catalog, and built-in functions. If there’s no function name collision, a function will simply be resolved to whichever type it belongs to.

New order:

  1. Temporary system functions
  2. Flink Built-in functions
  3. Temporary functions, in the current catalog and current database of the session
  4. Catalog functions, in the current catalog and current database of the session
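A minimal sketch of this four-step order is shown below. It is not the proposed implementation: `AmbiguousResolver`, its fields, the default catalog/database names, and the string-valued functions are hypothetical placeholders.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical illustration of ambiguous function resolution; not a Flink class. */
final class AmbiguousResolver {
    // Namespace-free functions, keyed by plain function name.
    final Map<String, String> tempSystemFunctions = new HashMap<>();
    final Map<String, String> builtInFunctions = new HashMap<>();
    // Namespaced functions, keyed by "catalog.database.function" (an assumption of this sketch).
    final Map<String, String> tempFunctions = new HashMap<>();
    final Map<String, String> catalogFunctions = new HashMap<>();

    // Placeholder values for the session's current catalog and database.
    String currentCatalog = "default_catalog";
    String currentDatabase = "default_database";

    Optional<String> resolve(String name) {
        // 1. Temporary system functions
        if (tempSystemFunctions.containsKey(name)) {
            return Optional.of(tempSystemFunctions.get(name));
        }
        // 2. Flink built-in functions
        if (builtInFunctions.containsKey(name)) {
            return Optional.of(builtInFunctions.get(name));
        }
        String qualified = String.join(".", currentCatalog, currentDatabase, name);
        // 3. Temporary functions in the current catalog and database
        if (tempFunctions.containsKey(qualified)) {
            return Optional.of(tempFunctions.get(qualified));
        }
        // 4. Catalog functions in the current catalog and database
        return Optional.ofNullable(catalogFunctions.get(qualified));
    }
}
```

Because steps 1 and 2 do not depend on the current catalog or database, a name that collides with a built-in function resolves the same way regardless of which catalog the session is in.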

The new resolution order will be a breaking change, compared to existing resolution order.

Temp functions should rank above their corresponding persistent/built-in functions due to their temporary nature - users want to override built-in or persistent functions with something temporary that is only visible to themselves and the session, without impacting other users. Conversely, 1) if users don’t intend to override other functions, they can simply give the temporary functions different names, since the manipulation cost is so low for temporary objects, and 2) if built-in functions preceded temporary functions, there would be no way to reference temp functions anymore.

Flink built-in functions should precede catalog functions, because 1) it always gives a deterministic resolution order on ambiguous references by invoking the built-in functions, and 2) catalog functions can always be precisely referenced with fully/partially qualified names. Conversely, if catalog functions preceded built-in functions, built-in functions with a colliding name could never be referenced.

4. Changes to ObjectIdentifier


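class ObjectIdentifier {
	// Empty when the reference is not qualified with a catalog name
	Optional<String> catalogName;

	// Empty when the reference is not qualified with a database name
	Optional<String> databaseName;

	// Always present, e.g. the function name
	String objectName;
}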
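Since the proposal uses the identifier as a key of the temporary function map, value-based equals() and hashCode() matter. A minimal sketch of such a value class is shown below; `FunctionIdentifier` is a hypothetical name chosen to avoid suggesting this is Flink's actual ObjectIdentifier, and modeling the unqualified case with null constructor arguments is an assumption.

```java
import java.util.Objects;
import java.util.Optional;

/** Hypothetical value class mirroring the proposed identifier shape; not a Flink class. */
final class FunctionIdentifier {
    private final Optional<String> catalogName;
    private final Optional<String> databaseName;
    private final String objectName;

    FunctionIdentifier(String catalogName, String databaseName, String objectName) {
        // Null catalog/database arguments model the "not qualified" case.
        this.catalogName = Optional.ofNullable(catalogName);
        this.databaseName = Optional.ofNullable(databaseName);
        this.objectName = Objects.requireNonNull(objectName, "objectName");
    }

    Optional<String> getCatalogName() { return catalogName; }

    Optional<String> getDatabaseName() { return databaseName; }

    String getObjectName() { return objectName; }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof FunctionIdentifier)) {
            return false;
        }
        FunctionIdentifier that = (FunctionIdentifier) o;
        return catalogName.equals(that.catalogName)
                && databaseName.equals(that.databaseName)
                && objectName.equals(that.objectName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(catalogName, databaseName, objectName);
    }
}
```

With value semantics, two identifiers built from the same name parts find the same map entry, which is what registering and later looking up a temporary function relies on.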


5. Changes to FunctionLookup interface

In order to adapt to the two cases, we propose updating the existing FunctionLookup API to lookupFunction(ObjectIdentifier oi).

Calls from the planner have to pass an ObjectIdentifier in which either

  1. Neither the catalog nor the database name exists. The call will be routed to the ambiguous function reference logic
  2. Both the catalog and database names exist, or only the database name exists but not the catalog name. The call will be routed to the precise function reference logic

Otherwise, it will throw IllegalArgumentException

In case of a partially qualified name (<db_name>.<function_name>) in SQL, the planner/parser should recognize it from context and first pad it with the name of the current catalog of the session to build an ObjectIdentifier.


/**
 * Resolves a function. ObjectIdentifier should either contain both catalog and database names, or neither of them.
 *
 * @param oi object identifier of function
 * @return An optional result of FunctionLookup
 */
public Optional<FunctionLookup.Result> lookupFunction(ObjectIdentifier oi);


6. Changes to FunctionCatalog class


private final Map<String, FunctionDefinition> tempSystemFunctions = new LinkedHashMap<>();
private final Map<ObjectIdentifier, FunctionDefinition> tempFunctions = new LinkedHashMap<>();

public void registerTemporarySystemScalarFunction(String name, ScalarFunction function) {
// put into tempSystemFunctions
}

public void registerTemporarySystemTableFunction(String name, TableFunction function) {
// put into tempSystemFunctions

}

public void registerTemporarySystemAggregateFunction(String name, AggregateFunction function) {
// put into tempSystemFunctions

}

public void registerTemporaryScalarFunction(ObjectIdentifier oi, ScalarFunction function) {
// put into tempFunctions
}

public void registerTemporaryTableFunction(ObjectIdentifier oi, TableFunction function) {
// put into tempFunctions
}

public void registerTemporaryAggregateFunction(ObjectIdentifier oi, AggregateFunction function) {
// put into tempFunctions
}

public void dropTemporarySystemFunction(String name) {}

public void dropTemporaryFunction(ObjectIdentifier oi) {}

public Optional<FunctionLookup.Result> lookupFunction(ObjectIdentifier oi) {
	// A database name, with or without a catalog name, makes the reference precise
	if (oi.getDatabaseName().isPresent()) {
		return resolvePreciseFunctionReference(oi);
	}

	// Neither catalog nor database name: ambiguous reference
	if (!oi.getCatalogName().isPresent()) {
		return resolveAmbiguousFunctionReference(oi.getObjectName());
	}

	// A catalog name without a database name is invalid (a TableException is an alternative)
	throw new IllegalArgumentException();
}

private Optional<FunctionLookup.Result> resolvePreciseFunctionReference(ObjectIdentifier oi) {
	// resolve order:
	// 1. Temporary functions
	// 2. Catalog functions
}


private Optional<FunctionLookup.Result> resolveAmbiguousFunctionReference(String name) {
	// resolve order:
	// 1. Temporary system functions
	// 2. Builtin functions
	// 3. Temporary functions, in the current catalog/db
	// 4. Catalog functions, in the current catalog/db
}
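To illustrate how a name written in SQL could reach the two resolution paths, here is a minimal sketch of padding a partially qualified name with the current catalog and then routing it. `FunctionNameRouting`, `pad`, and `route` are hypothetical helpers, and representing a name as a list of its dot-separated parts is an assumption rather than the planner's actual representation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper showing name padding and routing; not a Flink class. */
final class FunctionNameRouting {
    enum Route { AMBIGUOUS, PRECISE }

    /** Pads <db_name>.<function_name> with the session's current catalog. */
    static List<String> pad(String currentCatalog, List<String> nameParts) {
        switch (nameParts.size()) {
            case 1: // function name only: stays unqualified
            case 3: // already fully qualified
                return nameParts;
            case 2: { // <db_name>.<function_name>
                List<String> padded = new ArrayList<>();
                padded.add(currentCatalog);
                padded.addAll(nameParts);
                return padded;
            }
            default:
                throw new IllegalArgumentException(
                        "Expected 1 to 3 name parts, got " + nameParts.size());
        }
    }

    /** Chooses the resolution path for an already padded name. */
    static Route route(List<String> paddedParts) {
        if (paddedParts.size() == 1) {
            return Route.AMBIGUOUS;
        }
        if (paddedParts.size() == 3) {
            return Route.PRECISE;
        }
        throw new IllegalArgumentException("Name must have 1 or 3 parts after padding");
    }
}
```

For example, with current catalog "mycatalog", the parts [mydb, myfunc] would be padded to [mycatalog, mydb, myfunc] and routed to the precise path, while [myfunc] alone would stay on the ambiguous path.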

Limitations:

We will not consider the following cases in this FLIP, and will leave them to future efforts:

  • By SQL standard, function resolution takes both function name and method parameters into consideration. To simplify our design, we will not consider resolving overloaded functions - those with the same name but different params - in this FLIP.
    • it's still valid to have a single function with overloaded eval() methods

References

Some database products disallow registering functions with the same name as a built-in function.

Document

https://docs.google.com/document/d/1w3HZGj9kry4RsKVCduWp82HkW6hhgi2unnvOAUS72t8/edit?usp=sharing
