...

In order to address the issues above, we propose here to define a unified abstraction for lookup source cache and its related metrics.

Proposed Changes

Top-level APIs

In order to clarify the semantics of lookup, we'd like to introduce some top-level APIs for general lookup operations:

  • LookupFunction / AsyncLookupFunction, an extended version of TableFunction that makes the API more straightforward.
  • LookupFunctionProvider / AsyncLookupFunctionProvider, which serve as the creators of LookupFunction / AsyncLookupFunction in the table source.

And APIs related to the cache:

  • LookupCache, defining the cache used in a lookup table.
  • DefaultLookupCache, a default implementation of the cache that is suitable for most use cases.
  • CacheMetricGroup, defining the metrics that should be reported by the cache.

Partial and Full Caching

More specifically, we'd like to provide public interfaces to lookup source developers for the two most common cases, which we call partial and full caching.

Partial caching

Partial caching loads data into the cache along with the access to the external system. If the key to look up does not exist in the cache, a lookup against the external system is triggered and the result is stored in the cache for subsequent lookups. Users and lookup table developers can configure the eviction policy and the maximum size of the cache.

In order to support partial caching, we propose to introduce two new interfaces:

...

  • PartialCachingLookupProvider / AsyncPartialCachingLookupProvider, as the API interacting with the table source to get the LookupFunction and the LookupCache.

The cache serves as a component in LookupJoinRunner and is pluggable: the developer of a lookup table specifies it in the constructor of the provider. The planner takes over the lookup function and the cache created from the provider and passes them to the LookupJoinRunner. The cache is instantiated during runtime execution, and a load via the lookup function is performed whenever there is a cache miss.
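The cache-miss flow described above can be sketched in plain Java. This is only an illustration under stated assumptions, not the proposed runtime code: `PartialCacheSketch` is a hypothetical name, `String` stands in for RowData, a `HashMap` stands in for LookupCache, and a `Function` stands in for the LookupFunction that queries the external system.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch of partial caching: consult the cache first, load on a miss. */
final class PartialCacheSketch {
    private final Map<String, Collection<String>> cache = new HashMap<>();
    private final Function<String, Collection<String>> externalLookup;
    private final boolean cacheMissingKey;
    private int loadCount = 0;

    PartialCacheSketch(
            Function<String, Collection<String>> externalLookup, boolean cacheMissingKey) {
        this.externalLookup = externalLookup;
        this.cacheMissingKey = cacheMissingKey;
    }

    Collection<String> lookup(String key) {
        Collection<String> cached = cache.get(key);
        if (cached != null) {
            return cached; // cache hit: no access to the external system
        }
        // Cache miss: query the external system and remember the result.
        loadCount++;
        Collection<String> rows = externalLookup.apply(key);
        if (!rows.isEmpty() || cacheMissingKey) {
            cache.put(key, rows);
        }
        return rows;
    }

    /** Number of lookups that reached the external system. */
    int loadCount() {
        return loadCount;
    }

    public static void main(String[] args) {
        Map<String, Collection<String>> table = Map.of("k1", List.of("row-a", "row-b"));
        PartialCacheSketch sketch =
                new PartialCacheSketch(k -> table.getOrDefault(k, List.of()), true);
        sketch.lookup("k1");
        sketch.lookup("k1");
        System.out.println("loads = " + sketch.loadCount()); // prints "loads = 1"
    }
}
```

Eviction by size or expiry is left out of this sketch; in the proposal that is the responsibility of the cache implementation.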

Full Caching

If the lookup table is small enough to fit into memory and doesn't change frequently, it is more efficient to load all entries of the lookup table into the cache to reduce network I/O, and to refresh the table periodically. We'd like to name this use case "full cache". Logically the reload operation is a kind of scan, so we'd like to reuse ScanRuntimeProvider so that developers can reuse the scanning logic implemented in Source / SourceFunction / InputFormat. Considering the complexity of the Source API, we'd like to support the SourceFunction and InputFormat APIs first. Supporting the Source API might require a new topology and will be discussed later in another FLIP.
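As a rough illustration of the full caching idea, the sketch below keeps a complete local snapshot of the table and replaces it on every reload. All names here are hypothetical: `FullCacheSketch` is not part of the proposal, `String` stands in for RowData, and the `Supplier` stands in for the scan that a ScanRuntimeProvider would perform.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical sketch of full caching: serve lookups from a periodically reloaded snapshot. */
final class FullCacheSketch {
    private final Supplier<Map<String, Collection<String>>> scan;
    // volatile so that lookups on other threads see the swapped snapshot.
    private volatile Map<String, Collection<String>> snapshot = Map.of();

    FullCacheSketch(Supplier<Map<String, Collection<String>>> scan) {
        this.scan = scan;
    }

    /** Re-scans the whole table and swaps in the new snapshot in one assignment. */
    void reload() {
        snapshot = new HashMap<>(scan.get());
    }

    /** Lookups are answered from the snapshot only; a miss never reaches the external system. */
    Collection<String> lookup(String key) {
        return snapshot.getOrDefault(key, List.of());
    }

    public static void main(String[] args) {
        Map<String, Collection<String>> table = new HashMap<>();
        table.put("k1", List.of("row-a"));
        FullCacheSketch cache = new FullCacheSketch(() -> table);
        cache.reload();
        System.out.println(cache.lookup("k1"));
        System.out.println(cache.lookup("unknown"));
    }
}
```

Building the new snapshot before swapping it in means lookups keep seeing the old data while a reload is in progress, which is one possible design choice rather than something the proposal mandates.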

...

As the usage of the TableFunction interface is not quite straightforward for lookup table developers, we'd like to introduce a new interface for sync and async lookup tables. Caching will only be supported on LookupFunction / AsyncLookupFunction.

Code Block
languagejava
titleLookupFunction
/**
 * A wrapper class of {@link TableFunction} for synchronously lookup rows matching the lookup keys
 * from external system.
 *
 * <p>The output type of this table function is fixed as {@link RowData}.
 */
@PublicEvolving
public abstract class LookupFunction extends TableFunction<RowData> {

    /**
     * Synchronously lookup rows matching the lookup keys.
     *
     * @param keyRow - A {@link RowData} that wraps keys to lookup.
     * @return A collections of all matching rows in the lookup table.
     */
    public abstract Collection<RowData> lookup(RowData keyRow) throws IOException;

    /** Invoke {@link #lookup} and handle exceptions. */
    public final void eval(Object... keys) {
        try {
            lookup(GenericRowData.of(keys)).forEach(this::collect);
        } catch (IOException e) {
            throw new RuntimeException("Failed to lookup values with given key", e);
        }
    }
}
Code Block
languagejava
titleAsyncLookupFunction
/**
 * A wrapper class of {@link AsyncTableFunction} for asynchronously lookup rows matching the lookup
 * keys from external system.
 *
 * <p>The output type of this table function is fixed as {@link RowData}.
 */
@PublicEvolving
public abstract class AsyncLookupFunction extends AsyncTableFunction<RowData> {

    /**
     * Asynchronously lookup rows matching the lookup keys.
     *
     * @param keyRow - A {@link RowData} that wraps keys to lookup.
     * @return A collections of all matching rows in the lookup table.
     */
    public abstract CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow);

    /** Invokes {@link #asyncLookup} and chains futures. */
    public final void eval(CompletableFuture<Collection<RowData>> future, Object... keys) {
         asyncLookup(GenericRowData.of(keys))
                .whenCompleteAsync(
                        (result, exception) -> {
                            if (exception != null) {
                                future.completeExceptionally(exception);
                                return;
                            }
                            future.complete(result);
                        });
    }
}

LookupCache

Considering there might be custom caching strategies and optimizations, we'd like to expose the cache interface as public API for developers to make the cache pluggable.

Code Block
languagejava
titlePartialCache
/**
 * A semi-persistent mapping from keys to values for storing entries of lookup table.
 *
 * <p>The type of the caching key is a {@link RowData} with lookup key fields packed inside. The
 * type of value is a {@link Collection} of {@link RowData}, which are rows matching lookup key
 * fields.
 *
 * <p>Cache entries are manually added using {@link #put}, and are stored in the cache until either
 * evicted or manually invalidated.
 *
 * <p>Implementations of this interface are expected to be thread-safe, and can be safely accessed
 * by multiple concurrent threads.
 */
@PublicEvolving
public interface LookupCache extends AutoCloseable {

    /**
     * Initialize the cache.
     *
     * @param metricGroup the metric group to register cache related metrics.
     */
    void open(CacheMetricGroup metricGroup);

    /**
     * Returns the value associated with key in this cache, or null if there is no cached value for
     * key.
     */
    @Nullable
    Collection<RowData> getIfPresent(RowData key);

    /**
     * Associates the specified value rows with the specified key row in the cache. If the cache
     * previously contained value associated with the key, the old value is replaced by the
     * specified value.
     *
     * @param key key row with which the specified value is to be associated
     * @param value value rows to be associated with the specified key
     * @return the previous value rows associated with key, or null if there was no mapping for key.
     */
    Collection<RowData> put(RowData key, Collection<RowData> value);

    /** Discards any cached value for the specified key. */
    void invalidate(RowData key);

    /** Returns the number of key-value mappings in the cache. */
    long size();
}
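To make the contract of the four cache operations concrete, here is a minimal stand-alone sketch of a size-bounded cache with least-recently-used eviction. It is an assumption-laden illustration, not DefaultLookupCache: `LruLookupCacheSketch` is a hypothetical name, `String` replaces RowData, and the `open` / metrics part of the interface is left out.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the getIfPresent / put / invalidate / size contract with LRU eviction. */
final class LruLookupCacheSketch {
    private final Map<String, Collection<String>> entries;

    LruLookupCacheSketch(final int maximumSize) {
        // accessOrder = true keeps the least recently accessed entry first.
        this.entries =
                new LinkedHashMap<String, Collection<String>>(16, 0.75f, true) {
                    @Override
                    protected boolean removeEldestEntry(
                            Map.Entry<String, Collection<String>> eldest) {
                        return size() > maximumSize;
                    }
                };
    }

    /** Returns the cached rows, or null if the key is not cached. */
    synchronized Collection<String> getIfPresent(String key) {
        return entries.get(key);
    }

    /** Stores the rows and returns the previously cached rows, or null. */
    synchronized Collection<String> put(String key, Collection<String> value) {
        return entries.put(key, value);
    }

    /** Discards any cached rows for the key. */
    synchronized void invalidate(String key) {
        entries.remove(key);
    }

    /** Number of key-to-rows mappings currently cached. */
    synchronized long size() {
        return entries.size();
    }

    public static void main(String[] args) {
        LruLookupCacheSketch cache = new LruLookupCacheSketch(2);
        cache.put("a", List.of("1"));
        cache.put("b", List.of("2"));
        cache.getIfPresent("a"); // "a" is now the most recently used entry
        cache.put("c", List.of("3")); // evicts "b"
        System.out.println(cache.getIfPresent("b")); // prints "null"
    }
}
```

The methods are synchronized because the interface asks implementations to be thread-safe; a production cache would likely use a concurrent data structure instead.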

DefaultLookupCache

As the cache should be instantiated during runtime execution to avoid serialization / deserialization, a factory is required for creating the cache. 

Code Block
languagejava
titleDefaultPartialCache
/** Default implementation of {@link LookupCache}. */
@PublicEvolving
public class DefaultLookupCache implements LookupCache {
    private final Duration expireAfterAccessDuration;
    private final Duration expireAfterWriteDuration;
    private final Long maximumSize;
    private final boolean cacheMissingKey;

    private DefaultLookupCache(
            Duration expireAfterAccessDuration,
            Duration expireAfterWriteDuration,
            Long maximumSize,
            boolean cacheMissingKey) {
        this.expireAfterAccessDuration = expireAfterAccessDuration;
        this.expireAfterWriteDuration = expireAfterWriteDuration;
        this.maximumSize = maximumSize;
        this.cacheMissingKey = cacheMissingKey;
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    public static class Builder {
        private Duration expireAfterAccessDuration;
        private Duration expireAfterWriteDuration;
        private Long maximumSize;
        private Boolean cacheMissingKey;

        public Builder expireAfterAccess(Duration duration) {
            expireAfterAccessDuration = duration;
            return this;
        }

        public Builder expireAfterWrite(Duration duration) {
            expireAfterWriteDuration = duration;
            return this;
        }

        public Builder maximumSize(long maximumSize) {
            this.maximumSize = maximumSize;
            return this;
        }

        public Builder cacheMissingKey(boolean cacheMissingKey) {
            this.cacheMissingKey = cacheMissingKey;
            return this;
        }

        public DefaultLookupCache build() {
            return new DefaultLookupCache(
                    expireAfterAccessDuration,
                    expireAfterWriteDuration,
                    maximumSize,
                    cacheMissingKey);
        }
    }
}

CacheMetricGroup

An interface defining all cache-related metrics:

Code Block
languagejava
titleLookupCacheMetricGroup
/**
 * Pre-defined metrics for cache.
 *
 * <p>Please note that these methods should only be invoked once. Registering a metric with same
 * name for multiple times would lead to an undefined behavior.
 */
@PublicEvolving
public interface CacheMetricGroup extends MetricGroup {
    /** The number of cache hits. */
    void hitCounter(Counter hitCounter);

    /** The number of cache misses. */
    void missCounter(Counter missCounter);

    /** The number of times to load data into cache from external system. */
    void loadCounter(Counter loadCounter);

    /** The number of load failures. */
    void numLoadFailuresCounter(Counter numLoadFailuresCounter);

    /** The time spent for the latest load operation. */
    void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);

    /** The number of records in cache. */
    void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

    /** The number of bytes used by cache. */
    void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}


LookupFunctionProvider

Code Block
languagejava
titleLookupFunctionProvider
/** A provider for creating {@link LookupFunction}. */
@PublicEvolving
public interface LookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    static LookupFunctionProvider of(LookupFunction lookupFunction) {
        return () -> lookupFunction;
    }

    /** Creates an {@link LookupFunction} instance. */
    LookupFunction createLookupFunction();
}

AsyncLookupFunctionProvider

Code Block
languagejava
titleAsyncLookupFunctionProvider
/** A provider for creating {@link AsyncLookupFunction}. */
@PublicEvolving
public interface AsyncLookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    static AsyncLookupFunctionProvider of(AsyncLookupFunction asyncLookupFunction) {
        return () -> asyncLookupFunction;
    }

    /** Creates an {@link AsyncLookupFunction} instance. */
    AsyncLookupFunction createAsyncLookupFunction();
}




PartialCachingLookupProvider

This is the API between the table framework and the user's table source. An implementation should define how to create a lookup function and whether to use a cache.

Code Block
languagejava
titlePartialCachingLookupProvider
/**
 * Provider for creating {@link LookupFunction} and {@link LookupCache} for storing lookup entries.
 */
@PublicEvolving
public class PartialCachingLookupProvider implements LookupFunctionProvider {

    private final LookupFunction lookupFunction;
    private final LookupCache cache;

    public PartialCachingLookupProvider(LookupFunction lookupFunction, LookupCache cache) {
        this.lookupFunction = lookupFunction;
        this.cache = cache;
    }

    @Override
    public LookupFunction createLookupFunction() {
        return lookupFunction;
    }
}

AsyncPartialCachingLookupProvider

Code Block
languagejava
titleAsyncPartialCachingLookupProvider
/**
 * Provider for creating {@link AsyncLookupFunction} and {@link LookupCache} for storing lookup
 * entries.
 */
@PublicEvolving
public class AsyncPartialCachingLookupProvider implements AsyncLookupFunctionProvider {

    private final AsyncLookupFunction asyncLookupFunction;
    private final LookupCache cache;

    public AsyncPartialCachingLookupProvider(
            AsyncLookupFunction asyncLookupFunction, LookupCache cache) {
        this.asyncLookupFunction = asyncLookupFunction;
        this.cache = cache;
    }

    @Override
    public AsyncLookupFunction createAsyncLookupFunction() {
        return asyncLookupFunction;
    }
}

FullCachingLookupProvider

This provider supports the full caching strategy. It reuses ScanRuntimeProvider and defines when the cache is reloaded.

Code Block
languagejava
titleFullCachingLookupProvider
/**
 * A {@link LookupFunctionProvider} that never looks up in the external system on cache miss and
 * provides a cache for holding all entries in the external system. The cache will be fully reloaded
 * from the external system and reload operations will be triggered by the {@link
 * CacheReloadTrigger}.
 */
@PublicEvolving
public class FullCachingLookupProvider implements LookupFunctionProvider {

    private final ScanTableSource.ScanRuntimeProvider scanRuntimeProvider;
    private final CacheReloadTrigger reloadTrigger;

    public FullCachingLookupProvider(
            ScanTableSource.ScanRuntimeProvider scanRuntimeProvider,
            CacheReloadTrigger reloadTrigger) {
        this.scanRuntimeProvider = scanRuntimeProvider;
        this.reloadTrigger = reloadTrigger;
    }

    @Override
    public LookupFunction createLookupFunction() {
        // LookupFunction is an abstract class, so an anonymous subclass is used instead of a lambda.
        return new LookupFunction() {
            @Override
            public Collection<RowData> lookup(RowData keyRow) {
                // No lookup in the external system on cache miss.
                return null;
            }
        };
    }
}

CacheReloadTrigger

A trigger defining custom logic for triggering full cache reloading.

Code Block
languagejava
titleCacheReloadTrigger
/** Customized trigger for reloading all lookup table entries in full caching mode. */
@PublicEvolving
public interface CacheReloadTrigger extends AutoCloseable, Serializable {

    /** Open the trigger. */
    void open(Context context) throws Exception;

    /**
     * Context of {@link CacheReloadTrigger} for getting information about times and
     * triggering reload.
     */
    interface Context {

        /** Get current processing time. */
        long currentProcessingTime();

        /** Get current watermark on the main stream. */
        long currentWatermark();

        /** Trigger a reload operation on the full cache. */
        CompletableFuture<Void> triggerReload();
    }
}

PeriodicCacheReloadTrigger

An implementation of CacheReloadTrigger that triggers reload with a specified interval.

Code Block
/** A trigger that reloads all entries periodically with specified interval or delay. */
public class PeriodicCacheReloadTrigger implements CacheReloadTrigger {

    private final Duration reloadInterval;
    private final ScheduleMode scheduleMode;

    public PeriodicCacheReloadTrigger(Duration reloadInterval, ScheduleMode scheduleMode) {
        this.reloadInterval = reloadInterval;
        this.scheduleMode = scheduleMode;
    }

    @Override
    public void open(CacheReloadTrigger.Context context) {
        // Register periodic reload task
    }

    @Override
    public void close() throws Exception {
        // Dispose resources
    }

    public enum ScheduleMode {
        FIXED_DELAY,
        FIXED_RATE
    }
}
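One way the "register periodic reload task" step could look is sketched below with a ScheduledExecutorService. This is a hedged illustration rather than the proposed implementation: `PeriodicReloadSketch` and `firesAtLeast` are hypothetical names, and a plain `Runnable` stands in for `Context#triggerReload`.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of how a periodic trigger might schedule reloads. */
final class PeriodicReloadSketch implements AutoCloseable {

    enum ScheduleMode {
        FIXED_DELAY,
        FIXED_RATE
    }

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    /** Schedules the reload task; the first run starts immediately. */
    void open(Runnable reload, Duration interval, ScheduleMode mode) {
        long millis = interval.toMillis();
        switch (mode) {
            case FIXED_RATE:
                // Runs are spaced by start time: start, start + interval, ...
                executor.scheduleAtFixedRate(reload, 0, millis, TimeUnit.MILLISECONDS);
                break;
            case FIXED_DELAY:
                // The next run starts one interval after the previous run finishes.
                executor.scheduleWithFixedDelay(reload, 0, millis, TimeUnit.MILLISECONDS);
                break;
            default:
                throw new IllegalArgumentException("Unrecognized schedule mode " + mode);
        }
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }

    /** Returns true if at least {@code n} reloads fire within five seconds. */
    static boolean firesAtLeast(int n, ScheduleMode mode) {
        CountDownLatch latch = new CountDownLatch(n);
        try (PeriodicReloadSketch trigger = new PeriodicReloadSketch()) {
            trigger.open(latch::countDown, Duration.ofMillis(10), mode);
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(firesAtLeast(3, ScheduleMode.FIXED_RATE));
    }
}
```

FIXED_DELAY avoids overlapping or back-to-back reloads when a single reload takes longer than the interval, while FIXED_RATE keeps reloads aligned to a regular schedule.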

TimedCacheReloadTrigger

Code Block
/** A trigger that reloads at a specific local time and repeats for the given interval in days. */
public class TimedCacheReloadTrigger implements CacheReloadTrigger {

    private final LocalTime reloadTime;
    private final int reloadIntervalInDays;

    public TimedCacheReloadTrigger(LocalTime reloadTime, int reloadIntervalInDays) {
        this.reloadTime = reloadTime;
        this.reloadIntervalInDays = reloadIntervalInDays;
    }

    @Override
    public void open(Context context) {
        // Register periodic reload task
    }

    @Override
    public void close() throws Exception {
        // Dispose resources
    }
}
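The timing arithmetic behind such a trigger can be sketched with java.time. The helper below is hypothetical (`TimedReloadSketch`, `initialDelay` and `period` are illustrative names, not part of the proposal) and it assumes local date-times without daylight-saving adjustments.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.LocalTime;

/** Hypothetical sketch of the delay computation for a reload at a fixed local time. */
final class TimedReloadSketch {

    /** Delay from {@code now} until the next occurrence of {@code reloadTime}. */
    static Duration initialDelay(LocalDateTime now, LocalTime reloadTime) {
        LocalDateTime next = now.toLocalDate().atTime(reloadTime);
        if (!next.isAfter(now)) {
            // Today's reload time has already passed (or is exactly now): use tomorrow's.
            next = next.plusDays(1);
        }
        return Duration.between(now, next);
    }

    /** Period between two consecutive reloads. */
    static Duration period(int reloadIntervalInDays) {
        return Duration.ofDays(reloadIntervalInDays);
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2022, 1, 1, 10, 0);
        System.out.println(initialDelay(now, LocalTime.of(12, 0))); // prints "PT2H"
        System.out.println(period(1)); // prints "PT24H"
    }
}
```

The resulting initial delay and period could then be handed to a scheduler in the trigger's open method.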

TableFunctionProvider / AsyncTableFunctionProvider

...

In order to unify the usage of caching across all connectors, we'd like to introduce some common table options, which are defined under class LookupOptions. Note that not all connectors are required to implement these options.

Option | Type | Description
lookup.cache | Enum of NONE, PARTIAL and FULL | The caching strategy for this lookup table. NONE: Do not use cache. PARTIAL: Use partial caching mode. FULL: Use full caching mode.
lookup.max-retries | Integer | The maximum allowed retries if a lookup operation fails
lookup.partial-cache.expire-after-access | Duration | Duration to expire an entry in the cache after accessing
lookup.partial-cache.expire-after-write | Duration | Duration to expire an entry in the cache after writing
lookup.partial-cache.cache-missing-key | Boolean | Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table
lookup.partial-cache.max-rows | Long | The maximum number of rows to store in the cache
lookup.full-cache.reload-strategy | Enum of PERIODIC and TIMED | The reload strategy for the full cache scenario. PERIODIC: Use PeriodicCacheReloadTrigger. TIMED: Use TimedCacheReloadTrigger.
lookup.full-cache.periodic-reload.interval | Duration | Duration to trigger reload in the PeriodicCacheReloadTrigger
lookup.full-cache.periodic-reload.schedule-mode | Enum of FIXED_DELAY and FIXED_RATE | The periodic schedule mode of reloading in the PeriodicCacheReloadTrigger
lookup.full-cache.timed-reload-at-time | LocalTime | The local time on the target JVM to trigger a reload
lookup.full-cache.interval-in-days | Integer | The interval in days to trigger the reload at the specified time

Cache Metrics

It is important to mention that a cache implementation does not have to report all the defined metrics. But if a cache reports a metric of the same semantic defined below, the implementation should follow the convention.

...