...

System functions are temporary and will only be persisted by name (and version) in the plan. Therefore, we can also support function instances there (not only classes). Likewise, temporary catalog functions will only be persisted by object identifier in the plan and can support instances. No Java serialization is required.

...

It should be possible for the user to change the internal UDF implementation (e.g. upgrade a library) as long as the input/output parameters stay the same.

Options

Persisted catalog functions are stored by object identifier and class. Options similar to those for catalog tables apply:

...

We should store a version with every serialized RexNode. To keep the plan simple, if no function version has been defined, we derive the function's version from the Flink version of the JSON plan by default. We extend ModuleManager to look up FunctionDefinitions by name and version. If no version is defined, we take the most recent version.

ModuleManager.getFunctionDefinition(String name, @Nullable Integer version)
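
The lookup rule above (exact version if given, most recent version otherwise) could be sketched as follows. This is only an illustration under assumptions: VersionedFunctionRegistry and register are hypothetical names, a plain String stands in for FunctionDefinition, and none of this is the actual ModuleManager implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical stand-in for a versioned function lookup; not Flink's ModuleManager. */
final class VersionedFunctionRegistry {

    // name -> (version -> definition); a String stands in for FunctionDefinition here.
    private final Map<String, NavigableMap<Integer, String>> functions = new HashMap<>();

    /** Registers a definition under the given name and version (hypothetical helper). */
    void register(String name, int version, String definition) {
        functions.computeIfAbsent(name, k -> new TreeMap<>()).put(version, definition);
    }

    /**
     * Mirrors the shape of getFunctionDefinition(String name, @Nullable Integer version):
     * a null version resolves to the most recent registered version.
     */
    Optional<String> getFunctionDefinition(String name, Integer version) {
        NavigableMap<Integer, String> versions = functions.get(name);
        if (versions == null || versions.isEmpty()) {
            return Optional.empty(); // unknown function name
        }
        if (version == null) {
            // No version requested: take the entry with the highest version number.
            return Optional.of(versions.lastEntry().getValue());
        }
        // Explicit version requested: return it only if it exists.
        return Optional.ofNullable(versions.get(version));
    }
}
```

With versions 1 and 2 of TO_TIMESTAMP_LTZ registered, a call with version null would resolve to version 2, while a restored plan that records version 1 would still resolve to the old definition.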

...

Note that the internal digest of an expression therefore might change between Flink versions after restoring from a JSON plan. However, the Web UI should be stable since we encode the operator description in the JSON plan.

Example

Let's assume we have the following (possibly overloaded) function definition:

TO_TIMESTAMP_LTZ(BIGINT | [STRING ',' STRING])

The JSON plan should look similar to:

{
  "kind" : "CALL",
  "function" : {
    "name" : "TO_TIMESTAMP_LTZ",
    "version" : 1
  }
}

If the latest version is specified, the EXPLAIN plan will look similar to:

== Optimized Physical Plan ==
Calc(select=[TO_TIMESTAMP_LTZ(f0)])


Let's assume we change the runtime behavior of TO_TIMESTAMP_LTZ. The signature might remain unchanged in this case. We create a new function definition with the same name for the new behavior.

We keep the old function definition around under an internal name. The physical plan after restore might look similar to:

== Optimized Physical Plan ==
Calc(select=[$TO_TIMESTAMP_LTZ$1(f0)])

$TO_TIMESTAMP_LTZ$1 has been returned by the ModuleManager.
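
The naming seen in the two EXPLAIN outputs could be produced by a rule like the one below. It is a guess based only on the example: the $NAME$version pattern is inferred from $TO_TIMESTAMP_LTZ$1, and FunctionDisplayName, displayName and latestVersion are hypothetical names, not part of the proposal.

```java
/** Hypothetical helper that derives the name shown in the plan for a function version. */
final class FunctionDisplayName {

    private FunctionDisplayName() {}

    /**
     * Returns the plain name for the latest version and an internal name of the
     * assumed form $NAME$version for any older version.
     */
    static String displayName(String name, int version, int latestVersion) {
        if (version == latestVersion) {
            return name; // latest behavior keeps the public name
        }
        return "$" + name + "$" + version; // older behavior kept under an internal name
    }
}
```

Under this assumption, once a second version of TO_TIMESTAMP_LTZ exists, a plan restored with version 1 would display $TO_TIMESTAMP_LTZ$1, while a newly compiled plan would display TO_TIMESTAMP_LTZ.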

Topology Versioning

ExecNode

...

COMPILE PLAN OVERWRITE '/mydir/plan.json' INSERT ...