...

A cache implementation does not have to report all of the metrics defined here. However, if a cache reports a metric with the same semantics as one defined below, the implementation should follow this convention.

Name | Type | Unit | Description
numCachedRecord | Gauge | Records | The number of records in cache.
numCachedBytes | Gauge | Bytes | The number of bytes used by cache.
hitCount | Counter | | The number of cache hits.
missCount | Counter | | The number of cache misses, which might lead to loading operations.
loadCount | Counter | | The number of times data is loaded into the cache from the external system. For an LRU cache the load count should equal the miss count, but for a cache that loads all entries (all cache) the two would differ.
numLoadFailure | Counter | | The number of load failures.
latestLoadTime | Gauge | ms | The time spent on the latest load operation.

Here we define only the fundamental metrics and let the external metric system perform the aggregation to derive more descriptive values, such as hitRate = hitCount / (hitCount + missCount).
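As a rough illustration of the aggregation that is left to the external metric system, the hit rate can be derived from the two raw counter values. The class and method names below are hypothetical and not part of the proposal, and returning 0.0 before any lookup has happened is an assumption made for this sketch.

```java
// Hypothetical helper, not part of the proposed API: derives hitRate from raw counter values.
final class CacheRatios {
    private CacheRatios() {}

    /** Returns hitCount / (hitCount + missCount), or 0.0 when no lookup has happened yet. */
    static double hitRate(long hitCount, long missCount) {
        long total = hitCount + missCount;
        // Assumption: report 0.0 instead of NaN when the cache has not been queried.
        return total == 0 ? 0.0 : (double) hitCount / total;
    }
}
```

For example, `CacheRatios.hitRate(3, 1)` evaluates to 0.75.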

...

Code Block
languagejava
titleCache
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Callable;

/**
 * A semi-persistent mapping from keys to values. Cache entries are manually added using {@link
 * #put(Object, Object)}, and are stored in the cache until either evicted or manually invalidated.
 *
 * <p>Implementations of this interface are expected to be thread-safe, and can be safely accessed
 * by multiple concurrent threads.
 *
 * @param <K> key
 * @param <V> value
 */
public interface Cache<K, V> extends AutoCloseable {

    /**
     * Initialize the cache.
     *
     * @param cacheMetricGroup - metric group for reporting metrics of the cache
     */
    void open(CacheMetricGroup cacheMetricGroup);

    /**
     * Returns the value associated with the key in this cache, or null if there is no cached value
     * for the key.
     */
    V getIfPresent(K key);

    /**
     * Returns the value associated with the key in the cache, or obtains the value by loader if the
     * key doesn't exist.
     *
     * @param key - the key whose associated value is to be returned
     * @param loader - the loader for loading associated values from external system
     * @return The value associated with the key
     * @throws IOException if any exception was thrown while loading the value
     */
    V get(K key, Callable<? extends V> loader) throws IOException;

    /**
     * Associates the specified value with the specified key in the cache. If the cache previously
     * contained a value associated with the key, the old value is replaced by the specified value.
     *
     * @param key - key with which the specified value is to be associated
     * @param value - value to be associated with the specified key
     * @return the previous value associated with key, or null if there was no mapping for key.
     */
    V put(K key, V value);

    /**
     * Copies all the mappings from the specified map to the cache. The effect of this call is
     * equivalent to that of calling {@code put(k, v)} on this map once for each mapping from key
     * {@code k} to value {@code v} in the specified map. The behavior of this operation is
     * undefined if the specified map is modified while the operation is in progress.
     */
    void putAll(Map<? extends K, ? extends V> m);

    /** Delete all entries in the cache. */
    void clean();

    /** Returns the number of key-value mappings in the cache. */
    long size();
}
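
To make the load-on-miss contract of get(key, loader) more concrete, here is a minimal sketch backed by a ConcurrentHashMap. SimpleMapCache is a hypothetical name, it deliberately does not implement the proposed Cache interface, and it has no eviction or metric reporting. Skipping null values is an assumption forced by ConcurrentHashMap, not something the proposal specifies.

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for a cache implementation; no eviction, no metrics.
final class SimpleMapCache<K, V> {
    private final Map<K, V> entries = new ConcurrentHashMap<>();

    /** Returns the cached value, or null if the key is not cached. */
    V getIfPresent(K key) {
        return entries.get(key);
    }

    /** Returns the cached value, or loads it via the loader and caches it on a miss. */
    V get(K key, Callable<? extends V> loader) throws IOException {
        V cached = entries.get(key);
        if (cached != null) {
            return cached;
        }
        V loaded;
        try {
            loaded = loader.call();
        } catch (IOException e) {
            throw e;
        } catch (Exception e) {
            // Wrap any other loader failure so callers only deal with IOException.
            throw new IOException("Failed to load value for key " + key, e);
        }
        if (loaded != null) {
            // Assumption: null results are not cached (ConcurrentHashMap rejects null values).
            entries.put(key, loaded);
        }
        return loaded;
    }

    /** Associates the value with the key and returns the previous value, or null. */
    V put(K key, V value) {
        return entries.put(key, value);
    }

    /** Returns the number of cached mappings. */
    long size() {
        return entries.size();
    }
}
```

In this sketch two threads that miss on the same key at the same time may both invoke the loader. A real implementation would have to decide whether that is acceptable.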

CacheMetricGroup

An interface defining all cache-related metrics:

Code Block
languagejava
titleCacheMetricGroup

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;

/** Pre-defined metrics for {@code Cache}. */
public interface CacheMetricGroup {

    /** The number of cache hits. */
    Counter getHitCounter();

    /** The number of cache misses. */
    Counter getMissCounter();

    /** The number of times to load data into cache from external system. */
    Counter getLoadCounter();

    /** The number of load failures. */
    Counter getNumLoadFailureCounter();

    /** The time spent for the latest load operation. */
    void setLatestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);

    /** The number of records in cache. */
    void setNumCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

    /** The number of bytes used by cache. */
    void setNumCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}
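
The interface exposes counters through getters and gauges through setters. One plausible reading, sketched below, is that the metric group owns the counters while the cache supplies the gauges, since only the cache knows its current size. All types here (SketchCounter, SketchGauge, SimpleSketchCounter, SketchCacheMetricGroup) are hypothetical stand-ins written for this example, not Flink classes or part of the proposal.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-ins for the counter and gauge metric types.
interface SketchCounter {
    void inc();

    long getCount();
}

interface SketchGauge<T> {
    T getValue();
}

// Thread-safe counter backed by an AtomicLong.
final class SimpleSketchCounter implements SketchCounter {
    private final AtomicLong count = new AtomicLong();

    @Override
    public void inc() {
        count.incrementAndGet();
    }

    @Override
    public long getCount() {
        return count.get();
    }
}

// Hypothetical metric group: owns the counters, accepts gauges from the cache.
final class SketchCacheMetricGroup {
    private final SimpleSketchCounter hitCounter = new SimpleSketchCounter();
    private final SimpleSketchCounter missCounter = new SimpleSketchCounter();
    // Assumption: report 0 until the cache registers its own gauge.
    private volatile SketchGauge<Long> numCachedRecordsGauge = () -> 0L;

    SketchCounter getHitCounter() {
        return hitCounter;
    }

    SketchCounter getMissCounter() {
        return missCounter;
    }

    void setNumCachedRecordsGauge(SketchGauge<Long> gauge) {
        this.numCachedRecordsGauge = gauge;
    }

    /** Reads the gauge the way a metric reporter would. */
    long currentNumCachedRecords() {
        return numCachedRecordsGauge.getValue();
    }
}
```

A cache would fetch the counters once in open(), increment them on each lookup, and register a gauge such as `() -> (long) entries.size()` for the record count.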

CacheConfig

A wrapper class containing parameters of the cache:

...

Code Block
languagejava
titleCachingTableFunction
import java.io.IOException;
import java.util.List;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;

/**
 * Base class for lookup function that supports caching.
 */
@PublicEvolving
public abstract class CachingTableFunction extends TableFunction<RowData> {
    private final Cache<RowData, List<RowData>> cache;

    /**
     * Use default cache implementation with specified cache configuration.
     *
     * @param cacheConfig - configuration of the default cache.
     */
    public CachingTableFunction(CacheConfig cacheConfig) {
        cache = new DefaultCache<>(cacheConfig);
    }

    /**
     * Use custom cache implementation.
     *
     * @param cache - custom cache implementation
     */
    public CachingTableFunction(Cache<RowData, List<RowData>> cache) {
        this.cache = cache;
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        cache.open(new InternalCacheMetricGroup(context.getMetricGroup()));
    }

    public void eval(Object... keys) {
        // Check the cache first, or load via lookup() if the cache doesn't contain the key
    }

    /**
     * Lookup rows associated with the key row from external system.
     *
     * @param key the key row containing lookup fields
     * @throws IOException if any exception was thrown while looking up the value
     */
    public abstract List<RowData> lookup(RowData key) throws IOException;
}
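
The eval() body above is only described in a comment. The following sketch shows one way the "check the cache first, otherwise call lookup()" flow could look. It is an illustration under simplifying assumptions: String stands in for RowData, a HashMap stands in for the Cache, emitted rows are collected into a list instead of being passed to a collector, and SketchCachingLookup is a hypothetical name.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: String stands in for RowData and a HashMap for the Cache.
abstract class SketchCachingLookup {
    private final Map<String, List<String>> cache = new HashMap<>();
    private final List<String> collected = new ArrayList<>();

    /** Emits the rows for the key, consulting the cache before the external system. */
    void eval(String key) throws IOException {
        List<String> rows = cache.get(key);
        if (rows == null) {
            // Cache miss: query the external system and remember the result.
            rows = lookup(key);
            cache.put(key, rows);
        }
        // Stand-in for emitting each row downstream.
        collected.addAll(rows);
    }

    /** Rows emitted so far, in emission order. */
    List<String> collectedRows() {
        return collected;
    }

    /** Looks up the rows for the key in the external system. */
    abstract List<String> lookup(String key) throws IOException;
}
```

With this shape, a connector author only implements lookup(), and repeated calls with the same key reach the external system once.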

AllCachingTableFunction

A helper TableFunction that loads all entries into the cache and schedules reloading by the specified TTL.

Code Block
languagejava
titleAllCachingTableFunction
import java.io.IOException;
import java.time.Duration;
import java.util.List;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;

/**
 * Base class for lookup function that loads all entries from external system and schedules
 * reloading by the specified TTL.
 */
public abstract class AllCachingTableFunction extends TableFunction<RowData> {
    private final Cache<RowData, List<RowData>> cache;

    // Interval to reload all entries in cache
    private final Duration reloadInterval;

    public AllCachingTableFunction(CacheConfig cacheConfig) {
        cache = new DefaultCache<>(cacheConfig);
        reloadInterval = cacheConfig.get(TTL);
    }

    public AllCachingTableFunction(Cache<RowData, List<RowData>> cache, Duration reloadInterval) {
        this.cache = cache;
        this.reloadInterval = reloadInterval;
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
        scheduleCacheReload(this::reloadAllEntries, reloadInterval);
    }

    public void eval(Object... keys) {
        // Only check the cache here
    }

    /**
     * Reload all entries in the cache.
     *
     * <p>This function will be invoked periodically in a scheduled executor, which invalidates all
     * entries in the cache and reloads all entries from external system.
     *
     * @param cache - the cache in which all entries are to be reloaded
     * @throws IOException - if any exception was thrown while reloading the value
     */
    public abstract void reloadAllEntries(Cache<RowData, List<RowData>> cache) throws IOException;
}
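
The proposal does not spell out how scheduleCacheReload works. As a sketch of what periodic reloading could look like, the example below uses a ScheduledExecutorService from the JDK. SketchReloadingCache and its Supplier-based loader are hypothetical. Unlike the description above, the sketch swaps in a freshly loaded snapshot rather than invalidating entries in place, a swapped-in technique chosen so that readers never see a half-loaded cache.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch of a cache that periodically reloads all of its entries.
final class SketchReloadingCache implements AutoCloseable {
    private final Supplier<Map<String, String>> loader;
    private volatile Map<String, String> snapshot = new ConcurrentHashMap<>();
    // Daemon thread so a forgotten close() does not keep the JVM alive.
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(
                    runnable -> {
                        Thread thread = new Thread(runnable, "cache-reload");
                        thread.setDaemon(true);
                        return thread;
                    });

    SketchReloadingCache(Supplier<Map<String, String>> loader) {
        this.loader = loader;
    }

    /** Loads once synchronously, then schedules a reload every reloadInterval. */
    void open(Duration reloadInterval) {
        reloadAllEntries();
        long millis = reloadInterval.toMillis();
        executor.scheduleWithFixedDelay(
                this::reloadAllEntries, millis, millis, TimeUnit.MILLISECONDS);
    }

    /** Replaces the whole snapshot with a fresh copy of the external data. */
    void reloadAllEntries() {
        snapshot = new ConcurrentHashMap<>(loader.get());
    }

    /** Lookups only consult the in-memory snapshot. */
    String getIfPresent(String key) {
        return snapshot.get(key);
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

Note that a scheduled task that throws suppresses all of its later executions, so a real implementation would need to catch and report load failures inside the reload task.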

CachingAsyncTableFunction

Code Block
languagejava
titleCachingAsyncTableFunction
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.FunctionContext;

/**
 * Base class for asynchronous lookup function that supports caching.
 */
@PublicEvolving
public abstract class CachingAsyncTableFunction extends AsyncTableFunction<RowData> {
    private final Cache<RowData, List<RowData>> cache;

    /**
     * Use default cache implementation with specified cache configuration.
     *
     * @param cacheConfig - configuration of the default cache.
     */
    public CachingAsyncTableFunction(CacheConfig cacheConfig) {
        cache = new DefaultCache<>(cacheConfig);
    }

    /**
     * Use custom cache implementation.
     *
     * @param cache - custom cache implementation
     */
    public CachingAsyncTableFunction(Cache<RowData, List<RowData>> cache) {
        this.cache = cache;
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        cache.open(new InternalCacheMetricGroup(context.getMetricGroup()));
    }

    public void eval(CompletableFuture<Collection<RowData>> result, Object... keys) {
        // Check the cache first and complete the result directly if cache hit,
        // or load asynchronously via lookupAsync() if the cache doesn't contain the key
    }

    /**
     * Lookup rows associated with the key row asynchronously from external system and return the
     * {@link CompletableFuture} of the lookup result.
     *
     * @param key - the key row whose associated value rows are to be returned
     */
    public abstract CompletableFuture<List<RowData>> lookupAsync(RowData key);
}
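
The asynchronous eval() is also only described in comments. The sketch below shows one possible shape: complete the result future directly on a cache hit, otherwise delegate to lookupAsync() and populate the cache when it finishes. String stands in for RowData, a ConcurrentHashMap stands in for the Cache, and SketchAsyncCachingLookup is a hypothetical name. The sketch assumes lookupAsync() never completes with null.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: String stands in for RowData and a ConcurrentHashMap for the Cache.
abstract class SketchAsyncCachingLookup {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();

    /** Completes the result from the cache on a hit, or after an asynchronous lookup on a miss. */
    void eval(CompletableFuture<Collection<String>> result, String key) {
        List<String> cached = cache.get(key);
        if (cached != null) {
            // Cache hit: complete the result without touching the external system.
            result.complete(cached);
            return;
        }
        lookupAsync(key)
                .whenComplete(
                        (rows, error) -> {
                            if (error != null) {
                                // Propagate the failure to the caller; nothing is cached.
                                result.completeExceptionally(error);
                            } else {
                                // Assumption: rows is never null.
                                cache.put(key, rows);
                                result.complete(rows);
                            }
                        });
    }

    /** Looks up the rows for the key asynchronously in the external system. */
    abstract CompletableFuture<List<String>> lookupAsync(String key);
}
```

Failed lookups are not cached in this sketch, so the next eval() for the same key retries the external system.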

Table Options

We propose adding the new table options below for configuring the lookup table and its underlying cache:

...