...

This FLIP would add explicit support for temporary functions by renaming a few variables and potentially deprecating some APIs in favor of new APIs that reflect that they deal with temporary functions.

Note:

The following work is out of the scope of this FLIP and will be taken care of by other teams/committers.

...

However, these items need to be done at the same time to achieve the functionality across the full stack.

Proposed Changes

1. Support Two Types of Temporary Functions

We aim to support two types of temporary functions:

  • temporary system function that has no namespace and overrides built-in functions
  • temporary function that has catalog and database namespaces and overrides catalog functions.


FunctionCatalog’s APIs need to change accordingly to support registering temporary functions either with or without namespaces.

a) Temporary System Functions

We will add new APIs to FunctionCatalog, “registerTemporarySystemScalar/Table/AggregateFunction(String, UserDefinedFunction)”, and store the functions in “Map<String, FunctionDefinition> tempSystemFunctions” in FunctionCatalog.

Their DDLs are “CREATE/DROP/ALTER TEMPORARY SYSTEM FUNCTION”.
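A minimal sketch of the temporary system function registry described above, assuming that function names are normalized by lower-casing them (the proposal calls a normalizeName() helper but does not define it). FunctionDef and TempSystemFunctionRegistry are hypothetical stand-ins, not Flink classes. Because registration is a plain Map.put, re-registering a name replaces the previous function; the proposal does not specify this behavior either way.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for Flink's FunctionDefinition, used only to keep the sketch self-contained.
final class FunctionDef {
    final String description;

    FunctionDef(String description) {
        this.description = description;
    }
}

// Minimal sketch of how FunctionCatalog could keep temporary system functions (no namespace).
// Lower-casing in normalizeName() is an assumption of this sketch.
final class TempSystemFunctionRegistry {
    private final Map<String, FunctionDef> tempSystemFunctions = new LinkedHashMap<>();

    static String normalizeName(String name) {
        return name.toLowerCase(Locale.ROOT);
    }

    // What CREATE TEMPORARY SYSTEM FUNCTION would call; an existing entry is replaced.
    void register(String name, FunctionDef function) {
        tempSystemFunctions.put(normalizeName(name), function);
    }

    // What DROP TEMPORARY SYSTEM FUNCTION would call; reports whether anything was removed.
    boolean drop(String name) {
        return tempSystemFunctions.remove(normalizeName(name)) != null;
    }

    // Case-insensitive lookup by normalized name.
    Optional<FunctionDef> lookup(String name) {
        return Optional.ofNullable(tempSystemFunctions.get(normalizeName(name)));
    }
}
```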

b) Temporary Functions

We will add a new member variable to FunctionCatalog, “Map<ObjectIdentifier, FunctionDefinition> tempFunctions”, to hold these temporary functions in a central place, and new APIs “registerTemporaryScalar/Table/AggregateFunction(ObjectIdentifier, UserDefinedFunction)”.

These functions must reside in an existing catalog and database, so their lifespans are also tied to that catalog and database.

  • Upon registration of such functions, we need to first confirm that the catalog and database exist
  • Upon lookup, we also need to first confirm that the assigned catalog and database exist. If so, return the function; otherwise, drop all temporary functions assigned to the missing catalog or database. This lazy dropping is necessary because these temporary functions don’t live in their assigned catalogs, so they are not aware of when their databases or catalogs are dropped. (We discussed whether catalogs should support temporary functions within themselves, which would avoid this problem, but decided not to do so for now.)
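The lazy dropping described above can be sketched as follows. FunctionId stands in for ObjectIdentifier, and a plain catalog-to-databases map stands in for CatalogManager; both are hypothetical simplifications, not Flink APIs. Registration checks that the namespace exists, and a lookup under a missing catalog or database purges every temporary function assigned to it before returning nothing.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

// Hypothetical fully qualified key, standing in for ObjectIdentifier.
final class FunctionId {
    final String catalog;
    final String database;
    final String name;

    FunctionId(String catalog, String database, String name) {
        this.catalog = catalog;
        this.database = database;
        this.name = name;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof FunctionId)) {
            return false;
        }
        FunctionId other = (FunctionId) o;
        return catalog.equals(other.catalog) && database.equals(other.database) && name.equals(other.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(catalog, database, name);
    }
}

// Minimal sketch of namespaced temporary functions with lazy dropping.
// The catalog -> databases map is a hypothetical stand-in for CatalogManager.
final class TempFunctionRegistry {
    private final Map<FunctionId, String> tempFunctions = new LinkedHashMap<>();
    private final Map<String, Set<String>> catalogs;

    TempFunctionRegistry(Map<String, Set<String>> catalogs) {
        this.catalogs = catalogs;
    }

    private boolean exists(String catalog, String database) {
        Set<String> databases = catalogs.get(catalog);
        return databases != null && databases.contains(database);
    }

    void register(FunctionId id, String function) {
        // Registration requires the assigned catalog and database to exist.
        if (!exists(id.catalog, id.database)) {
            throw new IllegalArgumentException("No such catalog/database: " + id.catalog + "." + id.database);
        }
        tempFunctions.put(id, function);
    }

    Optional<String> lookup(FunctionId id) {
        if (exists(id.catalog, id.database)) {
            return Optional.ofNullable(tempFunctions.get(id));
        }
        // The catalog or database is gone: lazily drop every temporary function assigned to it.
        boolean catalogGone = !catalogs.containsKey(id.catalog);
        tempFunctions.keySet().removeIf(k -> k.catalog.equals(id.catalog)
                && (catalogGone || k.database.equals(id.database)));
        return Optional.empty();
    }

    int size() {
        return tempFunctions.size();
    }
}
```

Because cleanup only happens on lookup, stale entries can linger until they are referenced again; that is the trade-off of not notifying the registry when a catalog or database is dropped.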

Their DDLs are “CREATE/DROP/ALTER TEMPORARY FUNCTION”.

The lifespan of both types of temporary functions is limited to a session; they are destroyed when the session ends.

Note: the corresponding DDL is not part of this FLIP.

2. Support Precise Function Reference

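Because built-in functions and temporary system functions don’t have namespaces, a precise function reference in Flink must refer to either a temporary function with a namespace or a catalog function. A partially qualified name (database name only) is resolved against the current catalog, and the SQL planner/parser needs corresponding changes to accommodate this.

The resolution order will be

  1. Temporary functions (with catalog and database namespaces)
  2. Catalog functions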
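A minimal sketch of the precise resolution order, assuming [catalog, database, name] lists as map keys and String values in place of function definitions (both hypothetical simplifications, not Flink's actual data structures). A partially qualified name falls back to the session’s current catalog.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Minimal sketch of precise function resolution.
// Keys are [catalog, database, name] lists; the String values stand in for function definitions.
final class PreciseResolver {
    final Map<List<String>, String> tempFunctions = new HashMap<>();
    final Map<List<String>, String> catalogFunctions = new HashMap<>(); // stand-in for persistent catalogs
    private final String currentCatalog;

    PreciseResolver(String currentCatalog) {
        this.currentCatalog = currentCatalog;
    }

    // A partially qualified name (database.name) is resolved against the session's current catalog.
    Optional<String> resolve(Optional<String> catalog, String database, String name) {
        List<String> key = Arrays.asList(catalog.orElse(currentCatalog), database, name);
        // 1. Temporary functions
        String temp = tempFunctions.get(key);
        if (temp != null) {
            return Optional.of(temp);
        }
        // 2. Catalog functions
        return Optional.ofNullable(catalogFunctions.get(key));
    }
}
```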

3. Support Ambiguous Function Reference with a Redefined Resolution Order

For an ambiguous function reference, there are 4 types of functions to consider: temporary system functions (without namespaces), temporary functions (with namespaces), Flink built-in functions, and catalog functions.

The resolution order for ambiguous function references only matters when functions of different types share the same name, e.g. when there are three functions all named “myfunc”: a temporary function, a catalog function, and a built-in function. Without a name collision, a function simply resolves to whichever type it belongs to.

New order:

  1. Temporary system functions
  2. Flink Built-in functions
  3. Temporary functions, in the current catalog and current database of the session
  4. Catalog functions, in the current catalog and current database of the session

Temporary functions should rank above the persistent or built-in functions they correspond to because of their temporary nature: users create them to override built-in or persistent functions with something that is visible only to themselves and their session, without impacting other users. Moreover, 1) users who don’t intend to override other functions can simply give their temporary functions different names, since the cost of renaming temporary objects is very low, and 2) if built-in functions preceded temporary system functions, temporary system functions with colliding names could never be referenced, because they have no namespace.

Flink built-in functions should precede catalog functions because 1) this always gives a deterministic result for an ambiguous reference, namely the built-in function, and 2) catalog functions can always be referenced precisely with fully or partially qualified names. Conversely, if catalog functions preceded built-in functions, a built-in function shadowed by a catalog function of the same name could never be referenced, because built-in functions have no namespace.
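The four-step order can be sketched with plain maps standing in for the real registries. This is a hypothetical simplification: the real FunctionCatalog would hold FunctionDefinition objects and consult CatalogManager instead. Each step returns as soon as it finds a match, so a name defined at several levels resolves to the highest-ranked one.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Minimal sketch of the proposed ambiguous resolution order.
// Plain maps stand in for the real registries; String values stand in for function definitions.
final class AmbiguousResolver {
    final Map<String, String> tempSystemFunctions = new HashMap<>();
    final Map<String, String> builtInFunctions = new HashMap<>();
    final Map<List<String>, String> tempFunctions = new HashMap<>();    // key: [catalog, database, name]
    final Map<List<String>, String> catalogFunctions = new HashMap<>(); // key: [catalog, database, name]
    private final String currentCatalog;
    private final String currentDatabase;

    AmbiguousResolver(String currentCatalog, String currentDatabase) {
        this.currentCatalog = currentCatalog;
        this.currentDatabase = currentDatabase;
    }

    Optional<String> resolve(String name) {
        // 1. Temporary system functions (no namespace)
        if (tempSystemFunctions.containsKey(name)) {
            return Optional.of(tempSystemFunctions.get(name));
        }
        // 2. Flink built-in functions (no namespace)
        if (builtInFunctions.containsKey(name)) {
            return Optional.of(builtInFunctions.get(name));
        }
        List<String> key = Arrays.asList(currentCatalog, currentDatabase, name);
        // 3. Temporary functions in the current catalog and database
        if (tempFunctions.containsKey(key)) {
            return Optional.of(tempFunctions.get(key));
        }
        // 4. Catalog functions in the current catalog and database
        return Optional.ofNullable(catalogFunctions.get(key));
    }
}
```

In this order, a temporary function whose name collides with a built-in function is shadowed in an ambiguous reference, but it can still be reached with a precise reference such as db.myfunc or catalog.db.myfunc.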

4. Changes to ObjectIdentifier

...

  1. Neither catalog nor database name is present. The call is routed to the ambiguous function reference logic.
  2. Both catalog and database names are present, or only the database name is present. The call is routed to the precise function reference logic.

Otherwise (i.e., only a catalog name is present), it throws an IllegalArgumentException.
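The routing rules above can be captured in a few lines. This sketch models the optional catalog and database names as java.util.Optional values; ReferenceKind and FunctionReferenceRouter are hypothetical names used only for illustration.

```java
import java.util.Optional;

// Which resolution path a function identifier takes.
enum ReferenceKind { AMBIGUOUS, PRECISE }

// Minimal sketch of the routing rules for identifiers with optional catalog and database names.
final class FunctionReferenceRouter {
    static ReferenceKind route(Optional<String> catalog, Optional<String> database) {
        if (!catalog.isPresent() && !database.isPresent()) {
            return ReferenceKind.AMBIGUOUS; // "name"
        }
        if (database.isPresent()) {
            return ReferenceKind.PRECISE;   // "catalog.db.name" or "db.name"
        }
        // Only a catalog name without a database name: rejected.
        throw new IllegalArgumentException("Catalog '" + catalog.get() + "' given without a database name");
    }
}
```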

...

5. Changes to FunctionCatalog class

FunctionCatalog implements FunctionLookup

Note that FunctionLookup.Result requires an ObjectIdentifier with non-null catalog and database names. We will probably need to redefine that to accommodate built-in functions, which don’t have catalog and database names; for now, we set the catalog and database names of built-in functions to empty.


// renamed from “userFunction”
private final Map<String, FunctionDefinition> tempSystemFunctions = new LinkedHashMap<>();
private final Map<ObjectIdentifier, FunctionDefinition> tempFunctions = new LinkedHashMap<>();

public void registerTemporarySystemScalarFunction(String name, ScalarFunction function) {
	// put into tempSystemFunctions
}

public void registerTemporarySystemTableFunction(String name, TableFunction<?> function) {
	// put into tempSystemFunctions
}

public void registerTemporarySystemAggregateFunction(String name, AggregateFunction<?, ?> function) {
	// put into tempSystemFunctions
}

public void registerTemporaryScalarFunction(ObjectIdentifier oi, ScalarFunction function) {
	// after confirming that oi's catalog and database exist, put into tempFunctions
}

public void registerTemporaryTableFunction(ObjectIdentifier oi, TableFunction<?> function) {
	// after confirming that oi's catalog and database exist, put into tempFunctions
}

public void registerTemporaryAggregateFunction(ObjectIdentifier oi, AggregateFunction<?, ?> function) {
	// after confirming that oi's catalog and database exist, put into tempFunctions
}

public Optional<FunctionLookup.Result> lookupFunction(ObjectIdentifier oi) {
	// catalog.db.name, or db.name resolved against the current catalog
	if ((oi.getCatalogName().isPresent() && oi.getDatabaseName().isPresent())
			|| (!oi.getCatalogName().isPresent() && oi.getDatabaseName().isPresent())) {
		return resolvePreciseFunctionReference(oi);
	}

	// a bare function name
	if (!oi.getCatalogName().isPresent() && !oi.getDatabaseName().isPresent()) {
		return resolveAmbiguousFunctionReference(oi.getObjectName());
	}

	// only a catalog name without a database name
	throw new IllegalArgumentException("Invalid function reference: " + oi);
}

private Optional<FunctionLookup.Result> resolvePreciseFunctionReference(ObjectIdentifier oi) {
	// resolution order:
	// 1. Temporary functions
	// 2. Catalog functions
}

private Optional<FunctionLookup.Result> resolveAmbiguousFunctionReference(String name) {
	// resolution order:
	// 1. Temporary system functions
	// 2. Built-in functions
	// 3. Temporary functions, in the current catalog/db
	// 4. Catalog functions, in the current catalog/db
}

Limitations:

We will not consider the following cases in this FLIP, and will leave them to future efforts:

  • Per the SQL standard, function resolution takes both the function name and its parameters into consideration. To simplify our design, this FLIP does not consider resolving overloaded functions, i.e. functions with the same name but different parameters.
    • It is still valid for a single function to have overloaded eval() methods.
  • Built-in functions of external systems cannot currently be referenced directly or across catalogs in the SQL layer, but they can be referenced through the FunctionCatalog API.

What’s next

  • We may support referencing Flink built-in functions and built-in functions of external systems directly in Flink SQL
  • We may provide a whitelist/blacklist mechanism to filter certain built-in functions

References

Some database products disallow registering functions with the same name as a built-in function

...