...

// note that the plugins take effect in the order they are specified

tableEnv.usePlugins(CorePlugin.INSTANCE, MyPlugin.INSTANCE, new Xxx(properties), new Yyy());


Yaml file:

plugins: # note that the plugins take effect in the order they are specified

   - type: core

   - type: myplugin

   - type: xxx

     property1: value1

     property2: value2

   - type: yyy

...

The Plugin interface defines a set of metadata that a plugin can provide to Flink. It provides default implementations for all of its APIs, so an implementation only needs to override what it is able to supply.

interface Plugin {
   default Set<String> listFunctions() { return Collections.emptySet(); }

   default Optional<FunctionDefinition> getFunctionDefinition(String name) { return Optional.empty(); }

   // user defined types, operators, rules, etc.
}

PluginFactory interface

PluginFactory defines a factory that uses descriptor properties to uniquely identify a Plugin during service discovery and to create an instance of the plugin.

interface PluginFactory extends TableFactory {
   Plugin createPlugin(Map<String, String> properties);
}
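To illustrate how descriptor properties could drive the discovery, here is a minimal, hedged sketch (the class and method names are hypothetical, not part of the proposal) of matching a descriptor's property map against a factory's requiredContext:

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical sketch: a factory is selected when every key/value pair of its
// requiredContext (e.g. {"type": "core"}) appears in the descriptor properties.
public class FactoryDiscoverySketch {

  // Returns true if the properties satisfy the required context.
  static boolean matches(Map<String, String> requiredContext,
                         Map<String, String> properties) {
    return requiredContext.entrySet().stream()
        .allMatch(e -> e.getValue().equals(properties.get(e.getKey())));
  }

  public static void main(String[] args) {
    Map<String, String> coreContext = Collections.singletonMap("type", "core");
    System.out.println(matches(coreContext, Collections.singletonMap("type", "core")));
    System.out.println(matches(coreContext, Collections.singletonMap("type", "hive")));
  }
}
```

With this matching rule, the `type` key in the YAML descriptors above is what routes each entry to its factory.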

CorePlugin and CorePluginFactory

...

Currently, only built-in functions are moved into CorePlugin.

public class CorePlugin implements Plugin {
  public static final CorePlugin INSTANCE = new CorePlugin();

  private CorePlugin() { }

  @Override
  public Set<String> listFunctions() {
    return BuiltInFunctionDefinitions.getDefinitions().stream()
        .map(f -> f.getName())
        .collect(Collectors.toSet());
  }

  @Override
  public Optional<FunctionDefinition> getFunctionDefinition(String name) {
    return BuiltInFunctionDefinitions.getDefinitions().stream()
          .filter(f -> f.getName().equalsIgnoreCase(name))
          .findFirst()
          .map(FunctionDefinition.class::cast);
  }
}


public class CorePluginFactory implements PluginFactory {
  @Override
  public Map<String, String> requiredContext() {
    Map<String, String> context = new HashMap<>();
    context.put("type", "core");
    return context;
  }

  @Override
  public List<String> supportedProperties() {
    return Collections.emptyList();
  }

  @Override
  public Plugin createPlugin(Map<String, String> properties) {
    return CorePlugin.INSTANCE;
  }
}


PluginManager

PluginManager is responsible for loading all Plugins, managing their life cycles, and resolving plugin objects.

public class PluginManager {
  private List<Plugin> plugins;

  public PluginManager() {
    this.plugins = new ArrayList<>(Collections.singletonList(CorePlugin.INSTANCE));
  }

  public void setPlugins(List<Plugin> plugins) {
    this.plugins = plugins;
  }

  public Set<Set<String>> listFunctions() {
    return plugins.stream()
        .map(p -> p.listFunctions())
        .collect(Collectors.toSet());
  }

  public Optional<FunctionDefinition> getFunctionDefinition(String name) {
    Optional<Plugin> p = plugins.stream()
        .filter(p -> p.listFunctions().contains(name))
        .findFirst();

    return p.isPresent() ? p.get().getFunctionDefinition(name) : Optional.empty();
  }


  // addUserDefinedTypes(), getUserDefinedTypes(), etc.
}
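To make the resolution-order semantics concrete, the following is a self-contained, simplified sketch (with invented names; SimplePlugin stands in for the real Plugin interface) showing that the first plugin in the list that knows a function wins:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Simplified sketch of PluginManager.getFunctionDefinition: plugins are
// consulted in list order, and the first one that lists the function resolves it.
public class PluginOrderSketch {

  interface SimplePlugin {
    Set<String> listFunctions();
    Optional<String> getFunctionDefinition(String name);
  }

  // Builds a toy plugin that resolves its functions to "tag:name" strings.
  static SimplePlugin plugin(String tag, String... functions) {
    Set<String> fns = new HashSet<>(Arrays.asList(functions));
    return new SimplePlugin() {
      public Set<String> listFunctions() { return fns; }
      public Optional<String> getFunctionDefinition(String name) {
        return fns.contains(name) ? Optional.of(tag + ":" + name) : Optional.empty();
      }
    };
  }

  // Mirrors the resolution logic above: first matching plugin wins.
  static Optional<String> resolve(List<SimplePlugin> plugins, String name) {
    return plugins.stream()
        .filter(p -> p.listFunctions().contains(name))
        .findFirst()
        .flatMap(p -> p.getFunctionDefinition(name));
  }

  public static void main(String[] args) {
    List<SimplePlugin> plugins =
        Arrays.asList(plugin("core", "concat"), plugin("hive", "concat", "nvl"));
    // "concat" exists in both plugins; "core" comes first, so it wins.
    System.out.println(resolve(plugins, "concat").get()); // prints "core:concat"
    System.out.println(resolve(plugins, "nvl").get());    // prints "hive:nvl"
  }
}
```

This is why the order passed to usePlugins (or listed in the YAML file) matters when two plugins provide functions with the same name.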


FunctionCatalog

FunctionCatalog will hold a PluginManager to resolve built-in functions.

class FunctionCatalog implements FunctionLookup {
  PluginManager pluginManager;
  CatalogManager catalogManager;

  public Optional<FunctionLookup.Result> lookupFunction(String name) {

    // search built-in functions in PluginManager, rather than BuiltInFunctionsDefinitions

    // Resolution order depends on FLIP-57: Rework FunctionCatalog

  }

}

...

To support numerous Hive versions, we will use the shim approach, similar to that of the existing HiveCatalog. Requiring users to explicitly specify the Hive version is necessary because Flink-Hive data conversions differ among Hive versions.

public class HivePlugin implements Plugin {
  private static final Logger LOGGER = LoggerFactory.getLogger(HivePlugin.class);

  private final String hiveVersion;

  private final HiveShim hiveShim;

  public HivePlugin(String hiveVersion) {
    this.hiveVersion = hiveVersion;
    this.hiveShim = HiveShimLoader.load(hiveVersion);
  }

  @Override
  public Set<String> listFunctions() {
    ...
  }

  @Override
  public Optional<FunctionDefinition> getFunctionDefinition(String name) {
    ...
  }
}
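The shim loading itself could look roughly like the following sketch. This is a hedged illustration of the version-keyed registry pattern, not the real HiveShimLoader; the Shim interface and its method are invented for the example:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch of a version-keyed shim registry, illustrating the
// shim approach described above: each supported Hive version maps to a
// factory for its version-specific implementation.
public class ShimLoaderSketch {

  interface Shim { String describe(); }

  private static final Map<String, Supplier<Shim>> SHIMS = new HashMap<>();
  static {
    SHIMS.put("1.2.1", () -> () -> "shim for Hive 1.2.1");
    SHIMS.put("2.2.1", () -> () -> "shim for Hive 2.2.1");
  }

  // Looks up the shim for the requested version, failing fast on unknown ones.
  static Shim load(String hiveVersion) {
    Supplier<Shim> supplier = SHIMS.get(hiveVersion);
    if (supplier == null) {
      throw new IllegalArgumentException("Unsupported Hive version: " + hiveVersion);
    }
    return supplier.get();
  }

  public static void main(String[] args) {
    System.out.println(load("2.2.1").describe());
  }
}
```

Failing fast on an unsupported version surfaces misconfiguration at plugin creation time rather than during query execution.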



public class HivePluginFactory implements PluginFactory {

  @Override
  public Plugin createPlugin(Map<String, String> properties) {
    return new HivePlugin(properties.get("hive-version"));
  }

  @Override
  public Map<String, String> requiredContext() {
    return new HashMap<String, String>() {{
      put("type", "hive");
    }};
  }

  @Override
  public List<String> supportedProperties() {
    return Arrays.asList("hive-version");
  }
}


Java/Scala:

tableEnv.usePlugins(CorePlugin.INSTANCE, new HivePlugin("2.2.1"));


Yaml file:

plugins: 

  - type: core

  - type: hive

    hive-version: 2.2.1

...