...

  1. Get the intermediate result storage configuration
    1. The user calls registerIntermediateResultStorage explicitly in the code and passes the configuration.
    2. The cache manager loads the configuration from a YAML file on the classpath.
  2. The cache manager passes the configuration to the TableFactoryService to load and configure the intermediate result storage.
  3. The TableFactoryService finds the implementation using the service loader, instantiates and configures the object, and returns it to the cache manager. The cache manager then maintains the intermediate result storage. From that point on, the flow is essentially the same as in FLIP-36: the cache manager maintains a set of cached tables and replaces cached tables with a TableSink/TableSource.
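
The discovery step above can be illustrated with a minimal sketch. It assumes a simplified, hypothetical StorageFactory interface that stands in for the real factory interface; the names StorageFactory, FileSystemStorageFactory, StorageDiscovery and the "type"/"path" keys are illustrative only and are not part of Flink. The matching rule (pick the factory whose required properties are all present in the configuration) is likewise an assumption about how such a lookup could work.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for a storage factory; not the Flink API. */
interface StorageFactory {
    /** Properties that must be present in the configuration for this factory to be chosen. */
    Map<String, String> requiredContext();

    /** Creates a (here: textual) description of the configured storage. */
    String create(Map<String, String> config);
}

/** Example implementation; with a service loader it would be listed under META-INF/services. */
final class FileSystemStorageFactory implements StorageFactory {
    @Override
    public Map<String, String> requiredContext() {
        return Map.of("type", "filesystem");
    }

    @Override
    public String create(Map<String, String> config) {
        return "filesystem:" + config.getOrDefault("path", "/tmp");
    }
}

final class StorageDiscovery {
    /** Returns the first candidate whose required context is fully contained in the configuration. */
    static StorageFactory find(Iterable<StorageFactory> candidates, Map<String, String> config) {
        for (StorageFactory factory : candidates) {
            if (config.entrySet().containsAll(factory.requiredContext().entrySet())) {
                return factory;
            }
        }
        throw new IllegalStateException("No matching storage factory for " + config);
    }

    public static void main(String[] args) {
        Map<String, String> config = Map.of("type", "filesystem", "path", "/tmp/cache");
        // With a service loader, the candidates would come from
        // java.util.ServiceLoader.load(StorageFactory.class), which is also an Iterable.
        // A fixed list is used here so the sketch runs without a META-INF/services entry.
        List<StorageFactory> candidates = List.of(new FileSystemStorageFactory());
        System.out.println(find(candidates, config).create(config)); // prints "filesystem:/tmp/cache"
    }
}
```

Because find accepts any Iterable, the same lookup works unchanged whether the candidates come from a service loader or from a hand-built list in a test.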


Configurable IntermediateResultStorage

The key functionality of the pluggable intermediate result storage is writing tables to and reading tables from external storage. This is similar to what the TableSink and TableSource do, with one difference: the TableSink and TableSource are designed to connect to external storage that is managed by the user, while the cache service should manage the external storage itself. This leads to the following requirements:

  • Create a location for each table on the corresponding external storage if it does not already exist. 

...

  • Currently, only the file system meets this requirement: it creates the file to write to if the file does not already exist. For any other storage, the table sink expects the destination of the table to have been created by the user already. This is unacceptable for storage of intermediate results. Imagine a user having to create tens of tables in a MySQL database for one Flink application and tens more for another. In some cases, the user may not even know how many tables will be cached.
  • Take a table name of a table and map it to the corresponding configuration field of the external storage. 

...

  • Currently, the TableSinkFactory and TableSourceFactory take the configuration as a Map<String, String>, and each factory maps the logical table name to a field with a different key. For example, CsvTableSinkFactoryBase uses “connector.path”, KafkaTableSourceSinkFactoryBase uses “connector.topic”, and JDBCTableSourceSinkFactory uses “connector.table”. This is an essential requirement because the cache service needs to create a different TableSink/TableSource for each table.
  • Clean up the physical storage. 

...

  • For cache storage, it is important to be able to reclaim the storage space when the cache is invalidated, which is not currently supported.
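
The second requirement above, mapping a table name to the connector-specific configuration field, can be sketched as follows. The three key names are the ones quoted in the text; the class TableNameMapping, the method withTableName, and the short connector labels "csv", "kafka" and "jdbc" are hypothetical and only serve the illustration. A real file-based storage would likely derive a full path from the table name rather than store the bare name.

```java
import java.util.HashMap;
import java.util.Map;

final class TableNameMapping {
    // Connector label (hypothetical) -> configuration key that carries the table name.
    private static final Map<String, String> TABLE_KEY_BY_CONNECTOR = Map.of(
            "csv", "connector.path",
            "kafka", "connector.topic",
            "jdbc", "connector.table");

    /** Returns a copy of the base configuration with the table name stored under the connector-specific key. */
    static Map<String, String> withTableName(
            String connector, Map<String, String> base, String tableName) {
        String key = TABLE_KEY_BY_CONNECTOR.get(connector);
        if (key == null) {
            throw new IllegalArgumentException("Unknown connector: " + connector);
        }
        Map<String, String> result = new HashMap<>(base);
        result.put(key, tableName);
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> base = Map.of("connector.type", "kafka");
        Map<String, String> config = withTableName("kafka", base, "cached_table_1");
        System.out.println(config.get("connector.topic")); // prints "cached_table_1"
    }
}
```

Keeping the mapping inside the storage implementation means the cache service only needs to supply the table name and never has to know which key a given connector expects.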

Therefore, the IntermediateResultStorage interface is introduced. Any class that implements this interface can be used by the cache service and serve as external storage for intermediate results. The interface extends the TableFactory interface so that implementations can be discovered with the service loader, similar to how the current TableSinkFactory and TableSourceFactory work, and so that the TableFactoryService class can be reused for discovery and loading.

...