Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Background and Motivation

When discussing how to support Hive built-in functions in the thread of “FLIP-57 Rework FunctionCatalog” (http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-57-Rework-FunctionCatalog-td32291.html), a module approach was raised.

...

Plugins define a set of metadata, including functions, user defined types, operators, rules, etc. Prebuilt plugins will be added and provided, or users may choose to write their own. Flink will take metadata from plugins as extensions of its core built-in system that users can take advantage of. For example, users can define their own geo functions and geo data types and plug them into Flink table as built-in objects. Another example is that users can use an off-the-shelf Hive plugin to use Hive built-in functions as part of Flink built-in functions.

Background - How Presto Supports Plugins


In Presto, users can write their own plugins by implementing the Plugin interface. In order for Presto to pick up the desired plugins at runtime, users have to drop all the plugin jars into a designated directory in the Presto installation.

Presto supports plugins via SPI. The class name of each plugin is provided to Presto via the standard Java ServiceLoader interface: the classpath contains a resource file named org.presto.spi.Plugin in the META-INF/services directory for discovery.
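
For reference, the standard Java ServiceLoader mechanism this relies on works roughly as follows. This is a generic sketch, not tied to Presto's actual SPI; the Plugin interface and class names below are hypothetical.

import java.util.ServiceLoader;

// Generic sketch of Java ServiceLoader-based discovery; the Plugin interface and
// implementation names are hypothetical. A plugin jar ships a resource file
// META-INF/services/<fully.qualified.Plugin> whose content is the fully qualified
// name of the implementing class.
public class PluginDiscoveryExample {

    /** Hypothetical service interface that plugins implement. */
    public interface Plugin {
        String name();
    }

    public static void main(String[] args) {
        // instantiates every implementation listed in the service files on the classpath
        for (Plugin plugin : ServiceLoader.load(Plugin.class)) {
            System.out.println("Discovered plugin: " + plugin.name());
        }
    }
}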

Proposal

Scope

In this FLIP we’ll design and develop a generic mechanism for modular plugins in Flink table core, with a focus on built-in functions.

...

  • HiveModule, supporting Hive built-in functions and numerous Hive versions

Overall Design

All modules will implement the Module interface. The Module interface defines a set of APIs to provide metadata such as functions, user defined types, operators, rules, etc. Each module can choose to provide all or only a subset of the metadata. All modules are managed by a ModuleManager, and all pluggable metadata is loaded on demand during object lookup.

...

Users have to be fully aware of the consequences of resetting modules, as it may mean that some objects can no longer be referenced or that the resolution order of some objects changes. E.g., “CAST” and “AS” cannot be overridden in CoreModule, and users should be fully aware of that.

How to Load Modules

To load modules, users have to make sure the relevant classes are already on the classpath.

...

Besides, users may want to define different resolution orders for different kinds of metadata, e.g. “xxx, yyy” for functions but “yyy, xxx” for data types. That will not be tackled in this FLIP either. We can address it incrementally when there is a real need from users.

Classes

The following is a generic design with functions as a specific example.


Module Interface

The Module interface defines a set of metadata that a module can provide to Flink. It provides default implementations for all the APIs, so an implementation only needs to implement what it's able to supply.

interface Module {
   default Set<String> listFunctions() { return Collections.emptySet(); }

   default Optional<FunctionDefinition> getFunctionDefinition(String name) { return Optional.empty(); }

   // user defined types, operators, rules, etc.
}
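
As an illustration, a hypothetical module that contributes a single function could look like the following. The GeoFunctionsModule name and the geo_distance function are made up for this example, and ScalarFunctionDefinition is used here as one possible FunctionDefinition implementation from Flink's existing function stack.

import java.util.Collections;
import java.util.Optional;
import java.util.Set;

import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.ScalarFunctionDefinition;

// Hypothetical example module; "GeoFunctionsModule" and "geo_distance" are made up.
public class GeoFunctionsModule implements Module {

    /** A made-up scalar function, purely for illustration. */
    public static class GeoDistance extends ScalarFunction {
        public double eval(double x1, double y1, double x2, double y2) {
            return Math.sqrt(Math.pow(x1 - x2, 2) + Math.pow(y1 - y2, 2));
        }
    }

    @Override
    public Set<String> listFunctions() {
        return Collections.singleton("geo_distance");
    }

    @Override
    public Optional<FunctionDefinition> getFunctionDefinition(String name) {
        if ("geo_distance".equalsIgnoreCase(name)) {
            return Optional.of(new ScalarFunctionDefinition("geo_distance", new GeoDistance()));
        }
        return Optional.empty();
    }
}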

ModuleFactory interface

ModuleFactory defines a factory that descriptors use to uniquely identify a module in service discovery and to create an instance of the module.

interface ModuleFactory extends TableFactory {
   Module createModule(String name, Map<String, String> properties);
}

CoreModule and CoreModuleFactory

CoreModule is a pre-defined singleton module that should contain all built-in metadata of Flink core.
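
A minimal sketch of what the singleton could look like, assuming the built-in function definitions are sourced from Flink's existing BuiltInFunctionDefinitions listing (the actual metadata wiring is elided):

import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.flink.table.functions.BuiltInFunctionDefinition;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;

// Minimal sketch only; assumes the built-in function definitions come from
// Flink's existing BuiltInFunctionDefinitions listing.
public class CoreModule implements Module {

    public static final CoreModule INSTANCE = new CoreModule();

    private CoreModule() {
    }

    @Override
    public Set<String> listFunctions() {
        return BuiltInFunctionDefinitions.getDefinitions().stream()
            .map(BuiltInFunctionDefinition::getName)
            .collect(Collectors.toSet());
    }

    @Override
    public Optional<FunctionDefinition> getFunctionDefinition(String name) {
        return BuiltInFunctionDefinitions.getDefinitions().stream()
            .filter(definition -> definition.getName().equalsIgnoreCase(name))
            .map(definition -> (FunctionDefinition) definition)
            .findFirst();
    }
}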

...

class CoreModuleFactory implements ModuleFactory {
  @Override
  public Map<String, String> requiredContext() {
    Map<String, String> context = new HashMap<>();
    context.put("type", "core");
    return context;
  }

  @Override
  public List<String> supportedProperties() {
    return Collections.emptyList();
  }

  @Override
  public Module createModule(String name, Map<String, String> properties) {
    return CoreModule.INSTANCE;
  }
}


ModuleManager

ModuleManager is responsible for loading all the modules, managing their life cycles, and resolving module objects.

public class ModuleManager {
  private List<Module> modules;

  public ModuleManager() {
    this.modules = new ArrayList<>();
    this.modules.add(CoreModule.INSTANCE);
  }

  public void loadModule(String name, Module module) { ... }

  public void unloadModule(String name) { ... }

  public Set<String> listFunctions() {
    return modules.stream()
        .flatMap(m -> m.listFunctions().stream())
        .collect(Collectors.toSet());
  }

  public Optional<FunctionDefinition> getFunctionDefinition(String name) {
    // resolve against modules in the order they were loaded
    Optional<Module> module = modules.stream()
        .filter(m -> m.listFunctions().contains(name))
        .findFirst();

    return module.isPresent() ? module.get().getFunctionDefinition(name) : Optional.empty();
  }

  // addUserDefinedTypes(), getUserDefinedTypes(), etc.
}
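
For illustration, resolution follows the order in which modules were loaded, so a function provided by an earlier module shadows one with the same name in a later module. The module name "hive" and the HiveModule constructor below are hypothetical and only anticipate the HiveModule section further down.

import java.util.Optional;

import org.apache.flink.table.functions.FunctionDefinition;

public class ModuleResolutionExample {
    public static void main(String[] args) {
        // contains CoreModule by default
        ModuleManager moduleManager = new ModuleManager();

        // "hive" and the HiveModule constructor are hypothetical here
        moduleManager.loadModule("hive", new HiveModule("2.2.1"));

        // if both CoreModule and HiveModule provide a function with this name,
        // the CoreModule definition wins because CoreModule was loaded first
        Optional<FunctionDefinition> def = moduleManager.getFunctionDefinition("concat");
        System.out.println(def.isPresent());
    }
}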


FunctionCatalog

FunctionCatalog will hold ModuleManager to resolve built-in functions.
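
A rough sketch of the delegation; lookupBuiltInFunction is a hypothetical method name used only for illustration, and the rest of FunctionCatalog is omitted:

import java.util.Optional;

import org.apache.flink.table.functions.FunctionDefinition;

// Sketch only: FunctionCatalog consults the ModuleManager for built-in functions;
// lookupBuiltInFunction is a hypothetical method name.
public class FunctionCatalog {

    private final ModuleManager moduleManager;
    // temporary functions, catalog functions, etc. are omitted in this sketch

    public FunctionCatalog(ModuleManager moduleManager) {
        this.moduleManager = moduleManager;
    }

    public Optional<FunctionDefinition> lookupBuiltInFunction(String name) {
        // built-in functions are now resolved through the loaded modules
        return moduleManager.getFunctionDefinition(name);
    }
}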

...

There have been some proposals to merge FunctionCatalog with CatalogManager. That will not be considered in this FLIP.

How to Write and Use a Self-Defined Module - Using HiveModule as an Example

To support numerous Hive versions, we will use the shim approach, similar to that of the existing HiveCatalog. Letting users explicitly specify a Hive version is necessary since Flink-Hive data conversions differ across Hive versions.
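
To illustrate the shim approach, a HiveModule could take the Hive version and delegate version-specific behavior to a shim. The HiveShim and HiveShimLoader names below follow the existing HiveCatalog pattern and should be treated as assumptions of this sketch; the actual function lookup against Hive's registry is omitted.

import java.util.Collections;
import java.util.Optional;
import java.util.Set;

import org.apache.flink.table.functions.FunctionDefinition;

// Sketch only: HiveShim/HiveShimLoader follow the existing HiveCatalog shim pattern
// and are assumptions here.
public class HiveModule implements Module {

    private final String hiveVersion;
    private final HiveShim hiveShim;

    public HiveModule(String hiveVersion) {
        this.hiveVersion = hiveVersion;
        // pick the shim implementation matching the user-specified Hive version
        this.hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
    }

    @Override
    public Set<String> listFunctions() {
        // the real implementation would enumerate Hive's built-in function registry
        // through the version-specific shim; omitted in this sketch
        return Collections.emptySet();
    }

    @Override
    public Optional<FunctionDefinition> getFunctionDefinition(String name) {
        // the real implementation would wrap the Hive built-in function into a
        // Flink FunctionDefinition via the shim; omitted in this sketch
        return Optional.empty();
    }
}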

...

modules:
  - type: core
    name: core
  - type: hive
    name: hive
    hive-version: 2.2.1
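
The programmatic equivalent in the Table API might look like the following; the exact TableEnvironment method for loading modules is an assumption of this sketch, and the HiveModule constructor is the one sketched above.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LoadModulesExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
            TableEnvironment.create(EnvironmentSettings.newInstance().build());

        // the core module is present by default; additional modules are loaded by name,
        // mirroring the YAML entries above
        tableEnv.loadModule("hive", new HiveModule("2.2.1"));
    }
}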


Limitations and Future Work


As mentioned above, though this FLIP provides a generic design and mechanism for all the pluggable object types we want to support, we will only implement functions. Other objects can be added incrementally later on.

Reference


...