Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

...

Page properties


Discussion thread

...

...

JIRA: TBD

...

Vote thread
JIRA

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-28415

Release1.16


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

This FLIP is a joint work of initially proposed by Yuan Zhu (zstraw@163.com), and finished by Qingsheng Ren (partial caching part) and Alexander Smirnov (full caching part).


Table of Contents

Motivation

As a widely-used feature in Flink SQL jobs, the performance of lookup table source is essential not only for users but also source developers for tuning their implementations. Most lookup table sources use cache to achieve better performance, but there are some features missing in the current design of cache:

...

In order to address the issues above, we propose here to define a unified abstraction for lookup source cache and its related metrics.

Proposed Changes

We'd like to split the proposal into two kinds of caching strategies: LRU cache and all cache.

LRU cache

Top-level APIs

In order to clarify the semantic of lookup, we'd like to introduce some top-level APIs for general lookup operations without cachingLRU is the most common caching strategy, which dynamically evicts entries in the cache according to the given configuration. For supporting LRU cache in lookup table, we propose several new interfaces to simplify the work for developers to implement lookup table functions and enable cache:

  • LookupFunction / AsyncLookupFunction, an extended version of TableFunction to

...

  • make the API more straight forward.

...

  • LookupFunctionProvider /

...

  • AsyncLookupProvider, serve as the creator of LookupFunction / AsyncLookupFunction in table source

And APIs related to the cache:

  • LookupCache, defining the cache

...

  • used in lookup table.

...

  • DefaultLookupCache a default implementation of a

...

  • cache that suitable for most use cases.

...

  • CacheMetricGroup, defining metrics should be reported by the

...

  • cache.

...

Partial and Full Caching

More specifically,  we'd like to provide public interfaces for the most 2 common cases to lookup source developers, which are named as partial and full caching.

Partial caching

Partial caching loads data into the cache along with the access to the external system. If the key to lookup does not exist in the cache, a lookup action to the external system will be triggered and the lookup result will be stored in the cache for further loading. Users and lookup table developers are able to config the eviction policy and maximum size of the cache.

In order to support partial caching, we propose to introduce 2 new interfaces:

  • PartialCachingLookupProvider / AsyncPartialCachingLookupProvider, as the API interacting with table source to get LookupFunction and LookupCache.

The cache serves as a component in LookupJoinRunner, and would be pluggable by specifying in the constructor of the provider. The planner will take over the lookup function and the cache created from the provider and pass it to the LookupJoinRunner. The The LRU cache serves as a component in LookupJoinRunner, and would be pluggable by specifying LookupCacheFactory in LookupFunctionProvider. The developer of a lookup table needs to define a LookupFunctionProvider / AsyncLookupProvider in their implementation of LookupTableSource to specify the LookupFunction and the factory of the cache, then the planner will take over the cache factory, pass it to the LookupJoinRunner, and the cache will be instantiated during the runtime execution .

All Cache

and loading operations via lookup function if there's a cache miss.

Full Caching

If If the size of lookup table is relatively small to fit into the memory, and the lookup table doesn't change frequently, it'll be more efficient to load all entries of the lookup table into the cache to reduce network I/O, and refresh the table periodically.  We'd like to name this use case as "all full cache". Logically the reload operation is a kind of scan, so we'd like to reuse the ScanRuntimeProvider so that developers could reuse the scanning logic implemented in Source / SourceFunction / InputFormat. Considering the complexity of Source API, we'd like to support SourceFunction and InputFormat API first. Supporting Source API might require new topology and will be discussed later in another FLIP.

We propose to introduce a new interface RescanRuntimeProvider in order to reuse several new interfaces:

  • FullCachingLookupProvider, for reusing the ability of scanning.

Public Interfaces

Lookup Functions

  • CacheReloadTrigger, for customizing reloading strategies of all entries in the full cache. 

Also As the usage of TableFunction interface is not quite straight forward to lookup table developers, we'd like to provide two default implementations of CacheReloadTrigger:

  • PeriodicCacheReloadTrigger, for triggering reload periodically with a specific interval
  • TimedCacheReloadTrigger, for triggering reload at the specific time and repeat with the interval in days.

Public Interfaces

Lookup Functions

As the usage of TableFunction interface is not quite straight forward to lookup table developers, we'd like to introduce a new interface for introduce a new interface for sync and async lookup tables. Caching will be only supported on LookupFunction / AsyncLookupFunction.

