
Status

...

Page properties

Discussion thread: ...

Vote thread: ...

JIRA: FLINK-28415

Release: 1.16

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

This FLIP is joint work. It was initially proposed by Yuan Zhu (zstraw@163.com) and finished by Qingsheng Ren (partial caching part) and Alexander Smirnov (full caching part).



Motivation

...

To address the issues above, we propose defining a unified abstraction for the lookup source cache and its related metrics.

Proposed Changes


Top-level APIs

In order to clarify the semantics of lookup, we'd like to introduce some top-level APIs for general lookup operations without caching:

  • LookupFunction / AsyncLookupFunction, extended versions of TableFunction / AsyncTableFunction that make the API more straightforward.
  • LookupFunctionProvider / AsyncLookupFunctionProvider, which serve as the creators of LookupFunction / AsyncLookupFunction in the table source.

And APIs related to the cache:

  • LookupCache, defining the cache used in the lookup table.
  • DefaultLookupCache, a default implementation of the cache that is suitable for most use cases.
  • CacheMetricGroup, defining the metrics that should be reported by the cache.

Partial and Full Caching

More specifically, we'd like to provide public interfaces to lookup source developers for the two most common cases, which we name partial caching and full caching.

Partial caching

Partial caching loads data into the cache as the external system is accessed. If the key to look up does not exist in the cache, a lookup against the external system is triggered and the result is stored in the cache for subsequent lookups. Users and lookup table developers can configure the eviction policy and the maximum size of the cache.

In order to support partial caching, we propose to introduce 2 new interfaces:

  • PartialCachingLookupProvider / PartialCachingAsyncLookupProvider

The cache serves as a component in LookupJoinRunner and is pluggable. The developer of a lookup table returns a PartialCachingLookupProvider / PartialCachingAsyncLookupProvider from their implementation of LookupTableSource and specifies the lookup function and the cache when constructing the provider. The planner takes over the lookup function and the cache created from the provider and passes them to the LookupJoinRunner. The cache is instantiated during runtime execution, and entries are loaded via the lookup function whenever there is a cache miss.
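The partial caching read path described above can be sketched in a few lines of plain Java. This is a minimal, single-threaded sketch under stated assumptions, not the LookupJoinRunner implementation. The class `PartialCachingLookupSketch` is hypothetical, keys and rows are modeled as `List<Object>` instead of `RowData`, and a `java.util.function.Function` stands in for `LookupFunction#lookup`. The `cacheMissingKey` flag mirrors the intent of the `lookup.partial-cache.cache-missing-key` option.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical, single-threaded stand-in for the partial caching read path. Keys and rows are
 * plain lists instead of RowData; this is not Flink's LookupJoinRunner.
 */
class PartialCachingLookupSketch {
    private final Map<List<Object>, Collection<List<Object>>> cache = new HashMap<>();
    // Stand-in for LookupFunction#lookup: queries the external system for a single key.
    private final Function<List<Object>, Collection<List<Object>>> externalLookup;
    // Whether an empty result is cached too (cf. lookup.partial-cache.cache-missing-key).
    private final boolean cacheMissingKey;
    private int externalCalls;

    PartialCachingLookupSketch(
            Function<List<Object>, Collection<List<Object>>> externalLookup,
            boolean cacheMissingKey) {
        this.externalLookup = externalLookup;
        this.cacheMissingKey = cacheMissingKey;
    }

    Collection<List<Object>> lookup(List<Object> key) {
        Collection<List<Object>> cached = cache.get(key);
        if (cached != null) {
            return cached; // cache hit: the external system is not accessed
        }
        externalCalls++; // cache miss: fall back to the external system
        Collection<List<Object>> rows = externalLookup.apply(key);
        if (rows == null) {
            rows = Collections.emptyList();
        }
        if (!rows.isEmpty() || cacheMissingKey) {
            cache.put(key, rows); // keep the result for subsequent lookups
        }
        return rows;
    }

    /** Number of times the external system has been queried. */
    int externalCalls() {
        return externalCalls;
    }
}
```

Caching empty results avoids repeatedly querying the external system for keys that do not exist. The trade-off is that rows inserted later for such a key are missed until the entry is evicted.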

Full Caching

If the lookup table is small enough to fit into memory and does not change frequently, it is more efficient to load all of its entries into the cache and refresh the table periodically, because this reduces network I/O. We'd like to name this use case "full caching". Logically, the reload operation is a kind of scan, so we'd like to reuse the ScanRuntimeProvider. This lets developers reuse the scanning logic already implemented in Source / SourceFunction / InputFormat. Considering the complexity of the Source API, we'd like to support the SourceFunction and InputFormat APIs first. Supporting the Source API might require a new topology and will be discussed later in another FLIP.

We propose to introduce the following new interfaces:

  • FullCachingLookupProvider, for reusing the ability of scanning.
  • CacheReloadTrigger, for customizing the strategy for reloading all entries in the full cache.

Also we'd like to provide two default implementations of CacheReloadTrigger:

  • PeriodicCacheReloadTrigger, which triggers a reload periodically with a specified interval.
  • TimedCacheReloadTrigger, which triggers a reload at a specified time of day and repeats every given number of days.
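The following plain-Java sketch makes the difference from partial caching concrete. It assumes a hypothetical `Supplier` that stands in for the scan performed through ScanRuntimeProvider. A reload builds a complete new snapshot and swaps it in atomically. Lookups are answered from the snapshot only, so a cache miss never reaches the external system. `FullCacheSketch` and its methods are illustrative names, not part of the proposed API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical sketch of a full cache: lookups never hit the external system. */
class FullCacheSketch<K, V> {
    // Stand-in for a scan over the whole lookup table (e.g. via a ScanRuntimeProvider).
    private final Supplier<Collection<Map.Entry<K, V>>> scanAll;
    // The current snapshot; replaced as a whole on every reload.
    private volatile Map<K, List<V>> snapshot = Collections.emptyMap();

    FullCacheSketch(Supplier<Collection<Map.Entry<K, V>>> scanAll) {
        this.scanAll = scanAll;
    }

    /** Rebuilds the snapshot from a full scan; meant to be invoked by a reload trigger. */
    void reload() {
        Map<K, List<V>> next = new HashMap<>();
        for (Map.Entry<K, V> row : scanAll.get()) {
            next.computeIfAbsent(row.getKey(), k -> new ArrayList<>()).add(row.getValue());
        }
        snapshot = Collections.unmodifiableMap(next); // swap in the complete new table at once
    }

    /** Answers a lookup from the snapshot only; a miss yields an empty result. */
    List<V> lookup(K key) {
        return snapshot.getOrDefault(key, Collections.emptyList());
    }
}
```

Readers never observe a half-loaded table, because the new map is fully built before the `volatile` field is reassigned.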

Public Interfaces

Lookup Functions

As the TableFunction interface is not straightforward for lookup table developers to use, we'd like to introduce new interfaces for sync and async lookup tables. Caching will only be supported on LookupFunction / AsyncLookupFunction.

Code Block
languagejava
titleLookupFunction
/**
 * A wrapper class of {@link TableFunction} for synchronously looking up rows matching the lookup
 * keys from external system.
 *
 * <p>The output type of this table function is fixed as {@link RowData}.
 */
@PublicEvolving
public abstract class LookupFunction extends TableFunction<RowData> {

    /**
     * Synchronously looks up rows matching the lookup keys.
     *
     * @param keyRow - A {@link RowData} that wraps keys to look up.
     * @return A collection of all matching rows in the lookup table.
     */
    public abstract Collection<RowData> lookup(RowData keyRow) throws IOException;

    /** Invokes {@link #lookup} and handles exceptions. */
    public final void eval(Object... keys) {
        try {
            lookup(GenericRowData.of(keys)).forEach(this::collect);
        } catch (IOException e) {
            throw new RuntimeException("Failed to lookup values with given key", e);
        }
    }
}
Code Block
languagejava
titleAsyncLookupFunction
/**
 * A wrapper class of {@link AsyncTableFunction} for asynchronously looking up rows matching the
 * lookup keys from external system.
 *
 * <p>The output type of this table function is fixed as {@link RowData}.
 */
@PublicEvolving
public abstract class AsyncLookupFunction extends AsyncTableFunction<RowData> {

    /**
     * Asynchronously looks up rows matching the lookup keys.
     *
     * @param keyRow - A {@link RowData} that wraps keys to look up.
     * @return A future of a collection of all matching rows in the lookup table.
     */
    public abstract CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow);

    /** Invokes {@link #asyncLookup} and chains futures. */
    public final void eval(CompletableFuture<Collection<RowData>> future, Object... keys) {
        asyncLookup(GenericRowData.of(keys))
                .whenCompleteAsync(
                        (result, exception) -> {
                            if (exception != null) {
                                future.completeExceptionally(exception);
                                return;
                            }
                            future.complete(result);
                        });
    }
}
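You can try the future chaining performed by `AsyncLookupFunction#eval` without Flink. In the hypothetical analogue below, `AsyncLookupSketch` takes `List<Object>` keys in place of `RowData` and builds them with `Arrays.asList`, which accepts null fields. It uses `whenComplete` rather than `whenCompleteAsync`, so completion runs on the thread that finishes the lookup. In both variants, a failed lookup completes the caller's future exceptionally instead of throwing.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical analogue of AsyncLookupFunction that adapts asyncLookup to a caller-owned future. */
abstract class AsyncLookupSketch {

    /** Asynchronously looks up rows matching the key. */
    public abstract CompletableFuture<Collection<List<Object>>> asyncLookup(List<Object> key);

    /** Completes the caller's future with the lookup result, or with the lookup failure. */
    public final void eval(CompletableFuture<Collection<List<Object>>> future, Object... keys) {
        asyncLookup(Arrays.asList(keys))
                .whenComplete(
                        (result, exception) -> {
                            if (exception != null) {
                                future.completeExceptionally(exception); // propagate failure
                                return;
                            }
                            future.complete(result); // hand the matching rows to the caller
                        });
    }
}
```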

LookupFunctionProvider

Code Block
languagejava
titleLookupFunctionProvider
/** A provider for creating {@link LookupFunction}. */
@PublicEvolving
public interface LookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    static LookupFunctionProvider of(LookupFunction lookupFunction) {
        return () -> lookupFunction;
    }

    LookupFunction createLookupFunction();
}

AsyncLookupFunctionProvider

Code Block
languagejava
titleAsyncLookupFunctionProvider
/** A provider for creating {@link AsyncLookupFunction}. */
@PublicEvolving
public interface AsyncLookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    static AsyncLookupFunctionProvider of(AsyncLookupFunction asyncLookupFunction) {
        return () -> asyncLookupFunction;
    }

    AsyncLookupFunction createAsyncLookupFunction();
}

LookupCache

Considering there might be custom caching strategies and optimizations, we'd like to expose the cache interface as a public API so that developers can plug in their own cache.

Code Block
languagejava
titleLookupCache
/**
 * A semi-persistent mapping from keys to values for storing entries of lookup table.
 *
 * <p>The type of the caching key is a {@link RowData} with lookup key fields packed inside. The
 * type of value is a {@link Collection} of {@link RowData}, which are rows matching lookup key
 * fields.
 *
 * <p>Cache entries are manually added using {@link #put}, and are stored in the cache until either
 * evicted or manually invalidated.
 *
 * <p>Implementations of this interface are expected to be thread-safe, and can be safely accessed
 * by multiple concurrent threads.
 */
@PublicEvolving
public interface LookupCache extends AutoCloseable, Serializable {

    /**
     * Initialize the cache.
     *
     * @param metricGroup the metric group to register cache related metrics.
     */
    void open(CacheMetricGroup metricGroup);

    /**
     * Returns the value associated with key in this cache, or null if there is no cached value for
     * key.
     */
    @Nullable
    Collection<RowData> getIfPresent(RowData key);

    /**
     * Associates the specified value rows with the specified key row in the cache. If the cache
     * previously contained value associated with the key, the old value is replaced by the
     * specified value.
     *
     * @param key - key row with which the specified value is to be associated
     * @param value - value rows to be associated with the specified key
     * @return the previous value rows associated with key, or null if there was no mapping for key.
     */
    Collection<RowData> put(RowData key, Collection<RowData> value);

    /** Discards any cached value for the specified key. */
    void invalidate(RowData key);

    /** Returns the number of key-value mappings in the cache. */
    long size();
}
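Because LookupCache is pluggable, a connector could supply its own implementation. The following minimal sketch covers the core mapping with a bounded size and least-recently-used eviction, built on `java.util.LinkedHashMap` in access order. To stay self-contained, it makes these simplifications:

- It uses `String` keys and `List<String>` rows instead of `RowData`.
- It omits `open`/`close` and metrics.
- It meets the thread-safety requirement with coarse `synchronized` methods.

`LruLookupCacheSketch` is a hypothetical name.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical LRU cache mirroring the LookupCache operations (not a Flink class). */
class LruLookupCacheSketch {
    private final Map<String, Collection<List<String>>> entries;

    LruLookupCacheSketch(int maximumSize) {
        // accessOrder = true: iteration runs from least to most recently accessed entry.
        this.entries =
                new LinkedHashMap<String, Collection<List<String>>>(16, 0.75f, true) {
                    @Override
                    protected boolean removeEldestEntry(
                            Map.Entry<String, Collection<List<String>>> eldest) {
                        return size() > maximumSize; // evict the least recently used entry
                    }
                };
    }

    synchronized Collection<List<String>> getIfPresent(String key) {
        return entries.get(key); // null when absent; a hit refreshes the entry's recency
    }

    synchronized Collection<List<String>> put(String key, Collection<List<String>> value) {
        return entries.put(key, value); // returns the previous value, or null
    }

    synchronized void invalidate(String key) {
        entries.remove(key);
    }

    synchronized long size() {
        return entries.size();
    }
}
```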

DefaultLookupCache

DefaultLookupCache is a default implementation of LookupCache that is suitable for most use cases. It is configured through a builder that supports expire-after-access, expire-after-write, a maximum size, and whether to cache missing keys.

Code Block
languagejava
titleDefaultLookupCache
/** Default implementation of {@link LookupCache}. */
@PublicEvolving
public class DefaultLookupCache implements LookupCache {
    private final Duration expireAfterAccessDuration;
    private final Duration expireAfterWriteDuration;
    private final Long maximumSize;
    private final boolean cacheMissingKey;

    private DefaultLookupCache(
            Duration expireAfterAccessDuration,
            Duration expireAfterWriteDuration,
            Long maximumSize,
            boolean cacheMissingKey) {
        this.expireAfterAccessDuration = expireAfterAccessDuration;
        this.expireAfterWriteDuration = expireAfterWriteDuration;
        this.maximumSize = maximumSize;
        this.cacheMissingKey = cacheMissingKey;
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    // Implementations of open(), getIfPresent(), put(), invalidate(), size() and close() are
    // omitted in this proposal.

    /** Builder of {@link DefaultLookupCache}. */
    public static class Builder {
        private Duration expireAfterAccessDuration;
        private Duration expireAfterWriteDuration;
        private Long maximumSize;
        // Assumed default when not set explicitly; avoids unboxing a null Boolean in build().
        private boolean cacheMissingKey = true;

        public Builder expireAfterAccess(Duration duration) {
            expireAfterAccessDuration = duration;
            return this;
        }

        public Builder expireAfterWrite(Duration duration) {
            expireAfterWriteDuration = duration;
            return this;
        }

        public Builder maximumSize(long maximumSize) {
            this.maximumSize = maximumSize;
            return this;
        }

        public Builder cacheMissingKey(boolean cacheMissingKey) {
            this.cacheMissingKey = cacheMissingKey;
            return this;
        }

        public DefaultLookupCache build() {
            return new DefaultLookupCache(
                    expireAfterAccessDuration,
                    expireAfterWriteDuration,
                    maximumSize,
                    cacheMissingKey);
        }
    }
}
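The builder options map onto familiar cache semantics. `expireAfterWrite` bounds an entry's age since it was stored, while `expireAfterAccess` bounds the time since it was last read or written. The sketch below shows one way to evaluate such checks lazily on read, using an injectable clock so the behavior is easy to test. It rests on these hypothetical choices:

- the class name `ExpiringEntrySketch`,
- a `LongSupplier` clock in milliseconds,
- `null` meaning "option not set".

It illustrates the semantics only and is not how DefaultLookupCache is implemented.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

/** Hypothetical single-entry holder illustrating expire-after-write / expire-after-access. */
class ExpiringEntrySketch<V> {
    private final Duration expireAfterWrite; // null: option not set
    private final Duration expireAfterAccess; // null: option not set
    private final LongSupplier clockMillis;
    private V value;
    private long writeTime;
    private long accessTime;

    ExpiringEntrySketch(
            Duration expireAfterWrite, Duration expireAfterAccess, LongSupplier clockMillis) {
        this.expireAfterWrite = expireAfterWrite;
        this.expireAfterAccess = expireAfterAccess;
        this.clockMillis = clockMillis;
    }

    /** Stores a value; a write also counts as an access. */
    void put(V newValue) {
        long now = clockMillis.getAsLong();
        value = newValue;
        writeTime = now;
        accessTime = now;
    }

    /** Returns the value, or null if it is absent or has expired. */
    V getIfPresent() {
        long now = clockMillis.getAsLong();
        boolean writeExpired =
                expireAfterWrite != null && now - writeTime >= expireAfterWrite.toMillis();
        boolean accessExpired =
                expireAfterAccess != null && now - accessTime >= expireAfterAccess.toMillis();
        if (value == null || writeExpired || accessExpired) {
            value = null; // drop the expired entry
            return null;
        }
        accessTime = now; // a successful read refreshes the access time
        return value;
    }
}
```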

CacheMetricGroup

An interface defining all cache-related metrics:

Code Block
languagejava
titleCacheMetricGroup
/**
 * Pre-defined metrics for cache.
 *
 * <p>Please note that these methods should only be invoked once. Registering a metric with same
 * name for multiple times would lead to an undefined behavior.
 */
@PublicEvolving
public interface CacheMetricGroup extends MetricGroup {
    /** The number of cache hits. */
    void hitCounter(Counter hitCounter);

    /** The number of cache misses. */
    void missCounter(Counter missCounter);

    /** The number of times to load data into cache from external system. */
    void loadCounter(Counter loadCounter);

    /** The number of load failures. */
    void numLoadFailuresCounter(Counter numLoadFailuresCounter);

    /** The time spent for the latest load operation. */
    void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);

    /** The number of records in cache. */
    void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

    /** The number of bytes used by cache. */
    void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}
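The following plain-Java sketch illustrates how a cache implementation might feed these metrics. It counts hits, misses, loads and load failures, and exposes the number of cached records and the latest load time as gauge-like suppliers. The class `CacheStatsSketch` and its members are hypothetical. Measuring the load time in nanoseconds is also an assumption, because the interface above does not state a unit. In Flink, the values would instead be reported through Counter / Gauge instances registered once via CacheMetricGroup.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical cache wrapper that records the metrics listed in CacheMetricGroup. */
class CacheStatsSketch<K, V> {
    final AtomicLong hits = new AtomicLong();
    final AtomicLong misses = new AtomicLong();
    final AtomicLong loads = new AtomicLong();
    final AtomicLong loadFailures = new AtomicLong();
    private final AtomicLong latestLoadTimeNanos = new AtomicLong();
    private final Map<K, V> entries = new HashMap<>();
    private final Function<K, V> loader; // stand-in for loading from the external system

    CacheStatsSketch(Function<K, V> loader) {
        this.loader = loader;
    }

    synchronized V get(K key) {
        V value = entries.get(key);
        if (value != null) {
            hits.incrementAndGet();
            return value;
        }
        misses.incrementAndGet();
        loads.incrementAndGet();
        long start = System.nanoTime();
        try {
            value = loader.apply(key);
        } catch (RuntimeException e) {
            loadFailures.incrementAndGet(); // count the failure, then surface it to the caller
            throw e;
        } finally {
            latestLoadTimeNanos.set(System.nanoTime() - start);
        }
        entries.put(key, value);
        return value;
    }

    /** Gauge-like view of the number of cached records. */
    LongSupplier numCachedRecords() {
        return () -> {
            synchronized (this) {
                return entries.size();
            }
        };
    }

    /** Gauge-like view of the duration of the latest load, in nanoseconds. */
    LongSupplier latestLoadTime() {
        return latestLoadTimeNanos::get;
    }
}
```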

PartialCachingLookupProvider

This is the API between the table framework and the user's table source for partial caching. Implementations define how to create the lookup function and which cache to use.

Code Block
languagejava
titlePartialCachingLookupProvider
/**
 * Provider for creating {@link LookupFunction} and {@link LookupCache} for storing lookup entries.
 */
@PublicEvolving
public interface PartialCachingLookupProvider extends LookupFunctionProvider {

    /**
     * Build a {@link PartialCachingLookupProvider} from the specified {@link LookupFunction} and
     * {@link LookupCache}.
     */
    static PartialCachingLookupProvider of(LookupFunction lookupFunction, LookupCache cache) {
        return new PartialCachingLookupProvider() {

            @Override
            public LookupCache getCache() {
                return cache;
            }

            @Override
            public LookupFunction createLookupFunction() {
                return lookupFunction;
            }
        };
    }

    /** Get a new instance of {@link LookupCache}. */
    LookupCache getCache();
}

PartialCachingAsyncLookupProvider

Code Block
languagejava
titlePartialCachingAsyncLookupProvider
/**
 * Provider for creating {@link AsyncLookupFunction} and {@link LookupCache} for storing lookup
 * entries.
 */
@PublicEvolving
public interface PartialCachingAsyncLookupProvider extends AsyncLookupFunctionProvider {

    /**
     * Build a {@link PartialCachingAsyncLookupProvider} from the specified {@link
     * AsyncLookupFunction} and {@link LookupCache}.
     */
    static PartialCachingAsyncLookupProvider of(
            AsyncLookupFunction asyncLookupFunction, LookupCache cache) {
        return new PartialCachingAsyncLookupProvider() {

            @Override
            public LookupCache getCache() {
                return cache;
            }

            @Override
            public AsyncLookupFunction createAsyncLookupFunction() {
                return asyncLookupFunction;
            }
        };
    }

    /** Get a new instance of {@link LookupCache}. */
    LookupCache getCache();
}

FullCachingLookupProvider

This interface supports the full caching strategy. It reuses ScanRuntimeProvider to load all entries and relies on a CacheReloadTrigger to decide when to reload them.

Code Block
languagejava
titleFullCachingLookupProvider
/**
 * A {@link LookupFunctionProvider} that never looks up in the external system on cache miss and
 * provides a cache for holding all entries in the external system. The cache will be fully reloaded
 * from the external system by the {@link ScanTableSource.ScanRuntimeProvider} and reload
 * operations will be triggered by the {@link CacheReloadTrigger}.
 */
@PublicEvolving
public interface FullCachingLookupProvider extends LookupFunctionProvider {
    static FullCachingLookupProvider of(
            ScanTableSource.ScanRuntimeProvider scanRuntimeProvider,
            CacheReloadTrigger cacheReloadTrigger) {
        return new FullCachingLookupProvider() {
            @Override
            public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider() {
                return scanRuntimeProvider;
            }

            @Override
            public CacheReloadTrigger getCacheReloadTrigger() {
                return cacheReloadTrigger;
            }

            @Override
            public LookupFunction createLookupFunction() {
                // LookupFunction is an abstract class, so a lambda cannot implement it.
                return new LookupFunction() {
                    @Override
                    public Collection<RowData> lookup(RowData keyRow) {
                        // Placeholder: in full caching mode lookups are served from the cache
                        // and never reach the external system.
                        return Collections.emptyList();
                    }
                };
            }
        };
    }

    /**
     * Get a {@link ScanTableSource.ScanRuntimeProvider} for scanning all entries from the external
     * lookup table and load into the cache.
     */
    ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();

    /** Get a {@link CacheReloadTrigger} for triggering the reload operation. */
    CacheReloadTrigger getCacheReloadTrigger();
}

CacheReloadTrigger

A trigger defining custom logic for triggering full cache reloading.

Code Block
languagejava
titleCacheReloadTrigger
/** Customized trigger for reloading all lookup table entries in full caching mode. */
@PublicEvolving
public interface CacheReloadTrigger extends AutoCloseable, Serializable {

    /** Open the trigger. */
    void open(Context context) throws Exception;

    /**
     * Context of {@link CacheReloadTrigger} for getting information about times and triggering
     * reload.
     */
    interface Context {

        /** Get current processing time. */
        long currentProcessingTime();

        /** Get current watermark on the main stream. */
        long currentWatermark();

        /** Trigger a reload operation on the full cache. */
        CompletableFuture<Void> triggerReload();
    }
}

PeriodicCacheReloadTrigger

An implementation of CacheReloadTrigger that triggers a reload with a specified interval.

Code Block
languagejava
titlePeriodicCacheReloadTrigger
/** A trigger that reloads all entries periodically with specified interval or delay. */
public class PeriodicCacheReloadTrigger implements CacheReloadTrigger {

    private final Duration reloadInterval;
    private final ScheduleMode scheduleMode;

    public PeriodicCacheReloadTrigger(Duration reloadInterval, ScheduleMode scheduleMode) {
        this.reloadInterval = reloadInterval;
        this.scheduleMode = scheduleMode;
    }

    @Override
    public void open(CacheReloadTrigger.Context context) {
        // Register periodic reload task
    }

    @Override
    public void close() throws Exception {
        // Dispose resources
    }

    public enum ScheduleMode {
        FIXED_DELAY,
        FIXED_RATE
    }
}
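The two schedule modes suggest an obvious mapping onto `java.util.concurrent.ScheduledExecutorService`. Its `scheduleAtFixedRate` starts runs at a constant cadence, while `scheduleWithFixedDelay` waits the full interval after the previous run finishes. The sketch below shows one plausible way to fill in `open` on that basis. It assumes a hypothetical, simplified `ReloadContext` that exposes only `triggerReload()`. It also assumes the first reload happens one interval after `open`, with the initial load happening elsewhere. Each run blocks until its reload completes, so a slow reload delays the next one. This is not the Flink implementation.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical simplified context: only the reload hook of CacheReloadTrigger.Context. */
interface ReloadContext {
    CompletableFuture<Void> triggerReload();
}

/** Sketch of a periodic reload trigger built on ScheduledExecutorService. */
class PeriodicReloadSketch implements AutoCloseable {
    enum ScheduleMode {
        FIXED_DELAY,
        FIXED_RATE
    }

    private final Duration reloadInterval;
    private final ScheduleMode scheduleMode;
    private ScheduledExecutorService scheduler;

    PeriodicReloadSketch(Duration reloadInterval, ScheduleMode scheduleMode) {
        this.reloadInterval = reloadInterval;
        this.scheduleMode = scheduleMode;
    }

    void open(ReloadContext context) {
        scheduler = Executors.newSingleThreadScheduledExecutor();
        long millis = reloadInterval.toMillis();
        // Block until each reload finishes so runs never overlap on the single thread.
        Runnable reload = () -> context.triggerReload().join();
        if (scheduleMode == ScheduleMode.FIXED_RATE) {
            scheduler.scheduleAtFixedRate(reload, millis, millis, TimeUnit.MILLISECONDS);
        } else {
            scheduler.scheduleWithFixedDelay(reload, millis, millis, TimeUnit.MILLISECONDS);
        }
    }

    @Override
    public void close() {
        if (scheduler != null) {
            scheduler.shutdownNow(); // stop scheduling further reloads
        }
    }
}
```

One caveat: if a reload fails, `join()` throws, and the executor silently suppresses all later runs of that task. A production trigger would need to catch and report such failures.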

TimedCacheReloadTrigger

Code Block
languagejava
titleTimedCacheReloadTrigger
/** A trigger that reloads at a specific local time and repeats at the given interval in days. */
public class TimedCacheReloadTrigger implements CacheReloadTrigger {

    private final LocalTime reloadTime;
    private final int reloadIntervalInDays;

    public TimedCacheReloadTrigger(LocalTime reloadTime, int reloadIntervalInDays) {
        this.reloadTime = reloadTime;
        this.reloadIntervalInDays = reloadIntervalInDays;
    }

    @Override
    public void open(Context context) {
        // Register periodic reload task
    }

    @Override
    public void close() throws Exception {
        // Dispose resources
    }
}
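For `TimedCacheReloadTrigger`, the non-obvious step is computing how long to wait before the first reload. The sketch below assumes the trigger fires at the next occurrence of `reloadTime` and then every `reloadIntervalInDays` days. The helper class `TimedReloadSchedule` and its methods are hypothetical and not part of the proposed API.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.LocalTime;

/** Hypothetical helper: when should a timed reload trigger fire? */
final class TimedReloadSchedule {

    private TimedReloadSchedule() {}

    /**
     * Delay from {@code now} until the next occurrence of {@code reloadTime}.
     * If today's reload time has already passed (or is exactly now), the next one is tomorrow.
     */
    static Duration initialDelay(LocalDateTime now, LocalTime reloadTime) {
        LocalDateTime next = now.toLocalDate().atTime(reloadTime);
        if (!next.isAfter(now)) {
            next = next.plusDays(1);
        }
        return Duration.between(now, next);
    }

    /** Period between consecutive reloads once the first one has fired. */
    static Duration period(int reloadIntervalInDays) {
        if (reloadIntervalInDays <= 0) {
            throw new IllegalArgumentException("reloadIntervalInDays must be positive");
        }
        return Duration.ofDays(reloadIntervalInDays);
    }
}
```

For example, at 10:00 with `reloadTime` 08:00, the first reload is 22 hours away. The sketch uses `LocalDateTime` and a fixed 24-hour day, so it ignores daylight-saving transitions. An implementation that must honor them would resolve the times against a `ZoneId` instead.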

TableFunctionProvider / AsyncTableFunctionProvider

...

To unify the usage of caching across all connectors, we'd like to introduce some common table options, defined in the class LookupOptions. Note that connectors are not required to implement all of these options.

| Option | Type | Description |
|---|---|---|
| lookup.cache | Enum of NONE, PARTIAL and FULL | The caching strategy for this lookup table. NONE: do not use cache. PARTIAL: use partial caching mode. FULL: use full caching mode. |
| lookup.max-retries | Integer | The maximum allowed retries if a lookup operation fails |
| lookup.partial-cache.expire-after-access | Duration | Duration to expire an entry in the cache after accessing |
| lookup.partial-cache.expire-after-write | Duration | Duration to expire an entry in the cache after writing |
| lookup.partial-cache.cache-missing-key | Boolean | Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table |
| lookup.partial-cache.max-rows | Long | The maximum number of rows to store in the cache |
| lookup.full-cache.reload-strategy | Enum of PERIODIC and TIMED | The reload strategy for the full cache scenario. PERIODIC: use PeriodicCacheReloadTrigger. TIMED: use TimedCacheReloadTrigger. |
| lookup.full-cache.periodic-reload.interval | Duration | Duration to trigger reload in the PeriodicCacheReloadTrigger |
| lookup.full-cache.periodic-reload.schedule-mode | Enum of FIXED_DELAY and FIXED_RATE | The schedule mode of periodic reloading in the PeriodicCacheReloadTrigger |
| lookup.full-cache.timed-reload.iso-time | String | Time in ISO-8601 format when the cache needs to be reloaded. The time can be specified either with or without a time zone; without one, the local time zone of the target JVM is used. See formatter ISO_TIME. |
| lookup.full-cache.timed-reload.interval-in-days | Integer | The interval in days to trigger the reload at the specified time |
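The `lookup.full-cache.timed-reload.iso-time` option accepts a time with or without an offset. The sketch below reads it with `DateTimeFormatter.ISO_TIME.parseBest`, which tries an `OffsetTime` first and falls back to a `LocalTime`. It assumes the value is parsed with `ISO_TIME`, as the option description suggests.

The helper `IsoReloadTime` is hypothetical. It takes the target offset as an explicit argument so the result is deterministic. A connector would presumably derive that offset from the JVM default time zone instead.

```java
import java.time.LocalTime;
import java.time.OffsetTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;

/** Hypothetical helper for interpreting lookup.full-cache.timed-reload.iso-time. */
final class IsoReloadTime {

    private IsoReloadTime() {}

    /**
     * Parses values such as "10:15" or "10:15:30+01:00". A time with an offset is converted
     * to the same instant at {@code targetOffset}; a time without an offset is returned as-is.
     */
    static LocalTime parse(String isoTime, ZoneOffset targetOffset) {
        TemporalAccessor parsed =
                DateTimeFormatter.ISO_TIME.parseBest(isoTime, OffsetTime::from, LocalTime::from);
        if (parsed instanceof OffsetTime) {
            return ((OffsetTime) parsed).withOffsetSameInstant(targetOffset).toLocalTime();
        }
        return (LocalTime) parsed;
    }
}
```

For instance, `IsoReloadTime.parse("10:15:30+01:00", ZoneOffset.UTC)` yields 09:15:30, while `"10:15"` is taken as 10:15 in whatever zone the caller considers local.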

Cache Metrics

A cache implementation does not have to report all of the metrics defined below. However, if a cache reports a metric with the same semantics as one defined below, it should follow the naming and unit convention given here.

| Name | Type | Unit | Description |
|---|---|---|---|
| numCachedRecord | Gauge | Records | The number of records in the cache. |
| numCachedBytes | Gauge | Bytes | The number of bytes used by the cache. |
| hitCount | Counter | | The number of cache hits. |
| missCount | Counter | | The number of cache misses, which might lead to load operations. |
| loadCount | Counter | | The number of times data is loaded into the cache from the external system. For partial caching the load count should equal the miss count, but for full caching the two may differ. |
| numLoadFailure | Counter | | The number of load failures. |
| latestLoadTime | Gauge | ms | The time spent on the latest load operation. |

Here we only define fundamental metrics and leave aggregation to the external metric system, which can derive more descriptive values such as hitRate = hitCount / (hitCount + missCount).
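The sketch below shows how the counters relate in a partial cache. It also reproduces the hitRate aggregation that the external metric system would perform. `CountingPartialCache` is a hypothetical class, not the proposed `LookupCache` API. Plain `AtomicLong` fields stand in for metric counters.

The sketch assumes two behaviors:
- Missing keys are cached, mirroring `lookup.partial-cache.cache-missing-key = true`.
- Every miss triggers exactly one load.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/** Hypothetical partial cache that maintains hitCount, missCount and loadCount. */
class CountingPartialCache<K, V> {

    private final Map<K, Optional<V>> entries = new HashMap<>();
    private final Function<K, V> loader; // lookup against the external system; null = no match

    final AtomicLong hitCount = new AtomicLong();
    final AtomicLong missCount = new AtomicLong();
    final AtomicLong loadCount = new AtomicLong();

    CountingPartialCache(Function<K, V> loader) {
        this.loader = loader;
    }

    synchronized Optional<V> get(K key) {
        Optional<V> cached = entries.get(key);
        if (cached != null) {
            hitCount.incrementAndGet();
            return cached;
        }
        missCount.incrementAndGet();
        loadCount.incrementAndGet(); // partial caching: each miss causes exactly one load
        Optional<V> loaded = Optional.ofNullable(loader.apply(key));
        entries.put(key, loaded); // an empty Optional records a key with no matching rows
        return loaded;
    }

    /** hitRate = hitCount / (hitCount + missCount); defined as 0 before any lookup. */
    static double hitRate(long hits, long misses) {
        long total = hits + misses;
        return total == 0 ? 0.0 : (double) hits / total;
    }
}
```

Looking up keys 1, 2, 1, 2, 3 produces 2 hits and 3 misses, so hitRate is 0.4. In a full cache, loadCount would instead track reloads of the whole table and would generally differ from missCount.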

...