...
A cache implementation does not have to report all of the metrics defined below. However, if a cache reports a metric with the same semantics as one defined here, the implementation should follow this convention.
Name | Type | Unit | Description |
---|---|---|---|
numCachedRecord | Gauge | Records | The number of records in cache. |
numCachedBytes | Gauge | Bytes | The number of bytes used by cache. |
hitCount | Counter | | The number of cache hits. |
missCount | Counter | | The number of cache misses, which might lead to loading operations. |
loadCount | Counter | | The number of times data is loaded into the cache from the external system. For an LRU cache the load count should equal the miss count, but for a cache that loads all entries the two would differ. |
numLoadFailure | Counter | | The number of load failures. |
latestLoadTime | Gauge | ms | The time spent for the latest load operation |
Here we define only the fundamental metrics and leave it to the external metric system to aggregate them into more descriptive values, such as hitRate = hitCount / (hitCount + missCount).
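As a small illustration of that aggregation, the sketch below derives the hit rate from the two counters. The class `CacheRatios` and its method are hypothetical helpers, not part of this proposal, and returning 0.0 when no lookups have happened yet is an assumption rather than a defined behavior.

```java
/** Hypothetical helper for illustration only; not part of the proposed interfaces. */
final class CacheRatios {
    private CacheRatios() {}

    /**
     * Computes hitCount / (hitCount + missCount).
     * Assumption: report 0.0 when there have been no lookups, to avoid dividing by zero.
     */
    static double hitRate(long hitCount, long missCount) {
        long total = hitCount + missCount;
        return total == 0 ? 0.0 : (double) hitCount / total;
    }
}
```

An external metric system would normally evaluate this expression itself; the guard for an empty denominator is the only detail worth carrying over.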
...
```java
/**
 * A semi-persistent mapping from keys to values. Cache entries are manually added using {@link
 * #put(Object, Object)}, and are stored in the cache until either evicted or manually invalidated.
 *
 * <p>Implementations of this interface are expected to be thread-safe, and can be safely accessed
 * by multiple concurrent threads.
 *
 * @param <K> key
 * @param <V> value
 */
public interface Cache<K, V> extends AutoCloseable {

    /**
     * Initialize the cache.
     *
     * @param cacheMetricGroup - metric group for reporting metrics of the cache
     */
    void open(CacheMetricGroup cacheMetricGroup);

    /**
     * Returns the value associated with key in this cache, or null if there is no cached value for
     * key.
     */
    V getIfPresent(K key);

    /**
     * Returns the value associated with the key in the cache, or obtain the value by loader if the
     * key doesn't exist.
     *
     * @param key - the key whose associated value is to be returned
     * @param loader - the loader for loading associated values from external system
     * @return The value associated with the key
     * @throws IOException if any exception was thrown while loading the value
     */
    V get(K key, Callable<? extends V> loader) throws IOException;

    /**
     * Associates the specified value with the specified key in the cache. If the cache previously
     * contained a value associated with the key, the old value is replaced by the specified value.
     *
     * @param key - key with which the specified value is to be associated
     * @param value - value to be associated with the specified key
     * @return the previous value associated with key, or null if there was no mapping for key.
     */
    V put(K key, V value);

    /**
     * Copies all the mappings from the specified map to the cache. The effect of this call is
     * equivalent to that of calling {@code put(k, v)} on this map once for each mapping from key
     * {@code k} to value {@code v} in the specified map. The behavior of this operation is
     * undefined if the specified map is modified while the operation is in progress.
     */
    void putAll(Map<? extends K, ? extends V> m);

    /** Delete all entries in the cache. */
    void clean();

    /** Returns the number of key-value mappings in the cache. */
    long size();
}
```
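To make the `getIfPresent` / `get(key, loader)` / `put` contract more concrete, here is a minimal sketch of an LRU-style cache built on `java.util.LinkedHashMap`. The class `SimpleLruCache` is hypothetical: it mirrors only a subset of the proposed method names, does not implement the proposed interface (so that it compiles on its own), and its eviction policy and coarse locking are assumptions rather than the behavior of any default cache in this proposal.

```java
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;

/** Hypothetical LRU cache mirroring a subset of the proposed Cache methods. */
final class SimpleLruCache<K, V> {
    private final Map<K, V> entries;

    SimpleLruCache(final int maxEntries) {
        // accessOrder = true keeps entries ordered from least to most recently used.
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                // Evict the least recently used entry once the capacity is exceeded.
                return size() > maxEntries;
            }
        };
    }

    /** Returns the cached value, or null if the key is not cached. */
    synchronized V getIfPresent(K key) {
        return entries.get(key);
    }

    /** Returns the cached value, or loads, caches and returns it on a miss. */
    synchronized V get(K key, Callable<? extends V> loader) throws IOException {
        V value = entries.get(key);
        if (value == null) {
            try {
                // Simplification: the lock is held while loading.
                value = loader.call();
            } catch (Exception e) {
                throw new IOException("Failed to load value for key " + key, e);
            }
            if (value != null) {
                entries.put(key, value);
            }
        }
        return value;
    }

    /** Stores the value and returns the previous one, or null if there was none. */
    synchronized V put(K key, V value) {
        return entries.put(key, value);
    }

    synchronized void clean() {
        entries.clear();
    }

    synchronized long size() {
        return entries.size();
    }
}
```

Synchronizing every method is the simplest way to meet the thread-safety expectation; a production cache would more likely avoid holding a lock while the loader runs.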
CacheMetricGroup
An interface defining all cache-related metrics:
```java
/** Pre-defined metrics for {@code Cache}. */
public interface CacheMetricGroup {
    /** The number of cache hits. */
    Counter getHitCounter();

    /** The number of cache misses. */
    Counter getMissCounter();

    /** The number of times to load data into cache from external system. */
    Counter getLoadCounter();

    /** The number of load failures. */
    Counter getNumLoadFailureCounter();

    /** The time spent for the latest load operation. */
    void setLatestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);

    /** The number of records in cache. */
    void setNumCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

    /** The number of bytes used by cache. */
    void setNumCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}
```
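The following sketch suggests where a load-on-miss cache might update each of these metrics. It is only an illustration under stated assumptions: `MeteredCache` is a hypothetical class, plain `AtomicLong` fields stand in for the metric system's counters and gauges, and the choice to record the load time even when the load fails is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical cache with AtomicLong fields standing in for real metric objects. */
final class MeteredCache<K, V> {
    final AtomicLong hitCount = new AtomicLong();
    final AtomicLong missCount = new AtomicLong();
    final AtomicLong loadCount = new AtomicLong();
    final AtomicLong numLoadFailure = new AtomicLong();
    volatile long latestLoadTimeMs;

    private final Map<K, V> entries = new HashMap<>();

    synchronized V get(K key, Callable<? extends V> loader) throws Exception {
        V cached = entries.get(key);
        if (cached != null) {
            hitCount.incrementAndGet();
            return cached;
        }
        // In a load-on-miss cache every miss triggers exactly one load.
        missCount.incrementAndGet();
        loadCount.incrementAndGet();
        long start = System.nanoTime();
        try {
            V loaded = loader.call();
            if (loaded != null) {
                entries.put(key, loaded);
            }
            return loaded;
        } catch (Exception e) {
            numLoadFailure.incrementAndGet();
            throw e;
        } finally {
            // Assumption: the latest load time is recorded for failed loads too.
            latestLoadTimeMs = (System.nanoTime() - start) / 1_000_000;
        }
    }

    /** Value a numCachedRecord gauge would report. */
    synchronized long numCachedRecord() {
        return entries.size();
    }
}
```

With this structure `loadCount` always equals `missCount`, which matches the LRU case described in the metrics table; a cache that reloads everything on a schedule would increment `loadCount` independently of misses.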
CacheConfig
A wrapper class containing parameters of the cache:
...
```java
/**
 * Base class for lookup function that supports caching.
 */
@PublicEvolving
public abstract class CachingTableFunction extends TableFunction<RowData> {
    private final Cache<RowData, List<RowData>> cache;

    /**
     * Use default cache implementation with specified cache configuration.
     *
     * @param cacheConfig - configuration of the default cache.
     */
    public CachingTableFunction(CacheConfig cacheConfig) {
        cache = new DefaultCache<>(cacheConfig);
    }

    /**
     * Use custom cache implementation.
     *
     * @param cache - custom cache implementation
     */
    public CachingTableFunction(Cache<RowData, List<RowData>> cache) {
        this.cache = cache;
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        cache.open(new InternalCacheMetricGroup(context.getMetricGroup()));
    }

    public void eval(Object... keys) {
        // Check the cache first, or load via lookup() if the cache doesn't contain the key
    }

    /**
     * Lookup rows associated with the key row from external system.
     *
     * @param key the key row containing lookup fields
     * @throws IOException if any exception was thrown while looking up the value
     */
    public abstract List<RowData> lookup(RowData key) throws IOException;
}
```
AllCachingTableFunction
A helper TableFunction that loads all entries into the cache and schedules reloading by the specified TTL.
```java
/**
 * Base class for lookup function that loads all entries from external system and schedules
 * reloading by the specified TTL.
 */
public abstract class AllCachingTableFunction extends TableFunction<RowData> {
    private Cache<RowData, List<RowData>> cache;

    // Interval to reload all entries in cache
    private final Duration reloadInterval;

    public AllCachingTableFunction(CacheConfig cacheConfig) {
        cache = new DefaultCache(cacheConfig);
        reloadInterval = cacheConfig.get(TTL);
    }

    public AllCachingTableFunction(Cache<RowData, List<RowData>> cache, Duration reloadInterval) {
        this.cache = cache;
        this.reloadInterval = reloadInterval;
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
        scheduleCacheReload(this::reloadAllEntries, reloadInterval);
    }

    public void eval(Object... keys) {
        // Only check the cache here
    }

    /**
     * Reload all entries in the cache.
     *
     * <p>This function will be invoked periodically in a scheduled executor, which invalidates all
     * entries in the cache and reloads all entries from external system.
     *
     * @param cache - the cache in which all entries to be reloaded
     * @throws IOException - if any exception was thrown while reloading the value
     */
    public abstract void reloadAllEntries(Cache<RowData, List<RowData>> cache) throws IOException;
}
```
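The proposal does not spell out how the periodic reload is scheduled, so the sketch below shows one possible shape using a `ScheduledExecutorService`. Everything in it is an assumption made for illustration: the class `ReloadingSnapshot`, the `Supplier`-based loader, and the choice to build a fresh map and swap it in atomically instead of clearing and refilling a shared cache.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical holder of a fully loaded snapshot that is refreshed on a fixed interval. */
final class ReloadingSnapshot<K, V> implements AutoCloseable {
    private final Supplier<Map<K, List<V>>> loader;
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "cache-reload");
                t.setDaemon(true); // do not keep the JVM alive for reloads
                return t;
            });
    private volatile Map<K, List<V>> snapshot = Collections.emptyMap();

    ReloadingSnapshot(Supplier<Map<K, List<V>>> loader) {
        this.loader = loader;
    }

    /** Loads once synchronously, then schedules reloads; the interval must be positive. */
    void open(Duration reloadInterval) {
        reload();
        long ms = reloadInterval.toMillis();
        executor.scheduleWithFixedDelay(this::reloadQuietly, ms, ms, TimeUnit.MILLISECONDS);
    }

    /** Copies all entries from the loader and swaps the snapshot in one volatile write. */
    void reload() {
        snapshot = Collections.unmodifiableMap(new HashMap<>(loader.get()));
    }

    private void reloadQuietly() {
        try {
            reload();
        } catch (RuntimeException e) {
            // Keep the previous snapshot; an uncaught exception would cancel the schedule.
        }
    }

    /** Lookups only consult the current snapshot and never hit the external system. */
    List<V> lookup(K key) {
        return snapshot.getOrDefault(key, Collections.emptyList());
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

Swapping a complete snapshot means readers never observe a half-reloaded cache, at the cost of briefly holding two copies in memory during a reload.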
CachingAsyncTableFunction
```java
/**
 * Base class for asynchronous lookup function that supports caching.
 */
@PublicEvolving
public abstract class CachingAsyncTableFunction extends AsyncTableFunction<RowData> {
    private final Cache<RowData, List<RowData>> cache;

    /**
     * Use default cache implementation with specified cache configuration.
     *
     * @param cacheConfig - configuration of the default cache.
     */
    public CachingAsyncTableFunction(CacheConfig cacheConfig) {
        cache = new DefaultCache<>(cacheConfig);
    }

    /**
     * Use custom cache implementation.
     *
     * @param cache - custom cache implementation
     */
    public CachingAsyncTableFunction(Cache<RowData, List<RowData>> cache) {
        this.cache = cache;
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        cache.open(new InternalCacheMetricGroup(context.getMetricGroup()));
    }

    public void eval(CompletableFuture<Collection<RowData>> result, Object... keys) {
        // Check the cache first and complete the result directly if cache hit,
        // or load asynchronously via lookupAsync() if the cache doesn't contain the key
    }

    /**
     * Lookup rows associated with the key row asynchronously from external system and return the
     * {@link CompletableFuture} of the lookup result.
     *
     * @param key - the key row whose associated value rows is to be returned
     */
    public abstract CompletableFuture<List<RowData>> lookupAsync(RowData key);
}
```
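The comment inside `eval` describes the intended flow; the sketch below is one way that flow could look with plain JDK types. It is a hedged illustration, not the proposed implementation: `AsyncCachingLookup` is a hypothetical class, a `ConcurrentHashMap` stands in for the cache, a `Function` stands in for `lookupAsync`, and treating a null lookup result as an empty list is an assumption.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical async lookup that consults a cache before calling the external system. */
final class AsyncCachingLookup<K, V> {
    private final Map<K, List<V>> cache = new ConcurrentHashMap<>();
    private final Function<K, CompletableFuture<List<V>>> lookupAsync;

    AsyncCachingLookup(Function<K, CompletableFuture<List<V>>> lookupAsync) {
        this.lookupAsync = lookupAsync;
    }

    void eval(CompletableFuture<Collection<V>> result, K key) {
        List<V> cached = cache.get(key);
        if (cached != null) {
            // Cache hit: complete the result directly.
            result.complete(cached);
            return;
        }
        // Cache miss: load asynchronously, then cache the rows and complete the result.
        lookupAsync.apply(key).whenComplete((rows, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
                return;
            }
            // Assumption: a null result is cached as "no rows".
            List<V> safeRows = rows == null ? Collections.<V>emptyList() : rows;
            cache.put(key, safeRows);
            result.complete(safeRows);
        });
    }
}
```

Note that two concurrent misses for the same key would both call the external system in this sketch; deduplicating in-flight lookups is left out for brevity.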
Table Options
We propose adding the table options below for configuring the lookup table and its underlying cache:
...