
Status

...



Discussion thread

...

...

Vote thread

...

JIRA: FLINK-28415

Release: 1.16


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

This FLIP is a joint work, initially proposed by Yuan Zhu (zstraw@163.com) and finished by Qingsheng Ren (partial caching part) and Alexander Smirnov (full caching part).



Motivation

...

To address the issues above, we propose to define a unified abstraction for the lookup source cache and its related metrics.

Proposed Changes

We'd like to split the proposal into two caching modes: partial caching and full caching.


Top-level APIs

To clarify the semantics of lookup, we'd like to introduce some top-level APIs for general lookup operations without caching:

  • LookupFunction / AsyncLookupFunction, extended versions of TableFunction / AsyncTableFunction that make the API more straightforward.
  • LookupFunctionProvider / AsyncLookupFunctionProvider, which serve as the creators of LookupFunction / AsyncLookupFunction in the table source.

And APIs related to the cache:

  • LookupCache, defining the cache used in the lookup table.
  • DefaultLookupCache, a default implementation of the cache that is suitable for most use cases.
  • CacheMetricGroup, defining the metrics that should be reported by the cache.

Partial and Full Caching

More specifically, we'd like to provide lookup source developers with public interfaces for the two most common cases, named partial caching and full caching.

Partial caching

Partial caching loads data into the cache lazily, along with the access to the external system. If the lookup key does not exist in the cache, a lookup against the external system is triggered and the result is stored in the cache to serve subsequent lookups. Users and lookup table developers can configure the eviction policy and the maximum size of the cache.
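The lookup path above is essentially a cache-aside pattern. A minimal sketch in plain Java, assuming least-recently-used eviction (only one of the possible eviction policies) and a `loader` function standing in for the query to the external system; `LruPartialCache` and `maxRows` are hypothetical names, not Flink APIs:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical sketch of partial caching: values are loaded lazily on a cache miss and the
 * least-recently-used entry is evicted once maxRows is exceeded. Not the Flink implementation.
 */
class LruPartialCache<K, V> {
    private final Map<K, V> entries;
    private final Function<K, V> loader; // stands in for the lookup against the external system
    private int loads = 0;

    LruPartialCache(int maxRows, Function<K, V> loader) {
        this.loader = loader;
        // accessOrder = true makes iteration order "least recently accessed first" (LRU)
        this.entries =
                new LinkedHashMap<K, V>(16, 0.75f, true) {
                    @Override
                    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                        return size() > maxRows;
                    }
                };
    }

    /** Returns the cached value, or loads and caches it on a miss. Values must be non-null. */
    V lookup(K key) {
        V cached = entries.get(key);
        if (cached != null) {
            return cached; // cache hit: no access to the external system
        }
        V loaded = loader.apply(key); // cache miss: query the external system
        loads++;
        entries.put(key, loaded);
        return loaded;
    }

    int loadCount() {
        return loads;
    }

    /** Checks presence without changing the LRU order (containsKey does not count as access). */
    boolean contains(K key) {
        return entries.containsKey(key);
    }
}
```

With `maxRows = 2`, looking up keys 1, 2, 1 and then 3 evicts key 2, because key 1 was accessed more recently.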

To support partial caching, we propose to introduce two new interfaces:

...

  • PartialCachingLookupProvider / PartialCachingAsyncLookupProvider, as the API interacting with the table source to get the LookupFunction / AsyncLookupFunction and the LookupCache.

The cache serves as a component in LookupJoinRunner and is pluggable: the developer of a lookup table specifies the cache in the provider (PartialCachingLookupProvider / PartialCachingAsyncLookupProvider) returned by their implementation of LookupTableSource. The planner takes over the lookup function and the cache created from the provider and passes them to the LookupJoinRunner. The cache is initialized during runtime execution and is loaded via the lookup function whenever there is a cache miss.
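The interaction described above can be sketched without Flink types: the runner asks the cache first and only calls the lookup function on a miss. `MiniLookupJoinRunner` and `SimpleLookupFunction` are hypothetical stand-ins for LookupJoinRunner and LookupFunction, the `cacheMissingKey` flag mirrors the option of the same name on DefaultLookupCache, and the `hits` / `misses` fields correspond to what hitCounter / missCounter in CacheMetricGroup would report.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for a lookup function: key -> matching rows. */
interface SimpleLookupFunction extends Function<String, Collection<String>> {}

/** Hypothetical sketch of a join runner combining a lookup function with a cache. */
class MiniLookupJoinRunner {
    private final SimpleLookupFunction lookupFunction;
    private final boolean cacheMissingKey;
    private final Map<String, Collection<String>> cache = new HashMap<>();
    long hits = 0;
    long misses = 0;

    MiniLookupJoinRunner(SimpleLookupFunction lookupFunction, boolean cacheMissingKey) {
        this.lookupFunction = lookupFunction;
        this.cacheMissingKey = cacheMissingKey;
    }

    Collection<String> lookup(String key) {
        Collection<String> cached = cache.get(key);
        if (cached != null) {
            hits++; // served from the cache, including cached empty results
            return cached;
        }
        misses++;
        Collection<String> rows = lookupFunction.apply(key); // access the external system
        if (rows == null) {
            rows = Collections.emptyList();
        }
        // Empty results are only stored when missing keys should be cached
        if (!rows.isEmpty() || cacheMissingKey) {
            cache.put(key, rows);
        }
        return rows;
    }
}
```

With `cacheMissingKey` disabled, repeated lookups of a key that matches nothing keep reaching the external system; enabling it trades memory for fewer remote calls.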

Full Caching

If the lookup table is small enough to fit into memory and doesn't change frequently, it is more efficient to load all of its entries into the cache to reduce network I/O, and to refresh the table periodically. We'd like to name this use case "full cache". Logically the reload operation is a kind of scan, so we'd like to reuse the ScanRuntimeProvider so that developers can reuse the scanning logic implemented in Source / SourceFunction / InputFormat. Considering the complexity of the Source API, we'd like to support the SourceFunction and InputFormat APIs first. Supporting the Source API might require a new topology and will be discussed later in another FLIP.
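A minimal sketch of the full caching idea, assuming a `Supplier` that returns every {key, value} row in place of a ScanRuntimeProvider (`FullCacheSketch` is hypothetical). Lookups are answered from an in-memory snapshot only, and `reload()` builds a complete new map before swapping it in through an `AtomicReference`, so a concurrent reader sees either the old or the new table, never a half-loaded one:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical full cache: loaded by a scan, lookups never reach the external system. */
class FullCacheSketch {
    private final Supplier<List<String[]>> scan; // stands in for a scan: rows of {key, value}
    private final AtomicReference<Map<String, List<String>>> snapshot =
            new AtomicReference<>(Collections.<String, List<String>>emptyMap());

    FullCacheSketch(Supplier<List<String[]>> scan) {
        this.scan = scan;
    }

    /** Scans the whole table into a fresh map, then atomically replaces the old snapshot. */
    void reload() {
        Map<String, List<String>> fresh = new HashMap<>();
        for (String[] row : scan.get()) {
            fresh.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row[1]);
        }
        snapshot.set(Collections.unmodifiableMap(fresh));
    }

    /** Served from memory only; a key absent from the snapshot simply has no matching rows. */
    Collection<String> lookup(String key) {
        return snapshot.get().getOrDefault(key, Collections.emptyList());
    }
}
```

The design favours simplicity over memory: during a reload the old and the new snapshot are held at the same time.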

...

  • FullCachingLookupProvider, for reusing the ability of scanning.
  • CacheReloadTrigger, for customizing the strategy for reloading all entries in the full cache.

Also, we'd like to provide two default implementations of CacheReloadTrigger:

  • PeriodicCacheReloadTrigger, for triggering reload periodically with a specific interval
  • TimedCacheReloadTrigger, for triggering reload at a specific time and repeating with an interval in days.

Public Interfaces

Lookup Functions

As the usage of the TableFunction interface is not quite straightforward for lookup table developers, we'd like to introduce a new interface for sync and async lookup tables. Caching will only be supported on LookupFunction / AsyncLookupFunction.

...

Code Block
languagejava
titleAsyncLookupFunction
/**
 * A wrapper class of {@link AsyncTableFunction} for asynchronously looking up rows matching the
 * lookup keys from an external system.
 *
 * <p>The output type of this table function is fixed as {@link RowData}.
 */
@PublicEvolving
public abstract class AsyncLookupFunction extends AsyncTableFunction<RowData> {

    /**
     * Asynchronously looks up rows matching the lookup keys.
     *
     * @param keyRow a {@link RowData} that wraps the keys to look up.
     * @return a collection of all matching rows in the lookup table.
     */
    public abstract CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow);

    /** Invokes {@link #asyncLookup} and chains futures. */
    public final void eval(CompletableFuture<Collection<RowData>> future, Object... keys) {
        asyncLookup(GenericRowData.of(keys))
                .whenCompleteAsync(
                        (result, exception) -> {
                            if (exception != null) {
                                future.completeExceptionally(exception);
                                return;
                            }
                            future.complete(result);
                        });
    }
}
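The `eval` method of AsyncLookupFunction bridges the future returned by `asyncLookup` into the future handed in by the framework, forwarding both results and failures. The same pattern in isolation, as a hypothetical Flink-free helper (`FutureBridge` is not a Flink class, and `String` stands in for `RowData`):

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical sketch: forwards the outcome of an async lookup into a caller-owned future. */
final class FutureBridge {
    private FutureBridge() {}

    static <K, R> void eval(
            Function<K, CompletableFuture<Collection<R>>> asyncLookup,
            CompletableFuture<Collection<R>> target,
            K key) {
        asyncLookup
                .apply(key)
                .whenComplete(
                        (result, exception) -> {
                            if (exception != null) {
                                target.completeExceptionally(exception); // propagate the failure
                                return;
                            }
                            target.complete(result); // propagate the matching rows
                        });
    }
}
```

This sketch uses `whenComplete`, which runs the callback on the thread that completes the source future; `whenCompleteAsync` without an executor, as used in `eval`, runs it on the default asynchronous execution facility (usually `ForkJoinPool.commonPool()`).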


LookupFunctionProvider

Code Block
languagejava
titleLookupFunctionProvider
/** A provider for creating {@link LookupFunction}. */
@PublicEvolving
public interface LookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    static LookupFunctionProvider of(LookupFunction lookupFunction) {
        return () -> lookupFunction;
    }

    LookupFunction createLookupFunction();
}

AsyncLookupFunctionProvider

Code Block
languagejava
titleAsyncLookupFunctionProvider
/** A provider for creating {@link AsyncLookupFunction}. */
@PublicEvolving
public interface AsyncLookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    static AsyncLookupFunctionProvider of(AsyncLookupFunction asyncLookupFunction) {
        return () -> asyncLookupFunction;
    }

    AsyncLookupFunction createAsyncLookupFunction();
}

LookupCache

Considering there might be custom caching strategies and optimizations, we'd like to expose the cache interface as public API for developers to make the cache pluggable.

Code Block
languagejava
titleLookupCache
/**
 * A semi-persistent mapping from keys to values for storing entries of lookup table.
 *
 * <p>The type of the caching key is a {@link RowData} with lookup key fields packed inside. The
 * type of value is a {@link Collection} of {@link RowData}, which are rows matching lookup key
 * fields.
 *
 * <p>Cache entries are manually added using {@link #put}, and are stored in the cache until either
 * evicted or manually invalidated.
 *
 * <p>Implementations of this interface are expected to be thread-safe, and can be safely accessed
 * by multiple concurrent threads.
 */
@PublicEvolving
public interface LookupCache extends AutoCloseable, Serializable {

    /**
     * Initialize the cache.
     *
     * @param metricGroup the metric group to register cache related metrics.
     */
    void open(CacheMetricGroup metricGroup);

    /**
     * Returns the value associated with key in this cache, or null if there is no cached value for
     * key.
     */
    @Nullable
    Collection<RowData> getIfPresent(RowData key);

    /**
     * Associates the specified value rows with the specified key row in the cache. If the cache
     * previously contained a value associated with the key, the old value is replaced by the
     * specified value.
     *
     * @param key key row with which the specified value is to be associated
     * @param value value rows to be associated with the specified key
     * @return the previous value rows associated with key, or null if there was no mapping for key
     */
    Collection<RowData> put(RowData key, Collection<RowData> value);

    /** Discards any cached value for the specified key. */
    void invalidate(RowData key);

    /** Returns the number of key-value mappings in the cache. */
    long size();
}
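The contract documented on LookupCache (null from `getIfPresent` on absence, the previous value returned from `put`, thread safety) can be modelled without Flink. A hypothetical sketch, assuming `String` keys and rows instead of `RowData` and omitting eviction, metrics registration via `open(CacheMetricGroup)` and `Serializable`:

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical model of the LookupCache contract with String keys/rows instead of RowData.
 * ConcurrentHashMap makes every operation safe for concurrent callers.
 */
class InMemoryLookupCache implements AutoCloseable {
    private final ConcurrentHashMap<String, Collection<String>> map = new ConcurrentHashMap<>();

    /** Returns the cached rows, or null if nothing is cached for the key. */
    Collection<String> getIfPresent(String key) {
        return map.get(key);
    }

    /** Stores rows for the key and returns the previous rows, or null if there were none. */
    Collection<String> put(String key, Collection<String> value) {
        return map.put(key, value);
    }

    /** Discards any cached rows for the key. */
    void invalidate(String key) {
        map.remove(key);
    }

    /** Number of key-to-rows mappings currently held. */
    long size() {
        return map.size();
    }

    @Override
    public void close() {
        map.clear(); // release cached entries
    }
}
```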

DefaultLookupCache

As the cache should be instantiated during runtime execution to avoid serialization / deserialization, a factory is required for creating the cache. 

Code Block
languagejava
titleDefaultLookupCache
/** Default implementation of {@link LookupCache}. */
@PublicEvolving
public class DefaultLookupCache implements LookupCache {
    private final Duration expireAfterAccessDuration;
    private final Duration expireAfterWriteDuration;
    private final Long maximumSize;
    private final boolean cacheMissingKey;

    private DefaultLookupCache(
            Duration expireAfterAccessDuration,
            Duration expireAfterWriteDuration,
            Long maximumSize,
            boolean cacheMissingKey) {
        this.expireAfterAccessDuration = expireAfterAccessDuration;
        this.expireAfterWriteDuration = expireAfterWriteDuration;
        this.maximumSize = maximumSize;
        this.cacheMissingKey = cacheMissingKey;
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    // Implementations of the LookupCache methods (open, getIfPresent, put, invalidate, size,
    // close) are omitted here.

    /** Builder of {@link DefaultLookupCache}. */
    public static class Builder {
        private Duration expireAfterAccessDuration;
        private Duration expireAfterWriteDuration;
        private Long maximumSize;
        // Primitive with a default, so build() cannot fail when unboxing an unset value.
        private boolean cacheMissingKey = true;

        public Builder expireAfterAccess(Duration duration) {
            expireAfterAccessDuration = duration;
            return this;
        }

        public Builder expireAfterWrite(Duration duration) {
            expireAfterWriteDuration = duration;
            return this;
        }

        public Builder maximumSize(long maximumSize) {
            this.maximumSize = maximumSize;
            return this;
        }

        public Builder cacheMissingKey(boolean cacheMissingKey) {
            this.cacheMissingKey = cacheMissingKey;
            return this;
        }

        public DefaultLookupCache build() {
            return new DefaultLookupCache(
                    expireAfterAccessDuration,
                    expireAfterWriteDuration,
                    maximumSize,
                    cacheMissingKey);
        }
    }
}
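DefaultLookupCache exposes expire-after-access and expire-after-write settings without defining them further. A hedged sketch of the conventional meaning, as in Guava-style caches (an assumption, not a statement about DefaultLookupCache internals): write expiry counts from when an entry was stored, access expiry from its last successful read. `ExpiringCache` and its injectable millisecond clock are hypothetical:

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical sketch of expire-after-write / expire-after-access with an injectable clock. */
class ExpiringCache<K, V> {
    private static final class Entry<T> {
        final T value;
        final long writtenAt;
        long accessedAt;

        Entry(T value, long now) {
            this.value = value;
            this.writtenAt = now;
            this.accessedAt = now;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final Duration expireAfterWrite; // null: entries never expire after write
    private final Duration expireAfterAccess; // null: entries never expire after access
    private final LongSupplier clock; // current time in milliseconds

    ExpiringCache(Duration expireAfterWrite, Duration expireAfterAccess, LongSupplier clock) {
        this.expireAfterWrite = expireAfterWrite;
        this.expireAfterAccess = expireAfterAccess;
        this.clock = clock;
    }

    void put(K key, V value) {
        entries.put(key, new Entry<>(value, clock.getAsLong()));
    }

    /** Returns the value, or null if absent or expired (expired entries are dropped lazily). */
    V getIfPresent(K key) {
        Entry<V> entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        long now = clock.getAsLong();
        boolean writeExpired =
                expireAfterWrite != null && now - entry.writtenAt >= expireAfterWrite.toMillis();
        boolean accessExpired =
                expireAfterAccess != null && now - entry.accessedAt >= expireAfterAccess.toMillis();
        if (writeExpired || accessExpired) {
            entries.remove(key);
            return null;
        }
        entry.accessedAt = now; // a successful read restarts the access timer
        return entry.value;
    }
}
```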

CacheMetricGroup

An interface defining all cache related metrics:

Code Block
languagejava
titleCacheMetricGroup
/**
 * Pre-defined metrics for cache.
 *
 * <p>Please note that these methods should only be invoked once. Registering a metric with the
 * same name multiple times leads to undefined behavior.
 */
@PublicEvolving
public interface CacheMetricGroup extends MetricGroup {
    /** The number of cache hits. */
    void hitCounter(Counter hitCounter);

    /** The number of cache misses. */
    void missCounter(Counter missCounter);

    /** The number of times to load data into cache from external system. */
    void loadCounter(Counter loadCounter);

    /** The number of load failures. */
    void numLoadFailuresCounter(Counter numLoadFailuresCounter);

    /** The time spent for the latest load operation. */
    void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);

    /** The number of records in cache. */
    void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

    /** The number of bytes used by cache. */
    void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}

PartialCachingLookupProvider

This is the API between the table framework and the user's table source. Implementations should define how to create a lookup function and whether to use a cache.

Code Block
languagejava
titlePartialCachingLookupProvider
/**
 * Provider for creating {@link LookupFunction} and {@link LookupCache} for storing lookup entries.
 */
@PublicEvolving
public interface PartialCachingLookupProvider extends LookupFunctionProvider {

    /**
     * Build a {@link PartialCachingLookupProvider} from the specified {@link LookupFunction} and
     * {@link LookupCache}.
     */
    static PartialCachingLookupProvider of(LookupFunction lookupFunction, LookupCache cache) {
        return new PartialCachingLookupProvider() {

            @Override
            public LookupCache getCache() {
                return cache;
            }

            @Override
            public LookupFunction createLookupFunction() {
                return lookupFunction;
            }
        };
    }

    /** Get a new instance of {@link LookupCache}. */
    LookupCache getCache();
}

PartialCachingAsyncLookupProvider

Code Block
languagejava
titlePartialCachingAsyncLookupProvider
/**
 * Provider for creating {@link AsyncLookupFunction} and {@link LookupCache} for storing lookup
 * entries.
 */
@PublicEvolving
public interface PartialCachingAsyncLookupProvider extends AsyncLookupFunctionProvider {

    /**
     * Build a {@link PartialCachingAsyncLookupProvider} from the specified {@link
     * AsyncLookupFunction} and {@link LookupCache}.
     */
    static PartialCachingAsyncLookupProvider of(
            AsyncLookupFunction asyncLookupFunction, LookupCache cache) {
        return new PartialCachingAsyncLookupProvider() {

            @Override
            public LookupCache getCache() {
                return cache;
            }

            @Override
            public AsyncLookupFunction createAsyncLookupFunction() {
                return asyncLookupFunction;
            }
        };
    }

    /** Get a new instance of {@link LookupCache}. */
    LookupCache getCache();
}

FullCachingLookupProvider

This interface supports the full caching strategy. It reuses ScanRuntimeProvider and defines the reload time.

Code Block
languagejava
titleFullCachingLookupProvider
/**
 * A {@link LookupFunctionProvider} that never looks up in the external system on cache miss and
 * provides a cache for holding all entries of the external system. The cache will be fully
 * reloaded from the external system by the {@link ScanTableSource.ScanRuntimeProvider} and reload
 * operations will be triggered by the {@link CacheReloadTrigger}.
 */
@PublicEvolving
public interface FullCachingLookupProvider extends LookupFunctionProvider {
    static FullCachingLookupProvider of(
            ScanTableSource.ScanRuntimeProvider scanRuntimeProvider,
            CacheReloadTrigger cacheReloadTrigger) {
        return new FullCachingLookupProvider() {
            @Override
            public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider() {
                return scanRuntimeProvider;
            }

            @Override
            public CacheReloadTrigger getCacheReloadTrigger() {
                return cacheReloadTrigger;
            }

            @Override
            public LookupFunction createLookupFunction() {
                // Placeholder: lookups are served from the full cache, never from the external
                // system. LookupFunction is an abstract class, so a lambda cannot be used here.
                return new LookupFunction() {
                    @Override
                    public Collection<RowData> lookup(RowData keyRow) {
                        return null;
                    }
                };
            }
        };
    }

    /**
     * Get a {@link ScanTableSource.ScanRuntimeProvider} for scanning all entries from the external
     * lookup table and loading them into the cache.
     */
    ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();

    /** Get a {@link CacheReloadTrigger} for triggering the reload operation. */
    CacheReloadTrigger getCacheReloadTrigger();
}

CacheReloadTrigger

A trigger defining custom logic for triggering full cache reloading.

Code Block
languagejava
titleCacheReloadTrigger
/** Customized trigger for reloading lookup table entries in full caching mode. */
@PublicEvolving
public interface CacheReloadTrigger extends AutoCloseable, Serializable {

    /** Open the trigger. */
    void open(Context context) throws Exception;

    /**
     * Context of {@link CacheReloadTrigger} for getting information about times and
     * triggering reload.
     */
    interface Context {

        /** Get current processing time. */
        long currentProcessingTime();

        /** Get current watermark on the main stream. */
        long currentWatermark();

        /** Trigger a reload operation on the full cache. */
        CompletableFuture<Void> triggerReload();
    }
}
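To show how an implementation might use the Context, here is a hypothetical trigger that loads the table once when opened and never reloads. `ReloadTrigger` mirrors the shape of CacheReloadTrigger with plain Java types so the sketch compiles without Flink; neither it nor `LoadOnceTrigger` is part of the proposal:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical Flink-free mirror of the CacheReloadTrigger shape. */
interface ReloadTrigger extends AutoCloseable {
    void open(Context context) throws Exception;

    interface Context {
        long currentProcessingTime();

        long currentWatermark();

        CompletableFuture<Void> triggerReload();
    }
}

/** Hypothetical trigger for a table known to be static: load once at open(), never reload. */
class LoadOnceTrigger implements ReloadTrigger {
    @Override
    public void open(Context context) throws Exception {
        context.triggerReload().get(); // block until the initial load has finished
    }

    @Override
    public void close() {
        // Nothing scheduled, so nothing to release.
    }
}
```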

PeriodicCacheReloadTrigger

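An implementation of CacheReloadTrigger that triggers reload with a specified interval.

Code Block
languagejava
titlePeriodicCacheReloadTrigger
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** A trigger that reloads all entries periodically with specified interval or delay. */
public class PeriodicCacheReloadTrigger implements CacheReloadTrigger {

    private final Duration reloadInterval;
    private final ScheduleMode scheduleMode;

    // Created in open(); transient because the trigger itself is Serializable.
    private transient ScheduledExecutorService scheduledExecutor;

    public PeriodicCacheReloadTrigger(Duration reloadInterval, ScheduleMode scheduleMode) {
        this.reloadInterval = reloadInterval;
        this.scheduleMode = scheduleMode;
    }

    @Override
    public void open(CacheReloadTrigger.Context context) {
        scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        switch (scheduleMode) {
            case FIXED_RATE:
                // Fire a reload every interval without waiting for the previous one to finish.
                scheduledExecutor.scheduleAtFixedRate(
                        context::triggerReload,
                        0,
                        reloadInterval.toMillis(),
                        TimeUnit.MILLISECONDS);
                break;
            case FIXED_DELAY:
                // Wait for each reload to complete, then wait the interval before the next one.
                // Note: an exception thrown here suppresses all subsequent executions.
                scheduledExecutor.scheduleWithFixedDelay(
                        () -> {
                            try {
                                context.triggerReload().get();
                            } catch (Exception e) {
                                throw new RuntimeException(
                                        "Uncaught exception during the reload", e);
                            }
                        },
                        0,
                        reloadInterval.toMillis(),
                        TimeUnit.MILLISECONDS);
                break;
            default:
                throw new IllegalArgumentException(
                        String.format("Unrecognized schedule mode \"%s\"", scheduleMode));
        }
    }

    @Override
    public void close() throws Exception {
        if (scheduledExecutor != null) {
            scheduledExecutor.shutdownNow();
        }
    }

    public enum ScheduleMode {
        FIXED_DELAY,
        FIXED_RATE
    }
}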
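The two ScheduleMode values differ in when the next reload starts. Assuming a FIXED_RATE task that only fires `triggerReload()` without waiting on the returned future, and a FIXED_DELAY task that blocks on `get()` before the delay starts counting, the reload start times follow directly. `ReloadTimeline` is a hypothetical helper, and each reload is assumed to take a constant `reloadMillis`:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of reload start times (ms, starting at 0) per schedule mode. */
final class ReloadTimeline {
    private ReloadTimeline() {}

    /** FIXED_RATE: reloads are fired every intervalMillis regardless of how long each takes. */
    static List<Long> fixedRate(long intervalMillis, long reloadMillis, int runs) {
        List<Long> starts = new ArrayList<>();
        for (int i = 0; i < runs; i++) {
            starts.add(i * intervalMillis); // reloadMillis does not influence the cadence
        }
        return starts;
    }

    /** FIXED_DELAY: the next reload starts intervalMillis after the previous one finished. */
    static List<Long> fixedDelay(long intervalMillis, long reloadMillis, int runs) {
        List<Long> starts = new ArrayList<>();
        long start = 0;
        for (int i = 0; i < runs; i++) {
            starts.add(start);
            start += reloadMillis + intervalMillis;
        }
        return starts;
    }
}
```

Under these assumptions FIXED_RATE keeps a steady cadence but may start a reload while the previous one is still running, whereas FIXED_DELAY never overlaps reloads at the cost of drifting by the reload duration each round.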

TimedCacheReloadTrigger

Code Block
languagejava
titleTimedCacheReloadTrigger
/** A trigger that reloads at a specific local time and repeats for the given interval in days. */
public class TimedCacheReloadTrigger implements CacheReloadTrigger {

    private final LocalTime reloadTime;
    private final int reloadIntervalInDays;

    public TimedCacheReloadTrigger(LocalTime reloadTime, int reloadIntervalInDays) {
        this.reloadTime = reloadTime;
        this.reloadIntervalInDays = reloadIntervalInDays;
    }

    @Override
    public void open(Context context) {
        // Register periodic reload task
    }

    @Override
    public void close() throws Exception {
        // Dispose resources
    }
}
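A timed trigger has to work out how long to wait until the first reload at the configured local time. A hypothetical helper for that arithmetic (`TimedReloadSchedule` is not part of the proposal); it uses `LocalDateTime`, which deliberately ignores time zones and daylight-saving transitions that a real trigger would need to handle:

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.LocalTime;

/** Hypothetical helper: delay until the next reloadTime, then a fixed period in days. */
final class TimedReloadSchedule {
    private TimedReloadSchedule() {}

    static Duration initialDelay(LocalDateTime now, LocalTime reloadTime) {
        LocalDateTime next = now.toLocalDate().atTime(reloadTime);
        if (next.isBefore(now)) {
            next = next.plusDays(1); // today's slot has already passed: start tomorrow
        }
        return Duration.between(now, next);
    }

    static Duration period(int reloadIntervalInDays) {
        return Duration.ofDays(reloadIntervalInDays);
    }
}
```

For a reload time of 12:30, a trigger opened at 10:00 waits 2.5 hours, while one opened at 13:00 waits until 12:30 the next day.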



TableFunctionProvider / AsyncTableFunctionProvider

We'd like to deprecate these two interfaces and let developers switch to the new LookupFunctionProvider / AsyncLookupFunctionProvider / FullCachingLookupProvider instead.

Table Options for Lookup Cache

To unify the usage of caching across all connectors, we'd like to introduce some common table options, defined in the class LookupOptions. Note that not all connectors are required to implement these options.

Option | Type | Description
--- | --- | ---
lookup.cache | Enum of NONE, PARTIAL and FULL | The caching strategy for this lookup table. NONE: do not use cache. PARTIAL: use partial caching mode. FULL: use full caching mode.
lookup.max-retries | Integer | The maximum allowed retries if a lookup operation fails
lookup.partial-cache.expire-after-access | Duration | Duration to expire an entry in the cache after accessing
lookup.partial-cache.expire-after-write | Duration | Duration to expire an entry in the cache after writing
lookup.partial-cache.cache-missing-key | Boolean | Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table
lookup.partial-cache.max-rows | Long | The maximum number of rows to store in the cache
lookup.full-cache.reload-strategy | Enum of PERIODIC and TIMED | The reload strategy for the full cache scenario. PERIODIC: use PeriodicCacheReloadTrigger. TIMED: use TimedCacheReloadTrigger.
lookup.full-cache.periodic-reload.interval | Duration | Duration to trigger reload in the PeriodicCacheReloadTrigger
lookup.full-cache.periodic-reload.schedule-mode | Enum of FIXED_DELAY and FIXED_RATE | The periodic schedule mode of reloading in the PeriodicCacheReloadTrigger
lookup.full-cache.timed-reload.iso-time | String | Time in ISO-8601 format when the cache needs to be reloaded. Time can be specified either with or without a time zone (if omitted, the local time zone of the target JVM is used). See formatter ISO_TIME.
lookup.full-cache.timed-reload.interval-in-days | Integer | The interval in days to trigger the reload at the specified time
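The `lookup.full-cache.timed-reload.iso-time` option accepts a time with or without a time zone and refers to `ISO_TIME`. A hedged sketch of parsing such a value with `java.time` (`ReloadTimeOption.parse` is a hypothetical helper); it takes the local offset as a parameter rather than reading the JVM default zone, which keeps the example deterministic:

```java
import java.time.LocalTime;
import java.time.OffsetTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;

/** Hypothetical parser for the timed-reload time option. */
final class ReloadTimeOption {
    private ReloadTimeOption() {}

    /**
     * Parses with ISO_TIME. A value carrying an offset is converted to localOffset; a value
     * without an offset is taken as a local time as-is.
     */
    static LocalTime parse(String value, ZoneOffset localOffset) {
        // Try OffsetTime first, fall back to LocalTime when no offset is present
        TemporalAccessor parsed =
                DateTimeFormatter.ISO_TIME.parseBest(value, OffsetTime::from, LocalTime::from);
        if (parsed instanceof OffsetTime) {
            return ((OffsetTime) parsed).withOffsetSameInstant(localOffset).toLocalTime();
        }
        return (LocalTime) parsed;
    }
}
```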


Cache Metrics

Note that a cache implementation does not have to report all of the defined metrics. However, if a cache reports a metric with the same semantics as one defined below, the implementation should follow this convention.

...