...

Code Block
languagejava
titleAsyncLookupFunction
/**
 * A wrapper class of {@link AsyncTableFunction} for asynchronously lookup rows matching the lookup
 * keys from external system.
 *
 * <p>The output type of this table function is fixed as {@link RowData}.
 */
@PublicEvolving
public abstract class AsyncLookupFunction extends AsyncTableFunction<RowData> {

    /**
     * Asynchronously lookup rows matching the lookup keys.
     *
     * @param keyRow - A {@link RowData} that wraps keys to lookup.
     * @return A collections of all matching rows in the lookup table.
     */
    public abstract CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow);

    /** Invokes {@link #asyncLookup} and chains futures. */
    public final void eval(CompletableFuture<Collection<RowData>> future, Object... keys) {
         asyncLookup(GenericRowData.of(keys))
                .whenCompleteAsync(
                        (result, exception) -> {
                            if (exception != null) {
                                future.completeExceptionally(exception);
                                return;
                            }
                            future.complete(result);
                        });
    }
}

LookupCache

Considering there might be custom caching strategies and optimizations, we'd like to expose the cache interface as public API for developers to make the cache pluggable.

LookupFunctionProvider

Code Block
languagejava
titleLookupFunctionProvider
/** A provider for creating {@link LookupFunction}. */
@PublicEvolving
public interface LookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    static LookupFunctionProvider of(LookupFunction lookupFunction) {
        return () -> lookupFunction;
    }

    LookupFunction createLookupFunction();
}

AsyncLookupFunctionProvider

Code Block
languagejava
titleAsyncLookupFunctionProvider
/** A provider for creating {@link AsyncLookupFunction}. */
@PublicEvolving
public interface AsyncLookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    static AsyncLookupFunctionProvider of(AsyncLookupFunction asyncLookupFunction) {
        return () -> asyncLookupFunction;
    }

    AsyncLookupFunction createAsyncLookupFunction();
}

LookupCache

Considering there might be custom caching strategies and optimizations, we'd like to expose the cache interface as public API for developers to make the cache pluggable.

Code Block
languagejava
titleLookupCache
/**
 * A semi-persistent mapping from keys to values for storing entries of lookup table.
 *
 * <p>The type of the caching key is a {@link RowData} with lookup key fields packed inside. The
 * type of value is a {@link Collection} of {@link RowData}, which are rows matching lookup key
 * fields.
 *
 * <p>Cache entries are manually added using {@link #put}, and are stored in the cache until either
 * evicted or manually invalidated.
 *
 * <p>Implementations of this interface are expected to be thread-safe, and can be safely accessed
 * by multiple concurrent threads.
 */
@PublicEvolving
public interface LookupCache extends AutoClosable, Serializable {

    /**
Code Block
languagejava
titleLookupCache
/**
 * A semi-persistent mapping from keys to values for storing entries of lookup table.
 *
 * <p>The type of the caching key is a {@link RowData} with lookup key fields packed inside. The
 * type of value is a {@link Collection} of {@link RowData}, which are rows matching lookup key
 * fields.
 *
 * <p>Cache entries are manually added using {@link #put}, and are stored in the cache until either
 * evicted or manually invalidated.
 *
 * <p>Implementations of this interface are expected to be thread-safe, and can be safely accessed
 * by multiple concurrent threads.
 */
@PublicEvolving
public interface LookupCache {

    /**
     * Returns the value associated with key in this cache, or null if there is no cached value for
     * key.
     */
    @Nullable
    Collection<RowData> getIfPresent(RowData key);

    /**
     * Associates the specified value rows with the specified key row in the cache. If the cache
     * previously contained value associated with the key, the old value is replaced by the
     * specifiedInitialize the valuecache.
        *
     * @param @returnmetricGroup the previousmetric valuegroup rowsto associatedregister withcache key, or null if there was no mapping for key.
     * @param key - key row with which the specified value is to be associated
     * @param value – value rows to be associated with the specified keyrelated metrics.
     */
    void open(CacheMetricGroup metricGroup);

    /**
     * Returns the value associated with key in this cache, or null if there is no cached value for
     * key.
     */
    @Nullable
    Collection<RowData> putgetIfPresent(RowData key, Collection<RowData> value);

