Status

...

Page properties


Discussion thread

...

JIRA: FLINK-28415 (ASF JIRA)

...

Release: 1.16


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

This FLIP is a joint work, initially proposed by Yuan Zhu (zstraw@163.com) and finished by Qingsheng Ren (partial caching part) and Alexander Smirnov (full caching part).


Motivation

As a widely used feature in Flink SQL jobs, the lookup table source's performance is essential not only for users but also for source developers tuning their implementations. Most lookup table sources use a cache to achieve better performance, but some features are missing in the current cache design:

...

In order to address the issues above, we propose here to define a unified abstraction for lookup source cache and its related metrics.

Proposed Changes

Top-level APIs

In order to clarify the semantics of lookup, we propose a new public interface for the cache and a set of conventional / standard metrics. Most of the design is inspired by Google Guava Cache. We would also like to introduce new table options for configuring lookup table caching via the Table/SQL API.

We also propose some top-level APIs for general lookup operations without caching:

  • LookupFunction / AsyncLookupFunction, an extended version of TableFunction that makes the API more straightforward.
  • LookupFunctionProvider / AsyncLookupFunctionProvider, which serve as the creators of LookupFunction / AsyncLookupFunction in a table source.

And APIs related to the cache:

  • LookupCache, defining the cache used in a lookup table.
  • DefaultLookupCache, a default implementation of a cache that is suitable for most use cases.
  • CacheMetricGroup, defining the metrics that should be reported by the cache.

Partial and Full Caching

More specifically, we'd like to provide public interfaces to lookup source developers for the two most common cases, which are named partial and full caching.

Partial caching

Partial caching loads data into the cache as the external system is accessed. If the lookup key does not exist in the cache, a lookup against the external system is triggered and the result is stored in the cache for subsequent lookups. Users and lookup table developers are able to configure the eviction policy and maximum size of the cache.

In order to support partial caching, we propose to introduce 2 new interfaces:

  • PartialCachingLookupProvider / PartialCachingAsyncLookupProvider, as the API interacting with the table source to get the LookupFunction and LookupCache.

The cache serves as a component in LookupJoinRunner and is pluggable by specifying it when constructing the provider. The planner takes the lookup function and the cache created from the provider and passes them to the LookupJoinRunner. The cache is instantiated during runtime execution, and entries are loaded via the lookup function when there is a cache miss.
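The partial caching flow described above can be sketched roughly as follows. This is only an illustration under simplifying assumptions: `PartialCachingSketch` is a hypothetical stand-in (not the actual LookupJoinRunner), keys and rows are plain strings instead of RowData, and the cache is an unbounded `HashMap` without eviction.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for the runner-side logic; not the actual LookupJoinRunner. */
final class PartialCachingSketch {
    private final Map<String, Collection<String>> cache = new HashMap<>();
    private final Function<String, Collection<String>> lookupFunction;
    private final boolean cacheMissingKey;

    // Plain counters standing in for the hit / miss / load metrics.
    long hitCount;
    long missCount;
    long loadCount;

    PartialCachingSketch(
            Function<String, Collection<String>> lookupFunction, boolean cacheMissingKey) {
        this.lookupFunction = lookupFunction;
        this.cacheMissingKey = cacheMissingKey;
    }

    Collection<String> lookup(String key) {
        Collection<String> cached = cache.get(key);
        if (cached != null) {
            hitCount++;
            return cached;
        }
        // Cache miss: load from the (simulated) external system.
        missCount++;
        Collection<String> loaded = lookupFunction.apply(key);
        loadCount++;
        // Store empty results only if missing keys should be cached.
        if (!loaded.isEmpty() || cacheMissingKey) {
            cache.put(key, loaded);
        }
        return loaded;
    }
}
```

In this sketch every miss triggers exactly one load, so the load count equals the miss count, which matches the convention for partial caching stated in the metrics section.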

Full Caching

If the lookup table is small enough to fit into memory and doesn't change frequently, it is more efficient to load all entries of the lookup table into the cache to reduce network I/O, and to refresh the table periodically. We'd like to name this use case "full caching". Logically the reload operation is a kind of scan, so we'd like to reuse the ScanRuntimeProvider so that developers can reuse the scanning logic implemented in Source / SourceFunction / InputFormat. Considering the complexity of the Source API, we'd like to support the SourceFunction and InputFormat APIs first. Supporting the Source API might require a new topology and will be discussed later in another FLIP.

We propose to introduce several new interfaces:

  • FullCachingLookupProvider, for reusing the ability of scanning.
  • CacheReloadTrigger, for customizing reloading strategies of all entries in the full cache.

Also we'd like to provide two default implementations of CacheReloadTrigger:

  • PeriodicCacheReloadTrigger, for triggering reload periodically with a specific interval.
  • TimedCacheReloadTrigger, for triggering reload at a specific time and repeating with an interval in days.
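To illustrate the full caching idea, here is a minimal sketch in which a reload scans the whole table and atomically swaps in a new snapshot, while lookups never touch the external system. `FullCachingSketch` and the `Supplier`-based scan are hypothetical stand-ins for the runtime and the ScanRuntimeProvider; the swap-on-reload strategy is an assumption of this sketch, not something this FLIP prescribes.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical stand-in for a full cache; not part of the proposed API. */
final class FullCachingSketch {
    // Immutable snapshot of the whole lookup table; replaced on every reload.
    private final AtomicReference<Map<String, List<String>>> snapshot =
            new AtomicReference<>(Map.of());
    // Stand-in for the scan logic that reads all entries from the external system.
    private final Supplier<Map<String, List<String>>> scan;

    FullCachingSketch(Supplier<Map<String, List<String>>> scan) {
        this.scan = scan;
    }

    /** Called by a reload trigger: scan everything, then swap the snapshot in one step. */
    void reload() {
        snapshot.set(Map.copyOf(scan.get()));
    }

    /** Lookups only read the current snapshot; a missing key yields no rows. */
    List<String> lookup(String key) {
        return snapshot.get().getOrDefault(key, List.of());
    }
}
```

With this shape, changes in the external table only become visible after the next `reload()`, which is the trade-off full caching accepts in exchange for avoiding per-record network I/O.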

Public Interfaces

Lookup Functions

As the usage of the TableFunction interface is not quite straightforward for lookup table developers, we'd like to introduce new interfaces for sync and async lookup tables. Caching will only be supported on LookupFunction / AsyncLookupFunction.

Code Block
languagejava
titleLookupFunction
/**
 * A wrapper class of {@link TableFunction} for synchronously looking up rows matching the lookup keys
 * from external system.
 *
 * <p>The output type of this table function is fixed as {@link RowData}.
 */
@PublicEvolving
public abstract class LookupFunction extends TableFunction<RowData> {

    /**
     * Synchronously lookup rows matching the lookup keys.
     *
     * @param keyRow - A {@link RowData} that wraps keys to lookup.
     * @return A collection of all matching rows in the lookup table.
     */
    public abstract Collection<RowData> lookup(RowData keyRow) throws IOException;

    /** Invoke {@link #lookup} and handle exceptions. */
    public final void eval(Object... keys) {
        try {
            lookup(GenericRowData.of(keys)).forEach(this::collect);
        } catch (IOException e) {
            throw new RuntimeException("Failed to lookup values with given key", e);
        }
    }
}


Code Block
languagejava
titleAsyncLookupFunction
/**
 * A wrapper class of {@link AsyncTableFunction} for asynchronously looking up rows matching the lookup
 * keys from external system.
 *
 * <p>The output type of this table function is fixed as {@link RowData}.
 */
@PublicEvolving
public abstract class AsyncLookupFunction extends AsyncTableFunction<RowData> {

    /**
     * Asynchronously lookup rows matching the lookup keys.
     *
     * @param keyRow - A {@link RowData} that wraps keys to lookup.
     * @return A collection of all matching rows in the lookup table.
     */
    public abstract CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow);

    /** Invokes {@link #asyncLookup} and chains futures. */
    public final void eval(CompletableFuture<Collection<RowData>> future, Object... keys) {
        asyncLookup(GenericRowData.of(keys))
                .whenCompleteAsync(
                        (result, exception) -> {
                            if (exception != null) {
                                future.completeExceptionally(exception);
                                return;
                            }
                            future.complete(result);
                        });
    }
}

LookupFunctionProvider

Code Block
languagejava
titleLookupFunctionProvider
/** A provider for creating {@link LookupFunction}. */
@PublicEvolving
public interface LookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    static LookupFunctionProvider of(LookupFunction lookupFunction) {
        return () -> lookupFunction;
    }

    LookupFunction createLookupFunction();
}

AsyncLookupFunctionProvider

Code Block
languagejava
titleAsyncLookupFunctionProvider
/** A provider for creating {@link AsyncLookupFunction}. */
@PublicEvolving
public interface AsyncLookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    static AsyncLookupFunctionProvider of(AsyncLookupFunction asyncLookupFunction) {
        return () -> asyncLookupFunction;
    }

    AsyncLookupFunction createAsyncLookupFunction();
}

LookupCache

Considering there might be custom caching strategies and optimizations, we'd like to expose the cache interface as public API for developers to make the cache pluggable.

Code Block
languagejava
titleLookupCache
/**
 * A semi-persistent mapping from keys to values for storing entries of lookup table.
 *
 * <p>The type of the caching key is a {@link RowData} with lookup key fields packed inside. The
 * type of value is a {@link Collection} of {@link RowData}, which are rows matching lookup key
 * fields.
 *
 * <p>Cache entries are manually added using {@link #put}, and are stored in the cache until either
 * evicted or manually invalidated.
 *
 * <p>Implementations of this interface are expected to be thread-safe, and can be safely accessed
 * by multiple concurrent threads.
 */
@PublicEvolving
public interface LookupCache extends AutoCloseable, Serializable {

    /**
     * Initialize the cache.
     *
     * @param metricGroup the metric group to register cache related metrics.
     */
    void open(CacheMetricGroup metricGroup);

    /**
     * Returns the value associated with key in this cache, or null if there is no cached value for
     * key.
     */
    @Nullable
    Collection<RowData> getIfPresent(RowData key);

    /**
     * Associates the specified value rows with the specified key row in the cache. If the cache
     * previously contained a value associated with the key, the old value is replaced by the
     * specified value.
     *
     * @return the previous value rows associated with key, or null if there was no mapping for key.
     * @param key - key row with which the specified value is to be associated
     * @param value - value rows to be associated with the specified key
     */
    Collection<RowData> put(RowData key, Collection<RowData> value);

    /** Discards any cached value for the specified key. */
    void invalidate(RowData key);

    /** Returns the number of key-value mappings in the cache. */
    long size();
}

DefaultLookupCache

As the cache should be instantiated during runtime execution to avoid serialization / deserialization, a factory is required for creating the cache. 

Code Block
languagejava
titleDefaultLookupCache
/** Default implementation of {@link LookupCache}. */
@PublicEvolving
public class DefaultLookupCache implements LookupCache {
    private final Duration expireAfterAccessDuration;
    private final Duration expireAfterWriteDuration;
    private final Long maximumSize;
    private final boolean cacheMissingKey;

    private DefaultLookupCache(
            Duration expireAfterAccessDuration,
            Duration expireAfterWriteDuration,
            Long maximumSize,
            boolean cacheMissingKey) {
        this.expireAfterAccessDuration = expireAfterAccessDuration;
        this.expireAfterWriteDuration = expireAfterWriteDuration;
        this.maximumSize = maximumSize;
        this.cacheMissingKey = cacheMissingKey;
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    // Implementations of the LookupCache methods are omitted here.

    public static class Builder {
        private Duration expireAfterAccessDuration;
        private Duration expireAfterWriteDuration;
        private Long maximumSize;
        private Boolean cacheMissingKey;

        public Builder expireAfterAccess(Duration duration) {
            expireAfterAccessDuration = duration;
            return this;
        }

        public Builder expireAfterWrite(Duration duration) {
            expireAfterWriteDuration = duration;
            return this;
        }

        public Builder maximumSize(long maximumSize) {
            this.maximumSize = maximumSize;
            return this;
        }

        public Builder cacheMissingKey(boolean cacheMissingKey) {
            this.cacheMissingKey = cacheMissingKey;
            return this;
        }

        public DefaultLookupCache build() {
            return new DefaultLookupCache(
                    expireAfterAccessDuration,
                    expireAfterWriteDuration,
                    maximumSize,
                    cacheMissingKey);
        }
    }
}
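As an illustration of how a maximum-size bound could behave, the following is a minimal sketch of a size-bounded cache that evicts the least recently accessed entry. `BoundedCacheSketch` is a hypothetical stand-in built on `java.util.LinkedHashMap`; the least-recently-used policy is an assumption of this sketch, and it says nothing about how DefaultLookupCache is actually implemented. The method names mirror those of LookupCache for readability only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical size-bounded cache; not the DefaultLookupCache implementation. */
final class BoundedCacheSketch<K, V> {
    private final Map<K, V> entries;

    BoundedCacheSketch(final int maximumSize) {
        // accessOrder = true makes iteration order follow access recency,
        // so the eldest entry is the least recently used one.
        this.entries =
                new LinkedHashMap<K, V>(16, 0.75f, true) {
                    @Override
                    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                        return size() > maximumSize;
                    }
                };
    }

    // All methods are synchronized because even get() mutates an access-ordered map.
    synchronized V getIfPresent(K key) {
        return entries.get(key);
    }

    synchronized V put(K key, V value) {
        return entries.put(key, value);
    }

    synchronized void invalidate(K key) {
        entries.remove(key);
    }

    synchronized long size() {
        return entries.size();
    }
}
```

A coarse lock like this keeps the sketch short; a production cache would more likely use a concurrent structure to avoid contention between lookup threads.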

CacheMetricGroup

An interface defining all cache related metrics:

Code Block
languagejava
titleCacheMetricGroup
/**
 * Pre-defined metrics for cache.
 *
 * <p>Please note that these methods should only be invoked once. Registering a metric with same
 * name for multiple times would lead to an undefined behavior.
 */
@PublicEvolving
public interface CacheMetricGroup extends MetricGroup {
    /** The number of cache hits. */
    void hitCounter(Counter hitCounter);

    /** The number of cache misses. */
    void missCounter(Counter missCounter);

    /** The number of times to load data into cache from external system. */
    void loadCounter(Counter loadCounter);

    /** The number of load failures. */
    void numLoadFailuresCounter(Counter numLoadFailuresCounter);

    /** The time spent for the latest load operation. */
    void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);

    /** The number of records in cache. */
    void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

    /** The number of bytes used by cache. */
    void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}

PartialCachingLookupProvider

PartialCachingLookupProvider

This is the API between the table framework and the user's table source. Implementations should define how to create a lookup function and whether to use a cache.

Code Block
languagejava
titlePartialCachingLookupProvider
/**
 * Provider for creating {@link LookupFunction} and {@link LookupCache} for storing lookup entries.
 */
@PublicEvolving
public interface PartialCachingLookupProvider extends LookupFunctionProvider {

    /**
     * Build a {@link PartialCachingLookupProvider} from the specified {@link LookupFunction} and
     * {@link LookupCache}.
     */
    static PartialCachingLookupProvider of(LookupFunction lookupFunction, LookupCache cache) {
        return new PartialCachingLookupProvider() {

            @Override
            public LookupCache getCache() {
                return cache;
            }

            @Override
            public LookupFunction createLookupFunction() {
                return lookupFunction;
            }
        };
    }

    /** Get a new instance of {@link LookupCache}. */
    LookupCache getCache();
}

PartialCachingAsyncLookupProvider

Code Block
languagejava
titlePartialCachingAsyncLookupProvider
/**
 * Provider for creating {@link AsyncLookupFunction} and {@link LookupCache} for storing lookup
 * entries.
 */
@PublicEvolving
public interface PartialCachingAsyncLookupProvider extends AsyncLookupFunctionProvider {

    /**
     * Build a {@link PartialCachingAsyncLookupProvider} from the specified {@link
     * AsyncLookupFunction} and {@link LookupCache}.
     */
    static PartialCachingAsyncLookupProvider of(
            AsyncLookupFunction asyncLookupFunction, LookupCache cache) {
        return new PartialCachingAsyncLookupProvider() {

            @Override
            public LookupCache getCache() {
                return cache;
            }

            @Override
            public AsyncLookupFunction createAsyncLookupFunction() {
                return asyncLookupFunction;
            }
        };
    }

    /** Get a new instance of {@link LookupCache}. */
    LookupCache getCache();
}

FullCachingLookupProvider

This interface is for supporting full cache strategy. It reuses ScanRuntimeProvider and defines reload time. 

Code Block
languagejava
titleFullCachingLookupProvider
/**
 * A {@link LookupFunctionProvider} that never looks up in the external system on cache miss and provides a
 * cache for holding all entries in the external system. The cache will be fully reloaded from the
 * external system by the {@link ScanTableSource.ScanRuntimeProvider} and reload operations will be
 * triggered by the {@link CacheReloadTrigger}.
 */
@PublicEvolving
public interface FullCachingLookupProvider extends LookupFunctionProvider {
    static FullCachingLookupProvider of(
            ScanTableSource.ScanRuntimeProvider scanRuntimeProvider,
            CacheReloadTrigger cacheReloadTrigger) {
        return new FullCachingLookupProvider() {
            @Override
            public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider() {
                return scanRuntimeProvider;
            }

            @Override
            public CacheReloadTrigger getCacheReloadTrigger() {
                return cacheReloadTrigger;
            }

            @Override
            public LookupFunction createLookupFunction() {
                return keyRow -> null;
            }
        };
    }

    /**
     * Get a {@link ScanTableSource.ScanRuntimeProvider} for scanning all entries from the external
     * lookup table and load into the cache.
     */
    ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();

    /** Get a {@link CacheReloadTrigger} for triggering the reload operation. */
    CacheReloadTrigger getCacheReloadTrigger();
}

CacheReloadTrigger

A trigger defining custom logic for triggering full cache reloading.

Code Block
languagejava
titleCacheReloadTrigger
/** Customized trigger for reloading all lookup table entries in full caching mode. */
@PublicEvolving
public interface CacheReloadTrigger extends AutoCloseable, Serializable {

    /** Open the trigger. */
    void open(Context context) throws Exception;

    /**
     * Context of {@link CacheReloadTrigger} for getting information about times and
     * triggering reload.
     */
    interface Context {

        /** Get current processing time. */
        long currentProcessingTime();

        /** Get current watermark on the main stream. */
        long currentWatermark();

        /** Trigger a reload operation on the full cache. */
        CompletableFuture<Void> triggerReload();
    }
}

PeriodicCacheReloadTrigger

An implementation of CacheReloadTrigger that triggers reload with a specified interval.

Code Block
languagejava
titlePeriodicCacheReloadTrigger
/** A trigger that reloads all entries periodically with specified interval or delay. */
public class PeriodicCacheReloadTrigger implements CacheReloadTrigger {

    private final Duration reloadInterval;
    private final ScheduleMode scheduleMode;

    public PeriodicCacheReloadTrigger(Duration reloadInterval, ScheduleMode scheduleMode) {
        this.reloadInterval = reloadInterval;
        this.scheduleMode = scheduleMode;
    }

    @Override
    public void open(CacheReloadTrigger.Context context) {
        // Register periodic reload task
    }

    @Override
    public void close() throws Exception {
        // Dispose resources
    }

    public enum ScheduleMode {
        FIXED_DELAY,
        FIXED_RATE
    }
}
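The difference between the two schedule modes can be sketched with a plain `ScheduledExecutorService`. This is a minimal illustration, assuming the reload is an ordinary `Runnable`; `PeriodicReloadSketch` and `awaitReloads` are hypothetical names, and the actual trigger would call `Context#triggerReload()` rather than an arbitrary task.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in; not the actual PeriodicCacheReloadTrigger. */
final class PeriodicReloadSketch implements AutoCloseable {

    enum ScheduleMode {
        FIXED_DELAY,
        FIXED_RATE
    }

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    /** Registers the periodic reload task; the first reload runs immediately. */
    void open(Runnable triggerReload, Duration reloadInterval, ScheduleMode scheduleMode) {
        long millis = reloadInterval.toMillis();
        if (scheduleMode == ScheduleMode.FIXED_RATE) {
            // Nominal start times are spaced by the interval, measured start to start.
            executor.scheduleAtFixedRate(triggerReload, 0, millis, TimeUnit.MILLISECONDS);
        } else {
            // The interval is measured from the end of one reload to the start of the next.
            executor.scheduleWithFixedDelay(triggerReload, 0, millis, TimeUnit.MILLISECONDS);
        }
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }

    /** Usage example: returns true if {@code expected} reloads happen within the timeout. */
    static boolean awaitReloads(
            int expected, Duration interval, ScheduleMode mode, Duration timeout) {
        CountDownLatch latch = new CountDownLatch(expected);
        try (PeriodicReloadSketch trigger = new PeriodicReloadSketch()) {
            trigger.open(latch::countDown, interval, mode);
            return latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Note that with `ScheduledExecutorService`, a task that throws suppresses all subsequent executions, so a real trigger would need to catch and report reload failures inside the task.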

TimedCacheReloadTrigger

Code Block
languagejava
titleTimedCacheReloadTrigger
/** A trigger that reloads at a specific local time and repeats at the given interval in days. */
public class TimedCacheReloadTrigger implements CacheReloadTrigger {

    private final LocalTime reloadTime;
    private final int reloadIntervalInDays;

    public TimedCacheReloadTrigger(LocalTime reloadTime, int reloadIntervalInDays) {
        this.reloadTime = reloadTime;
        this.reloadIntervalInDays = reloadIntervalInDays;
    }

    @Override
    public void open(Context context) {
        // Register periodic reload task
    }

    @Override
    public void close() throws Exception {
        // Dispose resources
    }
}
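One way the timed trigger could compute when to fire is sketched below. It is an assumption-laden illustration: `TimedReloadSketch`, `initialDelay` and `repeatInterval` are hypothetical helpers, the current time is passed in explicitly, and time zones and daylight saving transitions are ignored by working on `LocalDateTime` only.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.LocalTime;

/** Hypothetical helpers; not the actual TimedCacheReloadTrigger. */
final class TimedReloadSketch {

    /**
     * Delay from {@code now} until the next occurrence of {@code reloadTime}: later today if
     * that time is still ahead, otherwise the same time tomorrow.
     */
    static Duration initialDelay(LocalDateTime now, LocalTime reloadTime) {
        LocalDateTime next = now.toLocalDate().atTime(reloadTime);
        if (!next.isAfter(now)) {
            next = next.plusDays(1);
        }
        return Duration.between(now, next);
    }

    /** Interval between consecutive reloads after the first one. */
    static Duration repeatInterval(int reloadIntervalInDays) {
        return Duration.ofDays(reloadIntervalInDays);
    }
}
```

These two durations could then be handed to a scheduler as the initial delay and the period of the reload task.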

TableFunctionProvider / AsyncTableFunctionProvider

We'd like to deprecate these two interfaces and let developers switch to the new LookupFunctionProvider / AsyncLookupFunctionProvider / FullCachingLookupProvider instead.

Table Options for Lookup Cache

In order to unify the usage of caching across all connectors, we'd like to introduce some common table options, which are defined under the class LookupOptions. Note that connectors are not required to implement all of these options.

| Option | Type | Description |
| --- | --- | --- |
| lookup.cache | Enum of NONE, PARTIAL and FULL | The caching strategy for this lookup table. NONE: Do not use cache. PARTIAL: Use partial caching mode. FULL: Use full caching mode. |
| lookup.max-retries | Integer | The maximum allowed retries if a lookup operation fails |
| lookup.partial-cache.expire-after-access | Duration | Duration to expire an entry in the cache after accessing |
| lookup.partial-cache.expire-after-write | Duration | Duration to expire an entry in the cache after writing |
| lookup.partial-cache.cache-missing-key | Boolean | Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table |
| lookup.partial-cache.max-rows | Long | The maximum number of rows to store in the cache |
| lookup.full-cache.reload-strategy | Enum of PERIODIC and TIMED | The reload strategy for the full cache scenario. PERIODIC: Use PeriodicCacheReloadTrigger. TIMED: Use TimedCacheReloadTrigger. |
| lookup.full-cache.periodic-reload.interval | Duration | Duration to trigger reload in the PeriodicCacheReloadTrigger |
| lookup.full-cache.periodic-reload.schedule-mode | Enum of FIXED_DELAY and FIXED_RATE | The periodic schedule mode of reloading in the PeriodicCacheReloadTrigger |
| lookup.full-cache.timed-reload.iso-time | String | Time in ISO-8601 format when cache needs to be reloaded. Time can be specified either with timezone or without timezone (target JVM local timezone will be used). See formatter ISO_TIME. |
| lookup.full-cache.timed-reload.interval-in-days | Integer | The interval in days to trigger the reload at the specified time |

Cache Metrics

It is important to mention that a cache implementation does not have to report all the defined metrics. But if a cache reports a metric of the same semantic defined below, the implementation should follow the convention.

| Name | Type | Unit | Description |
| --- | --- | --- | --- |
| numCachedRecord | Gauge | Records | The number of records in cache. |
| numCachedBytes | Gauge | Bytes | The number of bytes used by cache. |
| hitCount | Counter | | The number of cache hits |
| missCount | Counter | | The number of cache misses, which might lead to loading operations |
| loadCount | Counter | | The number of times to load data into cache from external system. For partial cache the load count should be equal to miss count, but for full cache this would be different. |
| numLoadFailure | Counter | | The number of load failures |
| latestLoadTime | Gauge | ms | The time spent for the latest load operation |

Here we just define fundamental metrics and let the external metric system make the aggregation to get more descriptive values such as hitRate = hitCount / (hitCount + missCount).
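For example, the derived hit rate mentioned above could be computed from the two counters as in the following sketch. `CacheMetricsSketch` is a hypothetical helper standing in for whatever the external metric system does, and returning 0.0 when no lookups have happened yet is an assumption made here to avoid dividing by zero.

```java
/** Hypothetical helper; the aggregation would normally live in the external metric system. */
final class CacheMetricsSketch {

    /** hitRate = hitCount / (hitCount + missCount), or 0.0 if there were no lookups yet. */
    static double hitRate(long hitCount, long missCount) {
        long total = hitCount + missCount;
        return total == 0 ? 0.0 : (double) hitCount / total;
    }
}
```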

Scope

The metric group for the cache would be a sub-group of the OperatorMetricGroup to which the table function belongs.

Future Works

In order to reduce network I/O with external systems and the usage of cache further, some optimizations implemented on scan source could be also applied on the lookup table, such as projection and filter pushdown. These features will be introduced separately in another FLIP. 

...

Compatibility, Deprecation, and Migration Plan

Currently the JDBC, Hive and HBase connectors implement lookup table sources. All existing implementations will be migrated to the current design, and the migration will be transparent to end users. Table options related to caching defined by these connectors will be migrated to the new table options defined in this FLIP.

Test Plan

We will use unit and integration tests to validate the functionality of cache implementations.

Rejected Alternatives

...

Add cache in TableFunction implementations

Compared with this design, adding a cache in TableFunction implementations might lead to inconsistency between sync and async table functions, and is not suitable for applying optimizations.