
Page properties

Discussion thread: https://www.mail-archive.com/dev@flink.apache.org/msg29894.html
Vote thread:
JIRA: FLINK-14132
Release: 1.10

Google Doc: https://docs.google.com/document/d/17CPMpMbPDjvM4selUVEfh_tqUK_oV0TODAUA9dfHakc/edit?usp=sharing

Status

Current state: Under Discussion

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Background and Motivation

When discussing how to support Hive built-in functions in the thread of “FLIP-57 Rework FunctionCatalog” (http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-57-Rework-FunctionCatalog-td32291.html), a module approach was raised.

...

There are two aspects of the motivation:

  1. Empower users to write code and do customized development for Flink table core
  2. Enable users to integrate Flink with cores and built-in objects of other systems, so users can reuse whatever they are familiar with in other SQL systems seamlessly as core and built-ins of Flink SQL and Table

Modules define a set of metadata, including functions, user defined types, operators, rules, etc. Prebuilt modules will be added and provided, or users may choose to write their own. Flink will take metadata from modules as extensions of its core built-in system that users can take advantage of. For example, users can define their own geo functions and geo data types and plug them into Flink table as built-in objects. Another example is that users can use an off-the-shelf Hive module to use Hive built-in functions as part of Flink built-in functions.

Background - How Presto Supports Plugins


In Presto, users can write their own plugins by implementing the Plugin interface. In order for Presto to pick up the desired plugins at runtime, users have to drop all the plugin jars into a designated directory in the Presto installation.

Presto supports plugins via SPI. The class name of each plugin is provided to Presto via the standard Java ServiceLoader interface: the classpath contains a resource file named org.presto.spi.Plugin in the META-INF/services directory for discovery.
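For illustration, here is a minimal sketch of that standard ServiceLoader mechanism. The Plugin interface and the class below are stand-ins for this example, not Presto’s actual SPI types:

import java.util.ServiceLoader;

// stand-in for the SPI interface that plugin authors implement
interface Plugin {
    String getName();
}

public class PluginDiscovery {
    public static void main(String[] args) {
        // ServiceLoader reads the META-INF/services resource files on the classpath
        // (each listing implementation class names, one per line) and instantiates
        // every listed implementation via its no-arg constructor.
        for (Plugin plugin : ServiceLoader.load(Plugin.class)) {
            System.out.println("Discovered plugin: " + plugin.getName());
        }
    }
}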

Proposal

Scope

In this FLIP we’ll design and develop a generic mechanism for pluggable modules in Flink table core, with a focus on built-in functions.

We’ll specifically create two module implementations in this FLIP:

  • CoreModule, with existing Flink built-in functions only
  • HiveModule, supporting Hive built-in functions and numerous Hive versions

Overall Design

All modules will implement the Module interface. The Module interface defines a set of APIs to provide metadata such as functions, user defined types, operators, rules, etc. Each module can choose to provide all or only a subset of the metadata. All modules are managed by a ModuleManager, and all pluggable metadata are loaded on demand during object lookup.

Flink’s existing core metadata will also live in a module named “CoreModule”. Since we want to focus on supporting functions through modules, we’ll only migrate Flink’s existing built-in functions into the CoreModule at this moment as the first step.

All module metadata will be seen as part of Flink table core, and won’t have namespaces.

Objects in modules are loaded on demand instead of eagerly, so there won't be inconsistency.

Users have to be fully aware of the consequences of resetting modules, as that may cause some objects to no longer be resolvable or change the resolution order of some objects. E.g. “CAST” and “AS” cannot be overridden in CoreModule, and users should be fully aware of that.

How to Load Modules

To load modules, users have to make sure relevant classes are already in the classpath.

Java/Scala:

// new APIs to TableEnvironment
interface TableEnvironment {
    // load a module instance to the end of the module list
    void loadModule(String name, Module m);

    // unload a module instance from the module list; other modules keep their relative positions
    void unloadModule(String name); // name is the one given when the module was loaded

    // list all the modules' names according to order in module list
    List<String> listModules();
}

// note the following modules will be of the order they are specified
tableEnv.loadModule("a", MyModule.INSTANCE);
tableEnv.loadModule("b", new Xxx(properties));
tableEnv.loadModule("c", new Yyy());
tableEnv.unloadModule("a");

Yaml file:

modules: # note the following modules will be of the order they are specified
  - name: core
    type: core
  - name: a
    type: mymodule
  - name: b
    type: xxx
    property1: value1
    property2: value2
  - name: c
    type: yyy



Based on the module types defined in the yaml file, SQL CLI will invoke the factory service to search for the factory classes that provide the given module types, and then set them in TableEnvironment.

If users don’t set modules, the CoreModule will be used by default.

All module metadata are case-insensitive.

...

A few clarifications

  • By default, the yaml file doesn’t have the “modules” section in effect, and the core module will be loaded by default.
  • If users specify the “modules” section in the yaml file, modules will be loaded strictly according to it; if CoreModule is not specified there, it won’t be loaded.

In case users forget to specify the core module, the “modules” section will be commented out in the yaml file as follows:

#modules: # note the following modules will be of the order they are specified
#  - name: core
#    type: core

SQL:

  • SHOW MODULES: show module names in the existing module list in order
  • LOAD MODULE 'name' [WITH ('type'='xxx', 'prop'='myProp', ...)]: load a module with the given name and append it to the end of the module list
  • UNLOAD MODULE 'name': unload a module by name from the module list; other modules keep their relative positions (see the example after this list)
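For example, combining the commands above (the module name and properties here are illustrative):

LOAD MODULE 'hive' WITH ('type'='hive', 'hive-version'='2.2.1');
SHOW MODULES;
UNLOAD MODULE 'hive';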


NOTE: the SQL syntax has been discussed again and received some modifications, see FLINK-21045 and the discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLINK-21045-Support-load-module-and-unload-module-SQL-syntax-td48398.html


Resolution Order

Objects will be resolved against modules in the order they are defined, either in the program or in yaml configs. When there are objects sharing the same name, the resolution logic will go through the modules in order and return the first one found; the ones sitting behind it in the order will be ignored. E.g. if modules are set as “xxx, yyy” and both the xxx and yyy modules have a function named “f”, then “f” will always be resolved as the one in module xxx.
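A hypothetical illustration of this shadowing behavior in the programmatic API, where XxxModule and YyyModule are stand-ins for two modules that both provide a function “f”:

// both modules define a function "f"
tableEnv.loadModule("xxx", new XxxModule());
tableEnv.loadModule("yyy", new YyyModule());

// "f" resolves to the definition in module "xxx", since it comes first in the module list
tableEnv.sqlQuery("SELECT f(col) FROM t");

// after unloading "xxx", "f" resolves to the definition in module "yyy"
tableEnv.unloadModule("xxx");
tableEnv.sqlQuery("SELECT f(col) FROM t");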

...

Besides, users may want to define different resolution orders for different kinds of metadata, e.g. “xxx, yyy” for functions, but “yyy, xxx” for data types. That will not be tackled in this FLIP either. We can tackle that problem incrementally when there’s a real need from users.

Classes

...

The following is a generic design with functions as a specific example.

[class diagram image]

...

Module Interface

The Module interface defines a set of metadata that a module can provide to Flink. It provides default implementations for all its APIs, so an implementation can implement only what it’s able to supply.

interface Module {
   default Set<String> listFunctions() { return Collections.emptySet(); }

   default Optional<FunctionDefinition> getFunctionDefinition(String name) { return Optional.empty(); }

   // user defined types, operators, rules, etc.
}
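Thanks to those defaults, a custom module only has to implement what it provides. As a sketch, here is a hypothetical module exposing a single geo function (GeoModule and GeoDistanceFunction are illustrative names, not part of this FLIP):

// hypothetical module that supplies one function and relies on the
// Module interface's defaults for all other metadata
public class GeoModule implements Module {
  @Override
  public Set<String> listFunctions() {
    return Collections.singleton("geo_distance");
  }

  @Override
  public Optional<FunctionDefinition> getFunctionDefinition(String name) {
    if ("geo_distance".equalsIgnoreCase(name)) {
      return Optional.of(new GeoDistanceFunction()); // hypothetical FunctionDefinition implementation
    }
    return Optional.empty();
  }
}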

...

ModuleFactory interface

ModuleFactory defines a factory that is used by descriptors to uniquely identify a module in service discovery, and to create an instance of the module.

interface ModuleFactory extends TableFactory {
  Module createModule(Map<String, String> properties);
}

...

CoreModule and CoreModuleFactory

CoreModule is a pre-defined singleton module that should contain all built-in metadata of Flink core.

We currently only move built-in functions into CoreModule.

public class CoreModule implements Module {
  public static final CoreModule INSTANCE = new CoreModule();

  private CoreModule() { }

  @Override
  public Set<String> listFunctions() {
    return BuiltInFunctionDefinitions.getDefinitions().stream()
        .map(f -> f.getName())
        .collect(Collectors.toSet());
  }

  @Override
  public Optional<BuiltInFunctionDefinition> getFunctionDefinition(String name) {
    return BuiltInFunctionDefinitions.getDefinitions().stream()
        .filter(f -> f.getName().equals(name))
        .findFirst();
  }
}


class CoreModuleFactory implements ModuleFactory {
  @Override
  public Map<String, String> requiredContext() {
    Map<String, String> context = new HashMap<>();
    context.put("type", "core");
    return context;
  }

  @Override
  public List<String> supportedProperties() {
    return Collections.emptyList();
  }

  @Override
  public Module createModule(Map<String, String> properties) {
    return CoreModule.INSTANCE;
  }
}
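Since ModuleFactory extends TableFactory, a factory implementation would presumably be registered for service discovery the same way other table factories are, via a provider-configuration file on the classpath (a sketch; the package of CoreModuleFactory here is an assumption):

# META-INF/services/org.apache.flink.table.factories.TableFactory
org.apache.flink.table.module.CoreModuleFactory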

...


ModuleManager

ModuleManager is responsible for loading all the modules, managing their life cycles, and resolving module objects.

public class ModuleManager {
  private LinkedHashMap<String, Module> modules;

  public ModuleManager() {
    this.modules = new LinkedHashMap<>();
    modules.put("core", CoreModule.INSTANCE);
  }

  public void loadModule(String name, Module module) { ... }

  public void unloadModule(String name) { ... }

  public Set<String> listFunctions() {
    return modules.values().stream()
        .map(m -> m.listFunctions())
        .flatMap(s -> s.stream())
        .collect(Collectors.toSet());
  }

  public Optional<FunctionDefinition> getFunctionDefinition(String name) {
    Optional<Module> module = modules.values().stream()
        .filter(m -> m.listFunctions().contains(name))
        .findFirst();

    return module.isPresent() ? module.get().getFunctionDefinition(name) : Optional.empty();
  }

  // addUserDefinedTypes(), getUserDefinedTypes(), etc.
}


FunctionCatalog

FunctionCatalog will hold a ModuleManager to resolve built-in functions.

class FunctionCatalog implements FunctionLookup {
  ModuleManager moduleManager;
  CatalogManager catalogManager;

  public Optional<FunctionLookup.Result> lookupFunction(String name) {
    // search built-in functions in ModuleManager, rather than BuiltInFunctionsDefinitions

    // resolution order depends on FLIP-57: Rework FunctionCatalog
  }
}


There have been some proposals to merge FunctionCatalog with CatalogManager. That will not be considered in this FLIP.

How to Write and Use a Self-Defined Module - Using HiveModule as an Example

To support numerous Hive versions, we will use the shim approach, which is similar to that of the existing HiveCatalog. Letting users explicitly specify the Hive version is necessary, since there are differences in Flink-Hive data conversions among different Hive versions.
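To illustrate, here is a simplified sketch of the shim pattern; the class and method names below are illustrative, not the actual HiveShim API:

import java.util.Collections;
import java.util.Set;

// version-specific logic hides behind a common interface,
// chosen once based on the user-declared Hive version
interface HiveShim {
  // a hypothetical version-dependent operation
  Set<String> listBuiltInFunctionNames();
}

class HiveShimV2 implements HiveShim {
  @Override
  public Set<String> listBuiltInFunctionNames() {
    // would call Hive 2.x APIs here
    return Collections.emptySet();
  }
}

class HiveShimLoader {
  // picks the shim implementation matching the declared version
  static HiveShim load(String hiveVersion) {
    if (hiveVersion.startsWith("2.")) {
      return new HiveShimV2();
    }
    throw new UnsupportedOperationException("Unsupported Hive version: " + hiveVersion);
  }
}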

public class HiveModule implements Module {
  private final String hiveVersion;
  private final HiveShim hiveShim;

  public HiveModule(String hiveVersion) {
    this.hiveVersion = hiveVersion;
    this.hiveShim = HiveShimLoader.load(hiveVersion);
  }

  @Override
  public Set<String> listFunctions() {
    ...
  }

  @Override
  public Optional<FunctionDefinition> getFunctionDefinition(String name) {
    ...
  }
}



public class HiveModuleFactory implements ModuleFactory {

  @Override
  public Module createModule(Map<String, String> properties) {
    return new HiveModule(properties.get("hive-version"));
  }

  @Override
  public Map<String, String> requiredContext() {
    return new HashMap<String, String>() {{
      put("type", "hive");
    }};
  }

  @Override
  public List<String> supportedProperties() {
    return Arrays.asList("hive-version");
  }
}


Java/Scala:

tableEnv.loadModule("hive", new HiveModule("2.2.1"));


Yaml file:

plugins

modules

  - type: core

     name: core

  - type: hive

     name: hive

     hive-version: 2.2.1


Limitations and Future Work


As mentioned above, though this FLIP provides a generic design and mechanism for all module object types we want to support, we will only implement functions. Other objects can be added incrementally later on.

Reference


Document

...