    /**
 Discards  any cached value* forAssociates the specified key. */
    void invalidate(RowData key);

value rows with the specified key row in the cache. If the cache
     /** Returns* previously contained value associated with the number of key-value mappings in the cache. */key, the old value is replaced by the
     * specified value.
      *
    long size();
}

LookupCacheFactory

As the cache should be instantiated during runtime execution to avoid serialization / deserialization, a factory is required for creating the cache. 

Code Block
languagejava
titleLookupCacheFactory

/** Factory for creating an instance of {@link LookupCache}. */
@PublicEvolving
public interface LookupCacheFactory extends Serializable {

    /**
     * Create a {@link LookupCache}. * @return the previous value rows associated with key, or null if there was no mapping for key.
     * @param key - key row with which the specified value is to be associated
     *
 @param value  value * @param metricGroup - The lookup cache metric group in which the cache register predefined and
     *     custom metrics.
    rows to be associated with the specified key
     */
    Collection<RowData> put(RowData key, Collection<RowData> value);

    /** Discards any cached value for the specified key. */
    LookupCachevoid createCacheinvalidate(LookupCacheMetricGroup metricGroupRowData key);

    /** Returns the number of key-value mappings in the cache. */
    long size();
}

DefaultLookupCacheFactory

DefaultLookupCache

As the cache should be instantiated during runtime execution to avoid serialization / deserialization, a factory is required for creating the In order to simplify the usage of developer, we provide a default factory for building a default cache. 

Code Block
languagejava
titleDefaultLookupCacheFactoryDefaultLookupCache
/** FactoryDefault forimplementation creating instance of {of {@link DefaultLookupCacheLookupCache}. */
@PublicEvolving
public class DefaultLookupCacheFactoryDefaultLookupCache implements LookupCacheFactoryLookupCache {
     private private final Duration expireAfterAccessDuration;
    private final Duration expireAfterWriteDuration;
    private final IntegerLong initialCapacitymaximumSize;
    private final Longboolean maximumSizecacheMissingKey;

    public static DefaultLookupCacheFactory.Builder newBuilder() {
    private  DefaultLookupCache(
  return new Builder();
    }

    privateDuration DefaultLookupCacheFactory(expireAfterAccessDuration,
            Duration expireAfterAccessDurationexpireAfterWriteDuration,
            DurationLong expireAfterWriteDurationmaximumSize,
			boolean cacheMissingKey) {
        this.expireAfterAccessDuration  Integer initialCapacity,= expireAfterAccessDuration;
        this.expireAfterWriteDuration    Long maximumSize) {
 = expireAfterWriteDuration;
       // Validation
        this.expireAfterAccessDurationinitialCapacity = expireAfterAccessDurationinitialCapacity;
        this.expireAfterWriteDurationmaximumSize = expireAfterWriteDurationmaximumSize;
        		this.initialCapacitycacheMissingKey = initialCapacitycacheMissingKey;
    }
    this.maximumSize
 =  maximumSize;
 public static Builder newBuilder() }{

    @Override
    publicreturn LookupCachenew createCacheBuilder(LookupCacheMetricGroup metricGroup) {;
    } 

   public // Create instance of DefaultLookupCache
    }

    /** Builder of {@link DefaultLookupCacheFactory}. */
    public static class Builder {static class Builder {         
        private Duration expireAfterAccessDuration;
        private Duration expireAfterWriteDuration;
        private IntegerLong initialCapacitymaximumSize;
        private LongBoolean maximumSizecacheMissingKey;

        public DefaultLookupCacheFactory.Builder expireAfterAccess(Duration duration) {
            expireAfterAccessDuration = duration;
            return this;
        }

        public DefaultLookupCacheFactory.Builder expireAfterWrite(Duration duration) {
            expireAfterWriteDuration = duration;
            return this;
        }

        public DefaultLookupCacheFactory.Builder initialCapacitymaximumSize(intlong initialCapacitymaximumSize) {
            this.initialCapacitymaximumSize = initialCapacitymaximumSize;
            return this;
        }

        public DefaultLookupCacheFactory.Builder maximumSizecacheMissingKey(longboolean maximumSizecacheMissingKey) {
            this.maximumSizecacheMissingKey = maximumSizecacheMissingKey;
            return this;
        }

           

