
Status

Current state: Under Discussion

Discussion thread

JIRA:

Released:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Background and Motivation

When discussing how to support Hive built-in functions in the thread of “FLIP-57 Rework FunctionCatalog” (http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-57-Rework-FunctionCatalog-td32291.html), a module approach was raised.

As we discussed and looked deeper, we think it’s a good opportunity to broaden the design and the corresponding problem it aims to solve. The motivation now is to expand Flink’s core table system and enable users to do customizations by writing pluggable modules.

There are two aspects of the motivation:

  1. Empower users to write code and do customized development for the Flink table core
  2. Enable users to integrate Flink with the cores and built-in objects of other systems, so they can seamlessly reuse whatever they are familiar with in other SQL systems as core and built-in objects of Flink SQL and Table API


Plugins define a set of metadata, including functions, user defined types, operators, rules, etc. Prebuilt plugins will be provided, or users may choose to write their own. Flink will take metadata from plugins as extensions of its core built-in system that users can take advantage of. For example, users can define their own geo functions and geo data types and plug them into the Flink table core as built-in objects. As another example, users can use an off-the-shelf Hive plugin to use Hive built-in functions as part of Flink's built-in functions.

Background - How Presto Supports Plugins


In Presto, users can write their own plugins by implementing the Plugin interface. In order for Presto to pick up the desired plugins at runtime, users have to drop all the plugin jars into a designated directory in the Presto installation.

Presto supports plugins via SPI. The class name of each plugin is provided to Presto via the standard Java ServiceLoader interface: the classpath contains a resource file named org.presto.spi.Plugin in the META-INF/services directory for discovery.
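The ServiceLoader discovery described above can be sketched as follows. This is an illustrative, self-contained example — the `Plugin` interface here is a hypothetical stand-in, not Presto's actual SPI class:

```java
import java.util.ServiceLoader;

public class PluginDiscovery {

    // Hypothetical SPI interface standing in for the real plugin interface.
    public interface Plugin {
        String name();
    }

    // Returns the number of Plugin implementations registered on the classpath.
    public static int discoverCount() {
        // ServiceLoader reads META-INF/services/<fully-qualified-interface-name>
        // resource files on the classpath; each line names one implementation class.
        ServiceLoader<Plugin> loader = ServiceLoader.load(Plugin.class);
        int count = 0;
        for (Plugin p : loader) {
            System.out.println("Discovered plugin: " + p.name());
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        // With no META-INF/services registration on the classpath, nothing is found.
        System.out.println("Plugins discovered: " + discoverCount());
    }
}
```

To register an implementation, a jar would ship a resource file named after the interface under META-INF/services, listing the implementation class name.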

Proposal

Scope

In this FLIP we'll design and develop a generic mechanism for modular plugins in the Flink table core, with a focus on built-in functions.

We'll create two plugin implementations in this FLIP:

  • CorePlugin, with existing Flink built-in functions only
  • HivePlugin, supporting Hive built-in functions across numerous Hive versions

Overall Design

All plugins will implement the Plugin interface. The Plugin interface defines a set of APIs to provide metadata such as functions, user defined types, operators, rules, etc. Each plugin can choose to provide all or only a subset of the metadata. All plugins are managed by a PluginManager, and all pluggable metadata are loaded on demand during object lookup.

Flink's existing core metadata will also become a plugin, named "CorePlugin". Since we want to focus on supporting functions through plugins, we'll only migrate Flink's existing built-in functions into the CorePlugin at this moment as the first step.

All plugin metadata will be seen as a part of Flink table core, and won’t have namespaces.

How to Load Plugins

To load plugins, users have to make sure relevant classes are already in classpath.

Java/Scala:

// note: plugins will be loaded in the order they are specified

tableEnv.usePlugins(CorePlugin.INSTANCE, MyPlugin.INSTANCE, new Xxx(properties), new Yyy());


Yaml file:

plugins: # note: plugins will be loaded in the order they are specified

   - type: core

   - type: myplugin

   - type: xxx

      property1: value1

      property2: value2

   - type: yyy


Since it doesn’t make sense to load the same plugin multiple times, plugins don’t need to have names.

Based on the plugin types defined in the yaml file, the SQL CLI will use factory service discovery to find the factory class for each given plugin type, create the plugins, and set them in the TableEnvironment.

If users don’t set plugins, the CorePlugin will be used by default.

All plugin metadata are case-insensitive.

Resolution - Order and Case Sensitivity

Objects will be resolved to plugins in the order they are defined, either in the program or in yaml configs. When multiple plugins provide objects sharing the same name, the resolution logic will go through the plugins in order and return the first match; the ones behind it in the order will be ignored. E.g. if plugins are set as "xxx, yyy" and both modules have a function named "f", then "f" will always be resolved to the one in the xxx module.
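The first-match resolution rule can be sketched with the following self-contained example. The `Plugin` stand-in and the `resolve` helper here are illustrative only, not the actual Flink API:

```java
import java.util.*;

public class ResolutionOrderDemo {

    // Minimal stand-in for the Plugin interface, for illustration only.
    interface Plugin {
        Set<String> listFunctions();
        String name();
    }

    // Builds a named plugin exposing the given function names.
    static Plugin plugin(String pluginName, String... functions) {
        Set<String> fns = new HashSet<>(Arrays.asList(functions));
        return new Plugin() {
            public Set<String> listFunctions() { return fns; }
            public String name() { return pluginName; }
        };
    }

    // Returns the name of the first plugin (in list order) providing the function.
    static Optional<String> resolve(List<Plugin> plugins, String function) {
        return plugins.stream()
            .filter(p -> p.listFunctions().contains(function))
            .map(Plugin::name)
            .findFirst();
    }

    public static void main(String[] args) {
        // Both plugins define "f"; "xxx" comes first in the order, so it wins.
        List<Plugin> plugins = Arrays.asList(plugin("xxx", "f", "g"), plugin("yyy", "f", "h"));
        System.out.println(resolve(plugins, "f").get()); // prints "xxx"
        System.out.println(resolve(plugins, "h").get()); // prints "yyy"
    }
}
```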

This FLIP will not take into consideration how to enable users to use "f" in the yyy module. We may allow users to do so by using a whitelist/blacklist in the future, but that is not in the scope of this FLIP.

Besides, users may want to define different resolution orders for different metadata, e.g. "xxx, yyy" for functions but "yyy, xxx" for data types. That will also not be addressed in this FLIP; we can tackle the problem incrementally when there's a real need from users.

Classes

The following is a generic design with functions as a specific example.

Plugin Interface

The Plugin interface defines a set of metadata that a plugin can provide to Flink. It provides default implementations for all the APIs, so an implementation can override only what it's able to supply.

interface Plugin {
   default Set<String> listFunctions() { return Collections.emptySet(); }

   default Optional<FunctionDefinition> getFunctionDefinition(String name) { return Optional.empty(); }

   // user defined types, operators, rules, etc.
}

PluginFactory interface

PluginFactory defines a factory that is used by descriptors to uniquely identify a Plugin in service discovery, and to create an instance of the plugin.

interface PluginFactory extends TableFactory {
   Plugin createPlugin(Map<String, String> properties);
}

CorePlugin and CorePluginFactory

CorePlugin is a pre-defined singleton plugin that contains all built-in metadata of Flink core.

We currently only move built-in functions into CorePlugin.

public class CorePlugin implements Plugin {
  public static final CorePlugin INSTANCE = new CorePlugin();

  private CorePlugin() { }

  @Override
  public Set<String> listFunctions() {
    return BuiltInFunctionDefinitions.getDefinitions().stream()
        .map(f -> f.getName())
        .collect(Collectors.toSet());
  }

  @Override
  public Optional<FunctionDefinition> getFunctionDefinition(String name) {
    return BuiltInFunctionDefinitions.getDefinitions().stream()
          .filter(f -> f.getName().equalsIgnoreCase(name))
          .findFirst();
  }
}


class CorePluginFactory implements PluginFactory {
  @Override
  public Map<String, String> requiredContext() {
    Map<String, String> context = new HashMap<>();
    context.put("type", "core");
    return context;
  }

  @Override
  public List<String> supportedProperties() {
    return Collections.emptyList();
  }

  @Override
  public Plugin createPlugin(Map<String, String> properties) {
    return CorePlugin.INSTANCE;
  }
}


PluginManager

PluginManager is responsible for loading all the Plugins, managing their life cycles, and resolving plugin objects.

public class PluginManager {
  private List<Plugin> plugins;

  public PluginManager() {
    this.plugins = new ArrayList<>(Collections.singletonList(CorePlugin.INSTANCE));
  }

  public void setPlugins(List<Plugin> plugins) {
    this.plugins = plugins;
  }

  public Set<Set<String>> listFunctions() {
    return plugins.stream()
        .map(p -> p.listFunctions())
        .collect(Collectors.toSet());
  }

  public Optional<FunctionDefinition> getFunctionDefinition(String name) {
    Optional<Plugin> plugin = plugins.stream()
        .filter(p -> p.listFunctions().contains(name))
        .findFirst();

    return plugin.isPresent() ? plugin.get().getFunctionDefinition(name) : Optional.empty();
  }


// addUserDefinedTypes(), getUserDefinedTypes(), etc
}


FunctionCatalog

FunctionCatalog will hold PluginManager to resolve built-in functions.

class FunctionCatalog implements FunctionLookup {
  PluginManager pluginManager;
  CatalogManager catalogManager;

  public Optional<FunctionLookup.Result> lookupFunction(String name) {

    // search built-in functions in PluginManager, rather than BuiltInFunctionsDefinitions

    // Resolution order depends on FLIP-57: Rework FunctionCatalog

  }

}


There have been some proposals for merging FunctionCatalog with CatalogManager. That will not be considered in this FLIP.

How to Write and Use a Self-Defined Plugin - Using HivePlugin as an Example

To support numerous Hive versions, we will use the shim approach, similar to that of the existing HiveCatalog. Letting users explicitly specify the Hive version is necessary, since there are differences in Flink-Hive data conversion among different Hive versions.
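The shim pattern can be sketched as follows. This is a minimal, self-contained illustration of the idea — one interface abstracting version-specific behavior, with a loader selecting the implementation from the user-specified version string. The class names with version suffixes and the supported version strings here are hypothetical, not the actual Flink shim classes:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Minimal sketch of the version-shim pattern, not the actual Flink implementation.
public class ShimDemo {

    // One interface abstracts over version-specific behavior.
    interface HiveShim {
        String version();
    }

    static class HiveShimV211 implements HiveShim {
        public String version() { return "2.1.1"; }
    }

    static class HiveShimV221 implements HiveShim {
        public String version() { return "2.2.1"; }
    }

    // The loader picks the shim matching the user-specified version string.
    static class HiveShimLoader {
        private static final Map<String, Supplier<HiveShim>> SHIMS = new HashMap<>();
        static {
            SHIMS.put("2.1.1", HiveShimV211::new);
            SHIMS.put("2.2.1", HiveShimV221::new);
        }

        static HiveShim load(String hiveVersion) {
            Supplier<HiveShim> s = SHIMS.get(hiveVersion);
            if (s == null) {
                throw new IllegalArgumentException("Unsupported Hive version: " + hiveVersion);
            }
            return s.get();
        }
    }

    public static void main(String[] args) {
        System.out.println(HiveShimLoader.load("2.2.1").version()); // prints "2.2.1"
    }
}
```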

public class HivePlugin implements Plugin {
  private static final Logger LOGGER = LoggerFactory.getLogger(HivePlugin.class);

  private final String hiveVersion;

  private final HiveShim hiveShim;

  public HivePlugin(String hiveVersion) {
    this.hiveVersion = hiveVersion;
    this.hiveShim = HiveShimLoader.load(hiveVersion);
  }

  @Override
  public Set<String> listFunctions() {
    ...
  }

  @Override
  public Optional<FunctionDefinition> getFunctionDefinition(String name) {
    ...
  }
}



public class HivePluginFactory implements PluginFactory {

  @Override
  public Plugin createPlugin(Map<String, String> properties) {
    return new HivePlugin(properties.get("hive-version"));
  }

  @Override
  public Map<String, String> requiredContext() {
    return new HashMap<String, String>() {{
      put("type", "hive");
    }};
  }

  @Override
  public List<String> supportedProperties() {
    return Arrays.asList("hive-version");
  }
}


Java/Scala:

tableEnv.usePlugins(CorePlugin.INSTANCE, new HivePlugin("2.2.1"));


Yaml file:

plugins: 

  - type: core

  - type: hive

     hive-version: 2.2.1


Limitations and Future Work


As mentioned above, though this FLIP provides a generic design and mechanism for all the plugin object types we want to support, we will only implement functions for now. Other objects can be added incrementally later on.

Reference


Document

https://docs.google.com/document/d/17CPMpMbPDjvM4selUVEfh_tqUK_oV0TODAUA9dfHakc/edit?usp=sharing
