...

Page properties

...


Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-57-Rework-FunctionCatalog-td32291.html#a32613

JIRA

...


Vote thread
JIRA: FLINK-14090 (ASF JIRA)

Release: 1.10


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

This FLIP adds explicit temporary function support by renaming a few variables and potentially deprecating some APIs in favor of new APIs that reflect their nature of dealing with temporary functions.

Note:

The following work is out of scope of this FLIP and will be taken care of by other teams/committers.

...

However, they need to be done simultaneously to deliver the functionality across the full stack.

Proposed Changes

We will first clarify the dimensions of functions.



              | System (can be used interchangeably with "builtin") | Catalog
Non-Temporary | system functions                                    | catalog functions
Temporary     | temporary system functions                          | temporary catalog functions


1. Support Two Types of Temporary Functions

We aim at supporting two types of temporary functions:

  • temporary system function that has no namespace and overrides built-in functions
  • temporary catalog function that has catalog and database namespaces and overrides catalog functions.

FunctionCatalog’s APIs need to change accordingly to accommodate registering temporary functions with/without namespaces optionally.

a) Temporary System Functions

We will add new APIs to FunctionCatalog, “registerTemporarySystemScalar/Table/AggregateFunctions(String, UserDefinedFunction)”, renamed from the existing “registerScalar/Table/AggregateFunctions()”. The functions will be stored in “Map<String, FunctionDefinition> tempSystemFunctions” in FunctionCatalog.

Their DDLs are “CREATE/DROP TEMPORARY SYSTEM FUNCTION”.

b) Temporary Catalog Functions

We will add a new member variable to FunctionCatalog, “Map<ObjectIdentifier, UserDefinedFunction> tempCatalogFunctions”, to hold temporary catalog functions in a central place, and new APIs “registerTemporaryScalar/Table/AggregateFunction(ObjectIdentifier, UserDefinedFunction)”.

The lifespan of temporary functions is not tied to that of catalogs and databases. Users can create temporary catalog functions even if the catalogs/dbs in their fully qualified names do not exist.

Their DDLs are “CREATE/DROP TEMPORARY FUNCTION”.

Some other proposed SQL commands are:

"SHOW FUNCTIONS" - lists names of temp and non-temp system/built-in functions, and names of temp catalog functions and catalog functions in the current catalog and db

"SHOW ALL FUNCTIONS" - lists names of temp and non-temp system/built-in functions, and fully qualified names of temp catalog functions and catalog functions in all catalogs and dbs

"SHOW ALL TEMPORARY FUNCTIONS" - lists fully qualified names of temp catalog functions in all catalogs and dbs

"SHOW ALL TEMPORARY SYSTEM FUNCTIONS" - lists names of all temp system functions


The lifespan of both types of temporary functions is limited to a session; they will be destroyed when the session ends.
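
The two kinds of temporary functions and their session-bound lifespan can be illustrated with a small sketch. It is not the FunctionCatalog implementation: the class name `TempFunctionRegistrySketch`, its method names, the string key encoding `catalog.database.function`, and the use of `Object` in place of a function definition are all assumptions made for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical session-scoped holder for both kinds of temporary functions. */
final class TempFunctionRegistrySketch {
    // Temporary system functions have no namespace, so a bare name is the key.
    private final Map<String, Object> tempSystemFunctions = new LinkedHashMap<>();
    // Temporary catalog functions are keyed by "catalog.database.function" (assumed encoding).
    private final Map<String, Object> tempCatalogFunctions = new LinkedHashMap<>();

    void registerTemporarySystemFunction(String name, Object function) {
        tempSystemFunctions.put(name, function);
    }

    void registerTemporaryCatalogFunction(
            String catalog, String database, String name, Object function) {
        // Deliberately no check that the catalog or database exists.
        tempCatalogFunctions.put(catalog + "." + database + "." + name, function);
    }

    boolean hasTemporarySystemFunction(String name) {
        return tempSystemFunctions.containsKey(name);
    }

    boolean hasTemporaryCatalogFunction(String catalog, String database, String name) {
        return tempCatalogFunctions.containsKey(catalog + "." + database + "." + name);
    }

    /** Called when the session ends: every temporary function is discarded. */
    void endSession() {
        tempSystemFunctions.clear();
        tempCatalogFunctions.clear();
    }
}
```

In this sketch, registering a function under a catalog that was never created succeeds, which mirrors the statement that temporary catalog functions are independent of the existence of their catalog and database.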

Note: corresponding DDL and SQL commands are not part of this FLIP

2. Support Precise Function Reference

Because system functions (temporary or built-in) don’t have namespaces, a precise function reference in Flink must resolve to either a temporary catalog function or a catalog function.

Thus, we can just try to get the function from the specified catalog (using the current catalog if it’s a partially qualified name) and database. The SQL planner/parser also needs corresponding changes to accommodate this requirement.

The resolution order will be:

  1. Temporary catalog functions
  2. Catalog functions
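
The two-step order above can be sketched as follows. This is only an illustration under stated assumptions, not Flink's implementation: `PreciseResolutionSketch`, the string key encoding `catalog.database.function`, and the use of plain strings in place of function definitions are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of precise function resolution; not the FunctionCatalog API. */
final class PreciseResolutionSketch {
    // Keys are fully qualified names "catalog.database.function" (an assumed encoding).
    final Map<String, String> tempCatalogFunctions = new HashMap<>();
    final Map<String, String> catalogFunctions = new HashMap<>();

    /** Looks up a fully qualified name: temporary catalog functions first, then catalog functions. */
    Optional<String> resolvePrecise(String catalog, String database, String function) {
        String key = catalog + "." + database + "." + function;
        String temporary = tempCatalogFunctions.get(key);
        if (temporary != null) {
            return Optional.of(temporary);
        }
        return Optional.ofNullable(catalogFunctions.get(key));
    }
}
```

System functions never appear in this path, because a reference that names a catalog and database cannot point at a function without a namespace.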

3. Support Ambiguous Function Reference with a Redefined Resolution Order

For an ambiguous function reference, there are 4 types of functions to consider: temporary functions with and without namespaces, Flink built-in (system) functions, and catalog functions.

The resolution order for ambiguous function references only matters when functions of different types share the same name, e.g. when three functions all named “myfunc” exist among temporary, catalog, and built-in functions. If there is no name collision, a function is simply resolved to whichever type it belongs to.

New order:

  1. Temporary system functions
  2. System functions
  3. Temporary catalog functions, in the current catalog and current database of the session
  4. Catalog functions, in the current catalog and current database of the session
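
A minimal sketch of this four-step order is given below. It only illustrates the lookup sequence; `AmbiguousResolutionSketch`, its fields, the `catalog.database.function` key encoding, and the string values standing in for function definitions are assumptions for the example rather than part of the proposal.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of ambiguous function resolution; all names are illustrative. */
final class AmbiguousResolutionSketch {
    // System-level functions have no namespace and are keyed by bare name.
    final Map<String, String> tempSystemFunctions = new HashMap<>();
    final Map<String, String> systemFunctions = new HashMap<>();
    // Catalog-level functions are keyed by "catalog.database.function" (assumed encoding).
    final Map<String, String> tempCatalogFunctions = new HashMap<>();
    final Map<String, String> catalogFunctions = new HashMap<>();

    // Placeholder values for the session's current catalog and database.
    String currentCatalog = "cat";
    String currentDatabase = "db";

    Optional<String> resolveAmbiguous(String name) {
        // 1. Temporary system functions
        if (tempSystemFunctions.containsKey(name)) {
            return Optional.of(tempSystemFunctions.get(name));
        }
        // 2. System functions
        if (systemFunctions.containsKey(name)) {
            return Optional.of(systemFunctions.get(name));
        }
        String qualified = currentCatalog + "." + currentDatabase + "." + name;
        // 3. Temporary catalog functions in the current catalog and database
        if (tempCatalogFunctions.containsKey(qualified)) {
            return Optional.of(tempCatalogFunctions.get(qualified));
        }
        // 4. Catalog functions in the current catalog and database
        return Optional.ofNullable(catalogFunctions.get(qualified));
    }
}
```

With a function named “myfunc” registered at every level, the sketch returns the temporary system function; removing entries from the top down exposes the next level in turn.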

The new resolution order will be a breaking change compared to the existing resolution order.

Temporary functions should rank above their corresponding persistent/built-in functions due to their temporary nature: users want to override built-in or persistent functions with something temporary that is visible only to themselves and the session, without impacting other users. Conversely, 1) if users do not intend to override other functions, they can simply give the temporary function a different name, since the manipulation cost of temporary objects is so low, and 2) if built-in functions preceded temporary functions, there would be no way to reference those temporary functions anymore.

System functions should precede catalog functions, because 1) this always gives a deterministic resolution order on ambiguous references by invoking the built-in functions, and 2) catalog functions can always be precisely referenced with fully/partially qualified names. Conversely, if catalog functions preceded built-in functions, the built-in functions could never be referenced.

4. Code Changes

FunctionIdentifier


Code Block
languagejava
class FunctionIdentifier {
    // for temporary/non-temporary system function
    String name;

    // for temporary/non-temporary catalog function
    ObjectIdentifier oi;

    Optional<ObjectIdentifier> getIdentifier() {}
    Optional<String> getName() {}

    Optional<FunctionIdentifier> of(ObjectIdentifier oi) {}
    Optional<FunctionIdentifier> of(String name) {}
}
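
The idea behind FunctionIdentifier, an identifier that carries either a bare function name or a full ObjectIdentifier but never both, can be sketched in self-contained Java. The classes `ObjectIdentifierSketch` and `FunctionIdentifierSketch` are hypothetical stand-ins, and as a simplification the factory methods here return the identifier directly rather than an Optional.

```java
import java.util.Objects;
import java.util.Optional;

/** Hypothetical stand-in for ObjectIdentifier, reduced to three name parts. */
final class ObjectIdentifierSketch {
    final String catalogName;
    final String databaseName;
    final String objectName;

    ObjectIdentifierSketch(String catalogName, String databaseName, String objectName) {
        this.catalogName = Objects.requireNonNull(catalogName);
        this.databaseName = Objects.requireNonNull(databaseName);
        this.objectName = Objects.requireNonNull(objectName);
    }
}

/** Sketch: holds either a simple name or a full identifier, never both. */
final class FunctionIdentifierSketch {
    private final String name;               // set for (temporary) system functions
    private final ObjectIdentifierSketch oi; // set for (temporary) catalog functions

    private FunctionIdentifierSketch(String name, ObjectIdentifierSketch oi) {
        this.name = name;
        this.oi = oi;
    }

    static FunctionIdentifierSketch of(String name) {
        return new FunctionIdentifierSketch(Objects.requireNonNull(name), null);
    }

    static FunctionIdentifierSketch of(ObjectIdentifierSketch oi) {
        return new FunctionIdentifierSketch(null, Objects.requireNonNull(oi));
    }

    Optional<String> getName() {
        return Optional.ofNullable(name);
    }

    Optional<ObjectIdentifierSketch> getIdentifier() {
        return Optional.ofNullable(oi);
    }
}
```

Keeping the constructor private means callers can only go through the two factories, so exactly one of `getName()` and `getIdentifier()` is present for any instance.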


Changes to CallExpression and UnresolvedCallExpression

We should replace ObjectIdentifier with FunctionIdentifier in CallExpression and UnresolvedCallExpression. This would be a breaking change for these public evolving classes.

Code Block
languagejava
public final class CallExpression implements Expression {
    private final @Nullable FunctionIdentifier functionIdentifier;

    ...

    public Optional<FunctionIdentifier> getFunctionIdentifier() {
        return Optional.ofNullable(functionIdentifier);
    }
}

public final class UnresolvedCallExpression implements Expression {
    private final @Nullable FunctionIdentifier functionIdentifier;

    ...

    public Optional<FunctionIdentifier> getFunctionIdentifier() {
        return Optional.ofNullable(functionIdentifier);
    }
}


Changes to FunctionLookup interface

In case of a partially qualified name (<db_name>.<function_name>) in SQL, the planner/parser should first pad it with the name of the session's current catalog to build an ObjectIdentifier.
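
The padding step can be sketched as a small helper that inspects how many name parts a reference has. This is an assumed illustration only: `NameQualificationSketch` and `qualify` are hypothetical names, and name parts are represented as a plain list of strings rather than as an ObjectIdentifier.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper showing how a partially qualified name could be padded. */
final class NameQualificationSketch {
    /**
     * Returns the name parts to resolve.
     * One part stays as-is (ambiguous reference), two parts are padded with the
     * current catalog, and three parts are already fully qualified.
     */
    static List<String> qualify(List<String> parts, String currentCatalog) {
        switch (parts.size()) {
            case 1:
                return parts; // ambiguous reference: no namespace is added
            case 2:
                return Arrays.asList(currentCatalog, parts.get(0), parts.get(1));
            case 3:
                return parts; // already <catalog>.<db>.<function>
            default:
                throw new IllegalArgumentException(
                        "Expected 1 to 3 name parts but got " + parts.size());
        }
    }
}
```

For example, calling `qualify` with the parts `db` and `myfunc` and a current catalog named `cat` yields the three parts `cat`, `db`, `myfunc`, which would then go through precise resolution.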

...

Code Block
languagejava
/**
 * Resolves a function. The FunctionIdentifier should either wrap an ObjectIdentifier with both catalog and database names, or carry only a function name.
 *
 * @param fi identifier of function
 * @return An optional result of FunctionLookup
 */
public Optional<FunctionLookup.Result> lookupFunction(FunctionIdentifier fi);


Changes to FunctionCatalog class

FunctionCatalog implements FunctionLookup

Note that FunctionLookup.Result requires an ObjectIdentifier with non-null catalog and db names. We will probably need to redefine that to accommodate built-in functions, which don’t have catalog and db names; for now, we set the catalog and db names to empty for built-in functions.


Code Block
languagejava
// renamed from “userFunction”
private final Map<String, FunctionDefinition> tempSystemFunctions = new LinkedHashMap<>();
private final Map<FunctionIdentifier, FunctionDefinition> tempFunctions = new LinkedHashMap<>();

public void registerTempSystemScalarFunction(String name, ScalarFunction function) {
    // put into tempSystemFunctions
}

public void registerTempSystemTableFunction(String name, TableFunction function) {
    // put into tempSystemFunctions
}

public void registerTempSystemAggregateFunction(String name, AggregateFunction function) {
    // put into tempSystemFunctions
}

public void registerTempCatalogScalarFunction(ObjectIdentifier oi, ScalarFunction function) {
    // put into tempFunctions
}

public void registerTempCatalogTableFunction(ObjectIdentifier oi, TableFunction function) {
    // put into tempFunctions
}

public void registerTempCatalogAggregateFunction(ObjectIdentifier oi, AggregateFunction function) {
    // put into tempFunctions
}

public void dropTemporarySystemFunction(String name) {}

public void dropTemporaryCatalogFunction(FunctionIdentifier fi) {}

public Optional<FunctionLookup.Result> lookupFunction(FunctionIdentifier fi) {
    if (fi.getIdentifier().isPresent()) {
        // fully qualified: precise function reference
        return resolvePreciseFunctionReference(fi.getIdentifier().get());
    } else {
        // name only: ambiguous function reference
        return resolveAmbiguousFunctionReference(fi.getName().get());
    }
}

private Optional<FunctionLookup.Result> resolvePreciseFunctionReference(ObjectIdentifier oi) {
    // resolve order:
    // 1. Temporary functions
    // 2. Catalog functions
}

private Optional<FunctionLookup.Result> resolveAmbiguousFunctionReference(String name) {
    // resolve order:
    // 1. Temporary system functions
    // 2. Builtin functions
    // 3. Temporary functions, in the current catalog/db
    // 4. Catalog functions, in the current catalog/db
}

Limitations:

We will not consider the following cases in this FLIP, and will leave them to future efforts:

  • By SQL standard, function resolution takes both function name and method parameters into consideration. To simplify our design, we will not consider resolving overloaded functions - those with the same name but different params - in this FLIP.
    • it's still valid to have a single function with overloaded eval() methods
  • Built-in functions of external systems cannot currently be referenced directly or across catalogs in the SQL layer, but can be referenced through the FunctionCatalog API

What’s next

  • We may support referencing Flink built-in functions and built-in functions of external systems directly in Flink SQL
  • We may provide a whitelist/blacklist mechanism to filter certain built-in functions

References

Some database products disallow registering functions with the same name as a built-in function.

...