        public DefaultLookupCacheFactoryDefaultLookupCache build() {
            return new DefaultLookupCacheFactoryDefaultLookupCache(
                    expireAfterAccessDuration,
                    expireAfterWriteDuration,
                    initialCapacity,
                    maximumSizemaximumSize,
					cacheMissingKey);
        }
    }     
}

...

CacheMetricGroup

An interface defining all cache related metric:

Code Block
languagejava
titleCacheMetricGroup
/**
 * Pre-defined metrics for {@code LookupCache}cache.
 *
 * <p>Please note that these methods should only be invoked once. Registering a metric with same
 * name for multiple times would lead to an undefined behavior.
 */
@PublicEvolving
public interface LookupCacheMetricGroupCacheMetricGroup extends MetricGroup {
    /** The number of cache hits. */
    void hitCounter(Counter hitCounter);

    /** The number of cache misses. */
    void missCounter(Counter missCounter);

    /** The number of times to load data into cache from external system. */
    void loadCounter(Counter loadCounter);

    /** The number of load failures. */
    void numLoadFailuresCounter(Counter numLoadFailuresCounter);

    /** The time spent for the latest load operation. */
    void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);

    /** The number of records in cache. */
    void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

    /** The number of bytes used by cache. */
    void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}

...

PartialCachingLookupProvider

This is the API between table framework and user's table source. Implementation should define how to create a lookup function and whether to use cache. 

Code Block
languagejava
titleLookupFunctionProviderPartialCachingLookupProvider
/**
 * Provider for creating {@link LookupFunction} and {@link LookupCacheFactoryLookupCache} if caching should be
 * enabled for thestoring lookup tableentries.
 */
@PublicEvolving
public interface LookupFunctionProviderPartialCachingLookupProvider extends LookupTableSource.LookupRuntimeProviderLookupFunctionProvider {

