Status

Discussion thread:
Vote thread:
JIRA: FLINK-13570
Release:


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

FLIP-36 proposes to add cached tables to avoid duplicated computation and to provide a better user experience for interactive programming. The intermediate result storage is the backend storage where the content of a cached table is stored. FLIP-36 provides a default implementation of the intermediate result storage so that users can use the cache out of the box. It utilizes the ShuffleMaster/ShuffleService to persist the intermediate result and keep it available across jobs.

For more advanced usage of the cache, users may want to plug in an external storage to store the intermediate result. For example, users can use a table source that supports projection or filter push-down, e.g., ParquetTableSource or OrcTableSource. This may reduce the scanning time if the job uses only some of the columns of the cached table.

In addition to the performance improvement, the ability to store the intermediate result in external storage is required for cross-application sharing (Phase 3).

Public Interfaces

Add a registerIntermediateResultStorage method to the TableEnvironment:

Code Block
languagejava
	/**
	 * Register the intermediate result storage, which is used to store the content of the cached table.
	 * @param descriptor the descriptor contains all the necessary information to discover and
	 *        configure the intermediate result storage.
	 */
	void registerIntermediateResultStorage(IntermediateResultStorageDescriptor descriptor);


Introduce New Descriptor

Code Block
languagejava

public abstract class IntermediateResultStorageDescriptor implements Descriptor {
	private String type;
	private Map<String, String> config = new HashMap<>();

	/**
	 * Adds the given key-value pair to the config.
	 */
	public IntermediateResultStorageDescriptor withConfig(String key, String value) {...}

	/**
	 * Sets the type of the intermediate result storage.
	 */
	public IntermediateResultStorageDescriptor withType(String type) {...}

	/**
	 * Converts this descriptor into a set of intermediate result storage properties.
	 */
	protected abstract Map<String, String> toIntermediateResultStorageProperties();
}
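
As an illustration, a concrete descriptor might translate its type and config map into prefixed properties. In this sketch, the FileSystemIntermediateResultStorageDescriptor name is hypothetical, and the base class is a minimal stand-in mirroring the definition above so that the example compiles on its own:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal stand-in for the descriptor defined above, so this sketch is self-contained.
abstract class IntermediateResultStorageDescriptor {
	protected String type;
	protected Map<String, String> config = new HashMap<>();

	public IntermediateResultStorageDescriptor withConfig(String key, String value) {
		config.put(key, value);
		return this;
	}

	public IntermediateResultStorageDescriptor withType(String type) {
		this.type = type;
		return this;
	}

	protected abstract Map<String, String> toIntermediateResultStorageProperties();
}

// Hypothetical file-system descriptor: emits the reserved property keys used by the cache service.
class FileSystemIntermediateResultStorageDescriptor extends IntermediateResultStorageDescriptor {

	public FileSystemIntermediateResultStorageDescriptor() {
		withType("filesystem");
	}

	@Override
	protected Map<String, String> toIntermediateResultStorageProperties() {
		Map<String, String> properties = new HashMap<>();
		properties.put("intermediate-result-storage.type", type);
		for (Map.Entry<String, String> entry : config.entrySet()) {
			properties.put("intermediate-result-storage.config." + entry.getKey(), entry.getValue());
		}
		return properties;
	}
}
```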


Introduce IntermediateResultStorage Interface

Code Block
languagejava

/**
 * This interface represents a backend storage where the intermediate result is stored upon caching.
 * It provides the necessary methods for the CacheManager to substitute a cached table with a TableSink and TableSource.
 */
public interface IntermediateResultStorage extends Configurable, TableFactory {

	/**
	 * Get the TableSourceFactory which the cache manager uses to create the TableSource to replace the cached table.
	 */
	TableSourceFactory getTableSourceFactory();

	/**
	 * Get the TableSinkFactory which the cache manager uses to create the TableSink when a table needs to be cached.
	 */
	TableSinkFactory getTableSinkFactory();

	/**
	 * The cache manager gets the TableCleanUpHook and invokes the clean method to reclaim the space when
	 * some cached tables are invalidated.
	 */
	TableCleanUpHook getCleanUpHook();

	/**
	 * The cache manager gets the TableCreationHook and invokes the createTable method before creating the
	 * TableSource or TableSink.
	 */
	TableCreationHook getTableCreationHook();
}


Introduce Configurable Interface

Code Block
languagejava
/**
 * The Configurable interface indicates that the class can be instantiated by reflection
 * and takes configuration parameters to configure itself.
 */
public interface Configurable {

	/**
	 * Configure the class with the given key-value pairs
	 */
	void configure(Map<String, String> configs);
}


Introduce Table Creation/Cleanup Hook

Code Block
languagejava
/**
 * The interface is used by the CacheManager before creating the TableSink and TableSource.
 */
public interface TableCreationHook {
	
	/**
	 * Create the physical location to store the table and return the
	 * configuration that encodes the given table name.
	 */
	Map<String, String> createTable(String tableName, Map<String, String> properties);
}

/**
 * The interface is used by the CacheManager whenever some table caches are invalidated.
 */
public interface TableCleanUpHook {

	/**
	 * Delete the physical storage for tables with the given table names.
	 */
	void clean(Collection<String> tableNames, Map<String, Map<String, String>> properties);
}

Proposed Changes

All the changes are in the Table API; the runtime is untouched. The workflow for loading and configuring the intermediate result storage is the following:

  1. Get the intermediate result storage configuration in one of two ways:
    1. The user calls registerIntermediateResultStorage explicitly in the code and passes the configuration.
    2. The cache manager loads the configuration from a YAML file on the classpath.
  2. The cache manager passes the configuration to the TableFactoryService to load and configure the intermediate result storage.
  3. The TableFactoryService finds the implementation using the service loader, instantiates and configures the object, and returns it to the cache manager. The CacheManager maintains the intermediate result storage. From this point on, the behavior is essentially the same as in FLIP-36, i.e., maintaining a set of cached tables and replacing cached tables with TableSink/TableSource.


Configurable IntermediateResultStorage

The key functionality of the pluggable intermediate result storage is writing tables to and reading tables from external storage. This is similar to what the TableSink and TableSource do, but with one important difference: the TableSink and TableSource are designed to connect to external storage managed by the user, while the cache service should manage the external storage itself. Therefore, the intermediate result storage must meet the following requirements:

  • Create a location for each table on the corresponding external storage if it does not already exist. Currently, only the file system meets this requirement, since it creates the file to write to if it does not already exist. For other storages, the table sink expects the destination the table is written to to have been created by the user beforehand. That is unacceptable for a storage of intermediate results: imagine users having to create tens of tables on a MySQL database for one Flink application and tens more for another. In some cases, the user may not even know how many tables will be cached.
  • Map the name of a table to the corresponding configuration field of the external storage. Currently, the TableSinkFactory and TableSourceFactory take the configuration as a Map<String, String>, and different factories map the logical table name to fields with different keys. For example, CsvTableSinkFactoryBase uses “connector.path”, KafkaTableSourceSinkFactoryBase uses “connector.topic” and JDBCTableSourceSinkFactory uses “connector.table”. This is an essential requirement, as the cache service needs to create a different TableSink/TableSource for each cached table.
  • Clean up the physical storage. For a cache storage, it is important to be able to reclaim the storage space when a cache is invalidated, which is not supported currently.

Therefore, the IntermediateResultStorage interface is introduced. Any class that implements this interface can be used by the cache service to serve as external storage for the intermediate result. The interface extends the TableFactory interface so that implementations can be discovered with the service loader, similar to how the current TableSinkFactory and TableSourceFactory work, and we can reuse the TableFactoryService class to do the discovery and loading.

The TableFactoryService uses the service loader to instantiate the object with its parameterless constructor. However, this alone cannot initialize the internal state of the object. Therefore, IntermediateResultStorage extends the Configurable interface.

Classes that implement the Configurable interface should have a parameterless constructor, and the configure method should fully configure the object. The TableFactoryService will use the service loader to instantiate the object and then call the configure method if the object implements the Configurable interface.
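
This instantiate-then-configure step can be sketched as follows, using plain reflection in place of the real TableFactoryService discovery; SampleStorage and the helper class are hypothetical stand-ins:

```java
import java.util.HashMap;
import java.util.Map;

// Copied from the proposal so the sketch is self-contained.
interface Configurable {
	void configure(Map<String, String> configs);
}

// Hypothetical storage implementation with the required parameterless constructor.
class SampleStorage implements Configurable {
	String cachePath;

	@Override
	public void configure(Map<String, String> configs) {
		this.cachePath = configs.get("cache-path");
	}
}

class FactoryLoadingSketch {
	// Instantiate by reflection, then configure if the class opts in via Configurable.
	static Object instantiateAndConfigure(String className, Map<String, String> configs)
			throws Exception {
		Object instance = Class.forName(className).getDeclaredConstructor().newInstance();
		if (instance instanceof Configurable) {
			((Configurable) instance).configure(configs);
		}
		return instance;
	}
}
```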

The TableCreationHook and TableCleanUpHook are functional interfaces, which are called by the cache service before a cache is created and after a cache is invalidated, respectively. The TableCreationHook is responsible for preparing the location that stores the content of the cached table and for mapping the table name to configuration fields that the TableSinkFactory/TableSourceFactory can understand. The TableCleanUpHook is responsible for deleting the content of the cached tables and reclaiming the space.

Since the cache service sits on the client side, we have to assume for now that the client has access to the external storage and is capable of creating and deleting tables. In the future, we could integrate with the Flink Driver (FLIP-40) and let the driver run the creation and clean-up hooks in the Flink cluster.
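
As a sketch of how the two hooks could be implemented for a file system (the directory-per-table layout and the connector.path key are assumptions for illustration, not part of the proposal; the interfaces are copied from above so the example compiles on its own):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Interfaces copied from the proposal so the sketch is self-contained.
interface TableCreationHook {
	Map<String, String> createTable(String tableName, Map<String, String> properties);
}

interface TableCleanUpHook {
	void clean(Collection<String> tableNames, Map<String, Map<String, String>> properties);
}

// Hypothetical file-system hooks: one directory per cached table under a base path.
class FileSystemHooks implements TableCreationHook, TableCleanUpHook {
	private final Path basePath;

	FileSystemHooks(Path basePath) {
		this.basePath = basePath;
	}

	@Override
	public Map<String, String> createTable(String tableName, Map<String, String> properties) {
		try {
			// Create the physical location for the table if it does not already exist.
			Path tableDir = Files.createDirectories(basePath.resolve(tableName));
			// Encode the table location under the key the sink/source factory expects.
			Map<String, String> result = new HashMap<>(properties);
			result.put("connector.path", tableDir.toString());
			return result;
		} catch (IOException e) {
			throw new UncheckedIOException(e);
		}
	}

	@Override
	public void clean(Collection<String> tableNames, Map<String, Map<String, String>> properties) {
		for (String tableName : tableNames) {
			try {
				// Reclaim the space of the invalidated cache.
				Files.deleteIfExists(basePath.resolve(tableName));
			} catch (IOException e) {
				throw new UncheckedIOException(e);
			}
		}
	}
}
```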

Integration with FLIP-36

The default intermediate result storage could implement IntermediateResultStorage, so that we have a unified interface for the default and external intermediate result storages.

Configure IntermediateResultStorage

Specification

There are some reserved keys used by the cache service and the TableFactoryService. Other than those reserved keys, an implementation of IntermediateResultStorage can specify its required and supported keys by implementing the requiredContext and supportedProperties methods.

Here are the reserved keys for the IntermediateResultStorage descriptor:

  • intermediate-result-storage.type
  • intermediate-result-storage.config
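
For example, a hypothetical file-system implementation might declare the reserved type key in its required context and list its own keys as supported properties, mirroring the requiredContext/supportedProperties contract of Flink's TableFactory (class and key names here are illustrative):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical file-system implementation declaring its discovery context and supported keys.
class FileSystemStorageFactorySketch {

	// Reserved key: used by the TableFactoryService to match this implementation.
	Map<String, String> requiredContext() {
		Map<String, String> context = new HashMap<>();
		context.put("intermediate-result-storage.type", "filesystem");
		return context;
	}

	// Implementation-specific keys under the reserved config prefix.
	List<String> supportedProperties() {
		return Arrays.asList("intermediate-result-storage.config.cache-path");
	}
}
```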

With Code

Users can configure the IntermediateResultStorage in code using a descriptor, just as they configure a connector. Analogous to the abstract ConnectorDescriptor class, we introduce the abstract class IntermediateResultStorageDescriptor:

Code Block
languagejava
tEnv.registerIntermediateResultStorage(
    new FileSystemIntermediateResultStorageDescriptor()
        .withConfig("cache-path", "/tmp")
        .withConfig("key", "value")
);


With YAML

The external storage can also be configured without modifying the code: users can use a configuration file (.yaml) to configure the IntermediateResultStorage. An example of such a configuration file is the following:

Code Block
intermediate-result-storage:
  type: filesystem
  config:
    cache-path: /tmp
    key: value


The cache service loads the YAML file from the classpath and parses it into key-value properties that can be used by the Table API. For example, the YAML above is translated to the following:

Code Block
intermediate-result-storage.type = filesystem
intermediate-result-storage.config.cache-path = /tmp
intermediate-result-storage.config.key = value

The TableFactoryService can use the fields with the prefix intermediate-result-storage.config to configure the intermediate result storage.
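
The translation can be sketched as a recursive flattening of the parsed YAML map into dot-separated keys (the YAML parsing itself, e.g. with SnakeYAML, is omitted; the class name is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

// Flattens a parsed YAML structure, e.g. {"intermediate-result-storage": {"type": "filesystem", ...}},
// into dot-separated key-value properties usable by the Table API.
class YamlFlattener {

	static Map<String, String> flatten(Map<String, Object> yaml) {
		Map<String, String> properties = new HashMap<>();
		flattenInto("", yaml, properties);
		return properties;
	}

	@SuppressWarnings("unchecked")
	private static void flattenInto(String prefix, Map<String, Object> node, Map<String, String> out) {
		for (Map.Entry<String, Object> entry : node.entrySet()) {
			String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
			if (entry.getValue() instanceof Map) {
				// Nested mapping: recurse with the extended prefix.
				flattenInto(key, (Map<String, Object>) entry.getValue(), out);
			} else {
				out.put(key, String.valueOf(entry.getValue()));
			}
		}
	}
}
```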

Migration Plan

This FLIP introduces a new feature and does not change existing behavior, so it is backward compatible.

Rejected Alternatives

None so far.