    /**
    /*    * CreatesBuild a builder of{@link PartialCachingLookupProvider} from the specified {@link LookupFunction} and
     * {@link LookupFunctionProviderLookupCache}.
     */
    static BuilderPartialCachingLookupProvider newBuilder(of(LookupFunction lookupFunction, LookupCache cache) {
        return new BuilderPartialCachingLookupProvider(); {

    }

    /** Creates an {@link LookupFunction}@Override
 instance. */
           LookupFunctionpublic LookupCache createLookupFunctiongetCache(); {

    /**
     * Gets the {@link LookupCacheFactory} for creating lookupreturn cache.;
     *
       *}

 <p>This factory will be used for creating an instance of cache during@Override
 runtime execution for
     * optimizing the access topublic externalLookupFunction lookup table.createLookupFunction() {
     *
     * @return an {@link Optional} of {@link LookupCacheFactory}, or an empty {@link Optional} if
 return lookupFunction;
            }
      *  };
   caching shouldn't be applies to the lookup table.
     }

    /** Get a new instance of {@link LookupCache}. */
    Optional<LookupCacheFactory>LookupCache getCacheFactorygetCache();
}

PartialCachingAsyncLookupProvider

Code Block
languagejava
titlePartialCachingAsyncLookupProvider

    /**
 * Provider for creating *{@link WhetherAsyncLookupFunction} theand missing{@link keyLookupCache} (keyfor fieldsstoring withoutlookup any matching value rows) should be stored in theentries.
 */
@PublicEvolving
public interface PartialCachingAsyncLookupProvider extends AsyncLookupFunctionProvider {

     /** cache.
     *
 Build a {@link PartialCachingLookupProvider} from * <p>Please note that this option is required ifthe specified {@link AsyncLookupFunction} and
     * {@link #getCacheFactory()LookupCache}.
 returns   a non-empty*/
    static * instance. If the cache factory is empty, the return value of this function will be ignored.
PartialCachingLookupProvider of(AsyncLookupFunction asyncLookupFunction, LookupCache cache) {
        return new PartialCachingAsyncLookupProvider() {

      *
      @Override
 * @return true if a null or empty value should be storedpublic inLookupCache the cache.getCache() {
     */
    Optional<Boolean> cacheMissingKey();

    /** Builder class for {@link LookupFunctionProvider}. */return cache;
    class Builder {

      }

  private LookupFunction lookupFunction;
        private LookupCacheFactory cacheFactory;
 @Override
       private Boolean enableCacheMissingKey;

    public AsyncLookupFunction createAsyncLookupFunction() {
 /** Sets lookup function. */
        public Builder withLookupFunction(LookupFunction lookupFunction)return {asyncLookupFunction;
            this.lookupFunction = lookupFunction;}
        };
    return this;
        }

        /** EnablesGet cachinga andnew setsinstance theof cache{@link factoryLookupCache}. */
    LookupCache getCache();
}

FullCachingLookupProvider

This interface is for supporting full cache strategy. It reuses ScanRuntimeProvider and defines reload time. 

Code Block
languagejava
titleFullCachingLookupProvider
/**
 * A public{@link Builder withCacheFactory(LookupCacheFactory cacheFactory) {
            this.cacheFactory = cacheFactory;
            return this;
        }

        /**
         * Enables storing missing key in the cache.
         *
CachingLookupProvider} that never lookup in external system on cache miss and provides a
 * cache for holding all entries in the external system. The cache will be fully reloaded from the
 * external system by the {@link ScanTableSource.ScanRuntimeProvider} and reload operations will be
 * triggered by the {@link CacheReloadTrigger}.
 */
@PublicEvolving
public interface FullCachingLookupProvider extends LookupFunctionProvider {
    static FullCachingLookupProvider of(
            * <p>See {@link LookupFunctionProvider#cacheMissingKey()} for more details.
ScanTableSource.ScanRuntimeProvider scanRuntimeProvider,
            CacheReloadTrigger cacheReloadTrigger) */{
        publicreturn Buildernew enableCacheMissingKeyFullCachingLookupProvider() {
            this.enableCacheMissingKey@Override
 = true;
          public  return this;ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider() {
        }

        return publicscanRuntimeProvider;
 LookupFunctionProvider build() {
         }

   //  Build LookupFunctionProvider
      @Override
  }
    }
}

AsyncLookupFunctionProvider

Code Block
languagejava
titleAsyncLookupFunctionProvider
@PublicEvolving
public interface AsyncLookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

 public CacheReloadTrigger  /**
getCacheReloadTrigger() {
      * Creates {@link AsyncLookupFunctionProvider} with the given {@link AsyncLookupFunction} and
 return cacheReloadTrigger;
   * disable lookup table caching.
     */}

    static AsyncLookupFunctionProvider of(AsyncLookupFunction asyncLookupFunction) {
        return new AsyncLookupFunctionProvider() {
            @Override
            public AsyncLookupFunctionLookupFunction createAsyncLookupFunctioncreateLookupFunction() {
                return keyRow -> asyncLookupFunctionnull;
            }

        };
    @Override}

    /**
     * Get a public Optional<LookupCacheFactory> getCacheFactory() {
     {@link ScanTableSource.ScanRuntimeProvider} for scanning all entries from the external
     * lookup table and load into returnthe Optionalcache.empty();
     */
    ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();

  }

  /** Get a {@link CacheReloadTrigger} for triggering the reload operation. @Override*/
    CacheReloadTrigger getCacheReloadTrigger();
}

CacheReloadTrigger

A trigger defining custom logic for triggering full cache reloading.

Code Block
languagejava
titleCacheReloadTrigger
/** Customized trigger for reloading all lookup publictable Optional<Boolean>entries cacheMissingKey() {
         in full caching mode. */
@PublicEvolving
public interface CachingReloadTrigger extends AutoCloseable, Serializable {

    /** Open the return Optional.empty();
            }trigger. */
    void open(Context context) throws }Exception;
    }

    /**
     * Context Createsof {@link AsyncLookupFunctionProviderCacheReloadTrigger} withfor thegetting giveninformation {@linkabout AsyncLookupFunction}times and
     * enable caching with specified {@link LookupCacheFactory}triggering reload.
     */
    staticinterface AsyncLookupFunctionProviderContext of({

        /** Get current processing AsyncLookupFunction asyncLookupFunction,time. */
          long currentProcessingTime();

  LookupCacheFactory cacheBuilder,
     /** Get current watermark on the main boolean cacheMissingKey) {stream. */
        returnlong new AsyncLookupFunctionProvidercurrentWatermark() {;

        /** Trigger a reload @Override
operation on the full cache. */
       public AsyncLookupFunctionCompletableFuture<Void> createAsyncLookupFunctiontriggerReload() {;
    }
}

PeriodicCacheReloadTrigger

An implementation of FullCachingReloadTrigger that triggers reload with a specified interval.

Code Block
languagejava
titlePeriodicCacheReloadTrigger
/** A trigger that reloads all entries periodically with specified interval or return asyncLookupFunction;
     delay. */
public class PeriodicCacheReloadTrigger implements CacheReloadTrigger {

    private final Duration }reloadInterval;

    private final ScheduleMode  scheduleMode;

    @Override
public PeriodicCacheReloadTrigger(Duration reloadInterval, ScheduleMode scheduleMode) {
       public Optional<LookupCacheFactory> getCacheFactory() {this.reloadInterval = reloadInterval;
        this.scheduleMode = scheduleMode;
      return Optional.of(cacheBuilder);}

    @Override
    public void open(CacheReloadTrigger.Context context) }{

        // Register periodic reload @Overridetask
    }

    @Override
    public Optional<Boolean>void cacheMissingKeyclose() throws Exception {
        // Dispose resources
    }

  return Optional.of(cacheMissingKey);
     public enum ScheduleMode {
        }FIXED_DELAY,
        };FIXED_RATE
    }

}

TimedCacheReloadTrigger

Code Block
languagejava
titleTimedCacheReloadTrigger
    /** CreatesA antrigger {@linkthat AsyncLookupFunction}reloads instance. */
    AsyncLookupFunction createAsyncLookupFunction();

    /**
     * Gets the {@link LookupCacheFactory} for creating lookup cache.
     *at a specific local time and repeat for the given interval in days. */ 
public class TimedCacheReloadTrigger implements CacheReloadTrigger {

    private final LocalTime reloadTime;
    private *final <p>This factory will be used for creating an instance of cache during runtime execution for
int reloadIntervalInDays;

    public TimedCacheReloadTrigger(LocalTime reloadTime, int reloadIntervalInDays) {
        this.reloadTime *= optimizingreloadTime;
 the access to external lookup table.
  this.reloadIntervalInDays =  *reloadIntervalInDays;
    }

 * @return an {@link@Override
 Optional} of {@link LookupCacheFactory},public orvoid an empty {@link Optional} ifopen(Context context) {
		// Register periodic reload task
    }

 *   @Override
  caching shouldn't bepublic appliesvoid toclose() thethrows lookupException table.
     */{
		// Dispose resources
    Optional<LookupCacheFactory> getCacheFactory();

    /**
     * Whether the missing key (key fields without any matching value rows) should be stored in the
     * cache.
     *
     * <p>Please note that this option is required if {@link #getCacheFactory()} returns a non-empty
     * instance. If the cache factory is empty, the return value of this function will be ignored.
     *
     * @return true if a null or empty value should be stored in the cache.
     */
    Optional<Boolean> cacheMissingKey();
    
    /** Builder class for {@link AsyncLookupFunctionProvider}. */
    class Builder {

        private AsyncLookupFunction asyncLookupFunction;
        private LookupCacheFactory cacheFactory;
        private Boolean enableCacheMissingKey;

        /** Sets lookup function. */
        public Builder withAsyncLookupFunction(AsyncLookupFunction asyncLookupFunction) {
            this.asyncLookupFunction = asyncLookupFunction;
            return this;
        }

        /** Enables caching and sets the cache factory. */
        public Builder withCacheFactory(LookupCacheFactory cacheFactory) {
            this.cacheFactory = cacheFactory;
            return this;
        }

        /**
         * Enables storing missing key in the cache.
         *
         * <p>See {@link AsyncLookupFunctionProvider#cacheMissingKey()} for more details.
         */
        public Builder enableCacheMissingKey() {
            this.enableCacheMissingKey = true;
            return this;
        }

        public AsyncLookupFunctionProvider build() {
            // Build AsyncLookupFunctionProvider
        }
    }
}

RescanRuntimeProvider

This interface is for supporting all cache strategy. It reuses ScanRuntimeProvider and defines interval of re-scan. 

Code Block
languagejava
titleRescanRuntimeProvider
/**
 * Runtime provider for periodically re-scanning all entries of the lookup table and storing the
 * table locally for lookup.
 */
@PublicEvolving
public interface RescanRuntimeProvider extends LookupTableSource.LookupRuntimeProvider {

    /** Creates builder of {@link RescanRuntimeProvider}. */
    static Builder newBuilder() {
        return new Builder();
    }

    /**
     * Gets the {@link org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider}
     * for executing the periodically re-scan.
     */
    ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();

    /** Gets the interval between two re-scans. */
    Duration getRescanInterval();

    /** Builder class for {@link RescanRuntimeProvider}. */
    class Builder {
        private ScanTableSource.ScanRuntimeProvider scanRuntimeProvider;
        private Duration rescanInterval;

        /** Sets scan runtime provider. */
        public Builder withScanRuntimeProvider(
                ScanTableSource.ScanRuntimeProvider scanRuntimeProvider) {
            this.scanRuntimeProvider = scanRuntimeProvider;
            return this;
        }

        /** Sets rescan interval. */
        public Builder withRescanInterval(Duration rescanInterval) {
            this.rescanInterval = rescanInterval;
            return this;
        }

        /** Build {@link RescanRuntimeProvider}. */
        public RescanRuntimeProvider build() {
            // Build RescanRuntimeProvider
        }
    }
}

TableFunctionProvider / AsyncTableFunctionProvider

...

}
}

TableFunctionProvider / AsyncTableFunctionProvider

We'd like to deprecate these two interfaces and let developers to switch to the new LookupFunctionProvider / AsyncLookupFunctionProvider / FullCachingLookupProvider instead.

Table Options for Lookup Cache

In order to unify the usage of caching across all connectors, we'd like to introduce some common table options, which are defined under class LookupOptions. Note that these options are not required to implement by all connectors. 

OptionTypeDescriptions
lookup.cacheEnum of NONE, PARTIAL and FULL

The caching strategy for this lookup table.

NONE: Do not use cache

Partial: Use partial caching mode

FULL: Use full caching mode

lookup.max-retriesIntegerThe maximum allowed retries if a lookup operation fails
lookup.partial-cache.expire-after-accessDurationDuration to expire an entry in the cache after accessing
lookup.partial-cache.expire-after-writeDurationDuration to expire an entry in the cache after writing
lookup.partial-cache.cache-missing-keyBooleanWhether to store an empty value into the cache if the lookup key doesn't match any rows in the table
lookup.partial-cache.max-rowsLongThe maximum number of rows to store in the cache
lookup.full-cache.reload-strategyEnum of PERIODIC and TIMED

The reload strategy for the full cache scenario.

PERIODIC: Use PeriodicCacheReloadTrigger

TIMED: Use TimedCacheReloadTrigger

lookup.full-cache.periodic-reload.intervalDurationDuration to trigger reload in the PeriodicCacheReloadTrigger
lookup.full-cache.periodic-reload.schedule-modeEnum of FIXED_DELAY and FIXED_RATEThe periodically schedule mode of reloading in the PeriodicCacheReloadTrigger
lookup.full-cache.timed-reload.iso-timeString

Time in ISO-8601 format when cache needs to be reloaded. Time can be specified either with timezone or without timezone (target JVM local timezone will be used). See formatter ISO_TIME.

lookup.full-cache.timed-reload.interval-in-daysInteger

The interval in days to trigger the reload at the specified time

Cache Metrics

It is important to mention that a cache implementation does not have to report all the defined metrics. But if a cache reports a metric of the same semantic defined below, the implementation should follow the convention.

NameTypeUnitDescription
numCachedRecordGaugeRecordsThe number of records in cache.
numCachedBytesGaugeBytesThe number of bytes used by cache.
hitCountCounter
The number of cache hits
missCountCounter
The number of cache misses, which might leads to loading operations
loadCountCounter

The number of times to load data into cache from external system.

For

LRU

partial cache the load count should be equal to miss count, but for all cache this would be different.

numLoadFailureCounter
The number of load failures
latestLoadTimeGaugemsThe time spent for the latest load operation

Here we just define fundamental metrics and let the external metric system make the aggregation to get more descriptive values such as hitRate = hitCount / (hitCount + missCount).

Scope

The metric group for the cache would be a sub-group of the OperatorMetricGroup where the table function belongs to.

Future Works

In order to reduce network I/O with external systems and the usage of cache further, some optimizations implemented on scan source could be also applied on the lookup table, such as projection and filter pushdown. These features will be introduced separately in another FLIP. 

Compatibility, Deprecation, and Migration Plan

Currently we have JDBC, Hive and HBase connector implemented lookup table source. All existing implementations will be migrated to the current design and the migration will be transparent to end users. Table options related to caching defined by these connectors will be migrated to new table options defined in this FLIP above

Test Plan

We will use unit and integration test for validating the functionality of cache implementations.

Rejected Alternatives

Add cache in TableFunction implementations

Compared with this design, adding cache in TableFunction implementations might lead to inconsistency between sync and async table function, and not suitable for applying optimizations. 

...