...

Proposed Changes

We propose a new public interface for the cache and a set of conventional / standard metrics. Most of the design is inspired by Google Guava's cache. We would also like to introduce new table options for configuring lookup table caching via the Table/SQL API.

Cache Metrics

It is important to mention that a cache implementation does not have to report all the defined metrics. However, if a cache reports a metric with the same semantics as one defined below, the implementation should follow the convention.

...

The number of times to load data into cache from external system.

For an LRU cache the load count should be equal to the miss count, but for an ALL cache the two would differ.
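The difference between the two load-count behaviours can be illustrated with a minimal sketch. The classes `CountingLruCache` and `CountingAllCache` are hypothetical and use plain `String` keys and values; they are not part of the proposal, and the counters are plain fields rather than Flink metrics.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** LRU-style cache: every miss triggers exactly one load, so loadCount tracks missCount. */
final class CountingLruCache {
    long hitCount;
    long missCount;
    long loadCount;
    private final Map<String, String> entries;

    CountingLruCache(final int maxEntries) {
        // An access-ordered LinkedHashMap evicts the least recently used entry.
        this.entries =
                new LinkedHashMap<String, String>(16, 0.75f, true) {
                    @Override
                    protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                        return size() > maxEntries;
                    }
                };
    }

    String get(String key) {
        String value = entries.get(key);
        if (value != null) {
            hitCount++;
            return value;
        }
        missCount++;
        loadCount++; // one load per miss
        value = "row-for-" + key; // placeholder for a call to the external system
        entries.put(key, value);
        return value;
    }
}

/** ALL-style cache: loads happen on a full (re)load, independent of misses. */
final class CountingAllCache {
    long hitCount;
    long missCount;
    long loadCount;
    private Map<String, String> entries = new HashMap<>();

    void reload(Map<String, String> snapshot) {
        entries = new HashMap<>(snapshot);
        loadCount++; // one load per full reload
    }

    String get(String key) {
        String value = entries.get(key);
        if (value != null) {
            hitCount++;
        } else {
            missCount++; // a miss does not trigger a load here
        }
        return value;
    }
}
```

In the LRU sketch the two counters always move together, while in the ALL sketch the load counter only advances when `reload` is called.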

...

several new interfaces to simplify the work for developers to implement lookup table functions and enable cache as optimization:

  • LookupFunction / AsyncLookupFunction, an extended version of TableFunction to clarify the semantic of lookup.
  • LookupCache / LookupCacheFactory, defining the cache and its factory used in lookup table.
  • DefaultLookupCacheFactory, a default cache factory implementation that is suitable for most use cases.
  • LookupCacheMetricGroup, defining the metrics that should be reported by the lookup cache.
  • LookupFunctionProvider / AsyncLookupFunctionProvider, as the API interacting with table source to get LookupFunction and LookupCacheFactory.

The cache serves as a component in LookupJoinRunner, and is pluggable by specifying a LookupCacheFactory in the LookupFunctionProvider. The developer of a lookup table needs to define a LookupFunctionProvider / AsyncLookupFunctionProvider in their implementation of LookupTableSource to specify the LookupFunction and the factory of the cache. The planner then takes over the cache factory and passes it to the LookupJoinRunner, and the cache is instantiated during runtime execution.
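The flow described above (a serializable factory is handed over at planning time, and the cache itself is only created at runtime) can be sketched without any Flink dependency. All names below (`SketchLookupCache`, `SketchLookupCacheFactory`, `MapCacheFactory`, `SketchRunner`) are hypothetical stand-ins, and `String` replaces `RowData`; this is an illustration of the pattern under those assumptions, not the actual runner.

```java
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified stand-in for the proposed cache interface. */
interface SketchLookupCache {
    Collection<String> getIfPresent(String key);

    Collection<String> put(String key, Collection<String> rows);
}

/** Serializable factory: it travels with the job, the cache is created only at runtime. */
interface SketchLookupCacheFactory extends Serializable {
    SketchLookupCache createCache();
}

final class MapCacheFactory implements SketchLookupCacheFactory {
    @Override
    public SketchLookupCache createCache() {
        final Map<String, Collection<String>> store = new ConcurrentHashMap<>();
        return new SketchLookupCache() {
            @Override
            public Collection<String> getIfPresent(String key) {
                return store.get(key);
            }

            @Override
            public Collection<String> put(String key, Collection<String> rows) {
                return store.put(key, rows);
            }
        };
    }
}

/** Stand-in for the runner: instantiates the cache from the factory and consults it first. */
final class SketchRunner {
    private final SketchLookupCache cache;
    int externalLookups;

    SketchRunner(SketchLookupCacheFactory factory) {
        this.cache = factory.createCache();
    }

    Collection<String> lookup(String key) {
        Collection<String> cached = cache.getIfPresent(key);
        if (cached != null) {
            return cached;
        }
        externalLookups++;
        // Placeholder for the call to the external system.
        Collection<String> rows = Collections.singletonList("row-for-" + key);
        cache.put(key, rows);
        return rows;
    }
}
```

Keeping the factory serializable and the cache itself non-serializable means the cache state never has to be shipped with the job graph.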

Public Interfaces

Lookup Functions

As the usage of the TableFunction interface is not quite straightforward for lookup table developers, we'd like to introduce a new interface for sync and async lookup tables. Caching will only be supported on LookupFunction / AsyncLookupFunction.

Code Block
languagejava
titleLookupFunction
/**
 * A wrapper class of {@link TableFunction} for synchronously lookup rows matching the lookup keys
 * from external system.
 *
 * <p>The output type of this table function is fixed as {@link RowData}.
 */
@PublicEvolving
public abstract class LookupFunction extends TableFunction<RowData> {

    /**
     * Synchronously lookup rows matching the lookup keys.
     *
     * @param keys - Keys to lookup.
     * @return A collection of all matching rows in the lookup table.
     */
    public abstract Collection<RowData> lookup(Object... keys) throws IOException;

    /** Invoke {@link #lookup} and handle exceptions. */
    public final void eval(Object... keys) {
        try {
            lookup(keys).forEach(this::collect);
        } catch (IOException e) {
            throw new RuntimeException("Failed to lookup values with given key", e);
        }
    }
}

Code Block
languagejava
titleAsyncLookupFunction
/**
 * A wrapper class of {@link AsyncTableFunction} for asynchronously lookup rows matching the lookup
 * keys from external system.
 *
 * <p>The output type of this table function is fixed as {@link RowData}.
 */
@PublicEvolving
public abstract class AsyncLookupFunction extends AsyncTableFunction<RowData> {

    /**
     * Asynchronously lookup rows matching the lookup keys.
     *
     * @param keys - Keys to lookup.
     * @return A collection of all matching rows in the lookup table.
     */
    public abstract CompletableFuture<Collection<RowData>> asyncLookup(Object... keys);

    /** Invokes {@link #asyncLookup} and chains futures. */
    public final void eval(CompletableFuture<Collection<RowData>> future, Object... keys) {
        asyncLookup(keys)
                .whenCompleteAsync(
                        (result, exception) -> {
                            if (exception != null) {
                                future.completeExceptionally(exception);
                                return;
                            }
                            future.complete(result);
                        });
    }
}

LookupCache

Considering there might be custom caching strategies and optimizations, we'd like to expose the cache interface as public API for developers to make the cache pluggable.

Code Block
languagejava
titleLookupCache
/**
 * A semi-persistent mapping from keys to values for storing entries of lookup table.
 *
 * <p>The type of the caching key is a {@link RowData} with lookup key fields packed inside. The
 * type of value is a {@link Collection} of {@link RowData}, which are rows matching lookup key
 * fields.
 *
 * <p>Cache entries are manually added using {@link #put}, and are stored in the cache until either
 * evicted or manually invalidated.
 *
 * <p>Implementations of this interface are expected to be thread-safe, and can be safely accessed
 * by multiple concurrent threads.
 */
@PublicEvolving
public interface LookupCache {

    /**
     * Returns the value associated with key in this cache, or null if there is no cached value for
     * key.
     */
    @Nullable
    Collection<RowData> getIfPresent(RowData key);

    /**
     * Associates the specified value rows with the specified key row in the cache. If the cache
     * previously contained value associated with the key, the old value is replaced by the
     * specified value.
     *
     * @return the previous value rows associated with key, or null if there was no mapping for key.
     * @param key - key row with which the specified value is to be associated
     * @param value - value rows to be associated with the specified key
     */
    Collection<RowData> put(RowData key, Collection<RowData> value);

    /**
     * Copies all the mappings from the specified map to the cache. The effect of this call is
     * equivalent to that of calling {@code put(k, v)} on this map once for each mapping from key
     * {@code k} to value {@code v} in the specified map. The behavior of this operation is
     * undefined if the specified map is modified while the operation is in progress.
     */
    void putAll(Map<? extends RowData, ? extends Collection<RowData>> m);

    /** Discards any cached value for the specified key. */
    void invalidate(RowData key);

    /** Discards all entries in the cache. */
    void invalidateAll();

    /** Returns the number of key-value mappings in the cache. */
    long size();
}

LookupCacheFactory

As the cache should be instantiated during runtime execution to avoid serialization / deserialization, a factory is required for creating the cache.

Code Block
languagejava
titleLookupCacheFactory
/** Factory for creating an instance of {@link LookupCache}. */
@PublicEvolving
public interface LookupCacheFactory extends Serializable {

    /**
     * Create a {@link LookupCache}.
     *
     * @param metricGroup - The lookup cache metric group in which the cache registers predefined
     *     and custom metrics.
     */
    LookupCache createCache(LookupCacheMetricGroup metricGroup);
}

Public Interfaces

We propose to add new interfaces that define the abstraction of the cache, plus related helper classes for building a TableFunction with a cache easily:

  • Cache, a base interface for all caches
  • CacheMetricGroup, an interface defining cache related metrics
  • CacheConfig, a wrapper class containing configurations of the cache
  • CachingTableFunction, a helper TableFunction class with cache
  • AllCachingTableFunction, a helper TableFunction class for lookup sources that load all entries into the cache on start and reload at a fixed rate

Cache

A base interface for all caches:

Code Block
languagejava
titleCache
/**
 * A semi-persistent mapping from keys to values. Cache entries are manually added using {@link
 * #put(Object, Object)}, and are stored in the cache until either evicted or manually invalidated.
 *
 * <p>Implementations of this interface are expected to be thread-safe, and can be safely accessed
 * by multiple concurrent threads.
 *
 * @param <K> key
 * @param <V> value
 */
public interface Cache<K, V> extends AutoCloseable {

    /**
     * Initialize the cache.
     *
     * @param cacheMetricGroup - metric group for reporting metrics of the cache
     */
    void open(CacheMetricGroup cacheMetricGroup);

    /**
     * Returns the value associated with key in this cache, or null if there is no cached value for
     * key.
     */
    V getIfPresent(K key);

    /**
     * Returns the value associated with the key in the cache, or obtain the value by loader if the
     * key doesn't exist.
     *
     * @param key - the key whose associated value is to be returned
     * @param loader - the loader for loading associated values from external system
     * @return The value associated with the key
     * @throws IOException if any exception was thrown while loading the value
     */
    V get(K key, Callable<? extends V> loader) throws IOException;

    /**
     * Associates the specified value with the specified key in the cache. If the cache previously
     * contained a value associated with the key, the old value is replaced by the specified value.
     *
     * @return the previous value associated with key, or null if there was no mapping for key.
     * @param key - key with which the specified value is to be associated
     * @param value - value to be associated with the specified key
     */
    V put(K key, V value);

    /**
     * Copies all the mappings from the specified map to the cache. The effect of this call is
     * equivalent to that of calling {@code put(k, v)} on this map once for each mapping from key
     * {@code k} to value {@code v} in the specified map. The behavior of this operation is
     * undefined if the specified map is modified while the operation is in progress.
     */
    void putAll(Map<? extends K, ? extends V> m);

    /** Delete all entries in the cache. */
    void clean();

    /** Returns the number of key-value mappings in the cache. */
    long size();
}

CacheMetricGroup

An interface defining all cache related metrics:

Code Block
languagejava
titleCacheMetricGroup
/** Pre-defined metrics for {@code Cache}. */
public interface CacheMetricGroup {

    /** The number of cache hits. */
    Counter getHitCounter();

    /** The number of cache misses. */
    Counter getMissCounter();

    /** The number of times to load data into cache from external system. */
    Counter getLoadCounter();

    /** The number of load failures. */
    Counter getNumLoadFailureCounter();

    /** The time spent for the latest load operation. */
    void setLatestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);

    /** The number of records in cache. */
    void setNumCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

    /** The number of bytes used by cache. */
    void setNumCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}

CacheConfig

A wrapper class containing parameters of the cache:

Code Block
languagejava
titleCacheConfig
public class CacheConfig implements ReadableConfig, WritableConfig {

    private final Configuration configuration = new Configuration();

    public CacheConfig setCacheStrategy(CacheConfigOptions.CacheStrategy strategy) {
        configuration.set(CacheConfigOptions.CACHE_STRATEGY, strategy);
        return this;
    }

    public CacheConfig setTTL(Duration ttl) {
        configuration.set(CacheConfigOptions.CACHE_TTL, ttl);
        return this;
    }

    public CacheConfig setMaxEntries(long maxEntries) {
        configuration.set(CacheConfigOptions.CACHE_MAX_ENTRIES, maxEntries);
        return this;
    }

    public CacheConfig setAllowEmptyValues(boolean allowEmptyValues) {
        configuration.set(CacheConfigOptions.CACHE_ALLOW_EMPTY_VALUES, allowEmptyValues);
        return this;
    }

    @Override
    public <T> T get(ConfigOption<T> option) {
        return configuration.get(option);
    }

    @Override
    public <T> Optional<T> getOptional(ConfigOption<T> option) {
        return configuration.getOptional(option);
    }

    @Override
    public <T> WritableConfig set(ConfigOption<T> option, T value) {
        return configuration.set(option, value);
    }
}

Cache Options

Code Block
languagejava
titleCacheConfigOptions
public class CacheConfigOptions {

    public static final ConfigOption<CacheStrategy> CACHE_STRATEGY =
            ConfigOptions.key("lookup.cache.strategy")
                    .enumType(CacheStrategy.class)
                    .noDefaultValue()
                    .withDescription(
                            "Strategy of the cache to load and evict entries, including ALL, LRU and NONE");

    public static final ConfigOption<Duration> CACHE_TTL =
            ConfigOptions.key("lookup.cache.ttl")
                    .durationType()
                    .noDefaultValue()
                    .withDescription("Time-to-live (TTL) of entries in cache");

    public static final ConfigOption<Long> CACHE_MAX_ENTRIES =
            ConfigOptions.key("lookup.cache.max-entries")
                    .longType()
                    .noDefaultValue()
                    .withDescription("The maximum number of entries that the cache could contain");

    public static final ConfigOption<Boolean> CACHE_ALLOW_EMPTY_VALUES =
            ConfigOptions.key("lookup.cache.allow-empty-values")
                    .booleanType()
                    .noDefaultValue()
                    .withDescription(
                            "Whether to allow empty values associated with a key in the cache");

    public enum CacheStrategy {
        ALL,
        LRU,
        NONE
    }
}

DefaultLookupCacheFactory

To simplify usage for developers, we provide a default factory for building a default cache.

Code Block
languagejava
titleDefaultLookupCacheFactory
/** Factory for creating instance of {@link DefaultLookupCache}. */
@PublicEvolving
public class DefaultLookupCacheFactory implements LookupCacheFactory {
    private final Duration expireAfterAccessDuration;
    private final Duration expireAfterWriteDuration;
    private final Integer initialCapacity;
    private final Long maximumSize;

    public static DefaultLookupCacheFactory.Builder newBuilder() {
        return new Builder();
    }

    private DefaultLookupCacheFactory(
            Duration expireAfterAccessDuration,
            Duration expireAfterWriteDuration,
            Integer initialCapacity,
            Long maximumSize) {
        // Validation
        this.expireAfterAccessDuration = expireAfterAccessDuration;
        this.expireAfterWriteDuration = expireAfterWriteDuration;
        this.initialCapacity = initialCapacity;
        this.maximumSize = maximumSize;
    }

    @Override
    public LookupCache createCache(LookupCacheMetricGroup metricGroup) {
        // Create instance of DefaultLookupCache
    }

    /** Builder of {@link DefaultLookupCacheFactory}. */
    public static class Builder {
        private Duration expireAfterAccessDuration;
        private Duration expireAfterWriteDuration;
        private Integer initialCapacity;
        private Long maximumSize;

        public DefaultLookupCacheFactory.Builder expireAfterAccess(Duration duration) {
            expireAfterAccessDuration = duration;
            return this;
        }

        public DefaultLookupCacheFactory.Builder expireAfterWrite(Duration duration) {
            expireAfterWriteDuration = duration;
            return this;
        }

        public DefaultLookupCacheFactory.Builder initialCapacity(int initialCapacity) {
            this.initialCapacity = initialCapacity;
            return this;
        }

        public DefaultLookupCacheFactory.Builder maximumSize(long maximumSize) {
            this.maximumSize = maximumSize;
            return this;
        }

        public DefaultLookupCacheFactory build() {
            return new DefaultLookupCacheFactory(
                    expireAfterAccessDuration,
                    expireAfterWriteDuration,
                    initialCapacity,
                    maximumSize);
        }
    }
}

CachingTableFunction

A helper TableFunction with a default or a custom cache implementation.

Code Block
languagejava
titleCachingTableFunction
/**
 * Base class for lookup function that supports caching.
 */
@PublicEvolving
public abstract class CachingTableFunction extends TableFunction<RowData> {
    private final Cache<RowData, List<RowData>> cache;

    /**
     * Use default cache implementation with specified cache configuration.
     *
     * @param cacheConfig - configuration of the default cache.
     */
    public CachingTableFunction(CacheConfig cacheConfig) {
        cache = new DefaultCache<>(cacheConfig);
    }

    /**
     * Use custom cache implementation.
     *
     * @param cache - custom cache implementation
     */
    public CachingTableFunction(Cache<RowData, List<RowData>> cache) {
        this.cache = cache;
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        cache.open(new InternalCacheMetricGroup(context.getMetricGroup()));
    }

    public void eval(Object... keys) {
        // Check the cache first, or load via lookup() if the cache doesn't contain the key
    }

    /**
     * Lookup rows associated with the key row from external system.
     *
     * @param key the key row containing lookup fields
     * @throws IOException if any exception was thrown while looking up the value
     */
    public abstract List<RowData> lookup(RowData key) throws IOException;
}

AllCachingTableFunction

A helper TableFunction that loads all entries into the cache and schedules reloading by the specified TTL.

Code Block
languagejava
titleAllCachingTableFunction
/**
 * Base class for lookup function that loads all entries from external system and schedules
 * reloading by the specified TTL.
 */
public abstract class AllCachingTableFunction extends TableFunction<RowData> {
    private Cache<RowData, List<RowData>> cache;

    // Interval to reload all entries in cache
    private final Duration reloadInterval;

    public AllCachingTableFunction(CacheConfig cacheConfig) {
        cache = new DefaultCache<>(cacheConfig);
        reloadInterval = cacheConfig.get(CacheConfigOptions.CACHE_TTL);
    }

    public AllCachingTableFunction(Cache<RowData, List<RowData>> cache, Duration reloadInterval) {
        this.cache = cache;
        this.reloadInterval = reloadInterval;
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
        scheduleCacheReload(this::reloadAllEntries, reloadInterval);
    }

    public void eval(Object... keys) {
        // Only check the cache here
    }

    /**
     * Reload all entries in the cache.
     *
     * <p>This function will be invoked periodically in a scheduled executor, which invalidates all
     * entries in the cache and reloads all entries from external system.
     *
     * @param cache - the cache in which all entries are to be reloaded
     * @throws IOException - if any exception was thrown while reloading the value
     */
    public abstract void reloadAllEntries(Cache<RowData, List<RowData>> cache) throws IOException;
}

CachingAsyncTableFunction

Code Block
languagejava
titleCachingAsyncTableFunction
/**
 * Base class for asynchronous lookup function that supports caching.
 */
@PublicEvolving
public abstract class CachingAsyncTableFunction extends AsyncTableFunction<RowData> {
    private final Cache<RowData, List<RowData>> cache;

    /**
     * Use default cache implementation with specified cache configuration.
     *
     * @param cacheConfig - configuration of the default cache
     */
    public CachingAsyncTableFunction(CacheConfig cacheConfig) {
        cache = new DefaultCache<>(cacheConfig);
    }

    /**
     * Use custom cache implementation.
     *
     * @param cache - custom cache implementation
     */
    public CachingAsyncTableFunction(Cache<RowData, List<RowData>> cache) {
        this.cache = cache;
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        cache.open(new InternalCacheMetricGroup(context.getMetricGroup()));
    }

    public void eval(CompletableFuture<Collection<RowData>> result, Object... keys) {
        // Check the cache first and complete the result directly if cache hit,
        // or load asynchronously via lookupAsync() if the cache doesn't contain the key
    }

    /**
     * Lookup rows associated with the key row asynchronously from external system and return the
     * {@link CompletableFuture} of the lookup result.
     *
     * @param key - the key row whose associated value rows is to be returned
     */
    public abstract CompletableFuture<List<RowData>> lookupAsync(RowData key);
}

Table Options

We propose to add new table options below for configuring lookup table and its underlying cache:

...

LookupCacheMetricGroup

An interface defining all cache related metrics:

Code Block
languagejava
titleLookupCacheMetricGroup
/** Pre-defined metrics for {@code LookupCache}. */
@PublicEvolving
public interface LookupCacheMetricGroup extends MetricGroup {
    /** The number of cache hits. */
    void setHitCounter(Counter hitCounter);

    /** The number of cache misses. */
    void setMissCounter(Counter missCounter);

    /** The number of times to load data into cache from external system. */
    void setLoadCounter(Counter loadCounter);

    /** The number of load failures. */
    void setNumLoadFailuresCounter(Counter numLoadFailuresCounter);

    /** The time spent for the latest load operation. */
    void setLatestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);

    /** The number of records in cache. */
    void setNumCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

    /** The number of bytes used by cache. */
    void setNumCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}

LookupFunctionProvider

This is the API between the table framework and the user's table source. An implementation should define how to create a lookup function and whether to use a cache.

Code Block
languagejava
titleLookupFunctionProvider
/**
 * Provider for creating {@link LookupFunction} and {@link LookupCacheFactory} if caching should be
 * enabled for the lookup table.
 */
@PublicEvolving
public interface LookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    /**
     * Creates {@link LookupFunctionProvider} with the given {@link LookupFunction} and disable
     * lookup table caching.
     */
    static LookupFunctionProvider of(LookupFunction lookupFunction) {
        return new LookupFunctionProvider() {
            @Override
            public LookupFunction createLookupFunction() {
                return lookupFunction;
            }

            @Override
            public Optional<LookupCacheFactory> getCacheFactory() {
                return Optional.empty();
            }

            @Override
            public Optional<Boolean> cacheMissingKey() {
                return Optional.empty();
            }
        };
    }

    /**
     * Creates {@link LookupFunctionProvider} with the given {@link LookupFunction} and enable
     * caching with specified {@link LookupCacheFactory}.
     */
    static LookupFunctionProvider of(
            LookupFunction lookupFunction,
            LookupCacheFactory cacheFactory,
            boolean cacheMissingKey) {
        return new LookupFunctionProvider() {
            @Override
            public LookupFunction createLookupFunction() {
                return lookupFunction;
            }

            @Override
            public Optional<LookupCacheFactory> getCacheFactory() {
                return Optional.of(cacheFactory);
            }

            @Override
            public Optional<Boolean> cacheMissingKey() {
                return Optional.of(cacheMissingKey);
            }
        };
    }

    /** Creates an {@link LookupFunction} instance. */
    LookupFunction createLookupFunction();

    /**
     * Gets the {@link LookupCacheFactory} for creating lookup cache.
     *
     * <p>This factory will be used for creating an instance of cache during runtime execution for
     * optimizing the access to external lookup table.
     *
     * @return an {@link Optional} of {@link LookupCacheFactory}, or an empty {@link Optional} if
     *     caching shouldn't be applied to the lookup table.
     */
    Optional<LookupCacheFactory> getCacheFactory();

    /**
     * Whether the missing key (key fields without any matching value rows) should be stored in the
     * cache.
     *
     * <p>Please note that this option is required if {@link #getCacheFactory()} returns a non-empty
     * instance. If the cache factory is empty, the return value of this function will be ignored.
     *
     * @return true if a null or empty value should be stored in the cache.
     */
    Optional<Boolean> cacheMissingKey();
}
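How a consumer might interpret the two `Optional` values returned by a provider can be sketched as follows. `ProviderConsumerSketch` is a hypothetical class that uses `String` instead of `RowData` and a plain `Map` instead of a `LookupCache`; the treatment of the missing-key flag is an assumption based on the Javadoc above, not the actual runner implementation.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical consumer of a provider's two Optionals; not the actual runner logic. */
final class ProviderConsumerSketch {
    private final Function<String, Collection<String>> lookup;
    private final Map<String, Collection<String>> cache; // null when caching is disabled
    private final boolean cacheMissingKey;
    int externalCalls;

    ProviderConsumerSketch(
            Function<String, Collection<String>> lookup,
            Optional<Map<String, Collection<String>>> cache,
            Optional<Boolean> cacheMissingKey) {
        this.lookup = lookup;
        this.cache = cache.orElse(null);
        // Assumption: the flag is ignored when no cache is configured.
        this.cacheMissingKey = cache.isPresent() && cacheMissingKey.orElse(false);
    }

    Collection<String> fetch(String key) {
        if (cache != null && cache.containsKey(key)) {
            return cache.get(key);
        }
        externalCalls++;
        Collection<String> rows = lookup.apply(key);
        if (rows == null) {
            rows = Collections.<String>emptyList();
        }
        // Empty results are stored only when missing keys should be cached.
        if (cache != null && (!rows.isEmpty() || cacheMissingKey)) {
            cache.put(key, rows);
        }
        return rows;
    }
}
```

With the flag enabled, repeated lookups of a key that has no matching rows are answered from the cache instead of hitting the external system again.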

AsyncLookupFunctionProvider

Code Block
languagejava
titleAsyncLookupFunctionProvider
@PublicEvolving
public interface AsyncLookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    /**
     * Creates {@link AsyncLookupFunctionProvider} with the given {@link AsyncLookupFunction} and
     * disable lookup table caching.
     */
    static AsyncLookupFunctionProvider of(AsyncLookupFunction asyncLookupFunction) {
        return new AsyncLookupFunctionProvider() {
            @Override
            public AsyncLookupFunction createAsyncLookupFunction() {
                return asyncLookupFunction;
            }

            @Override
            public Optional<LookupCacheFactory> getCacheFactory() {
                return Optional.empty();
            }

            @Override
            public Optional<Boolean> cacheMissingKey() {
                return Optional.empty();
            }
        };
    }

    /**
     * Creates {@link AsyncLookupFunctionProvider} with the given {@link AsyncLookupFunction} and
     * enable caching with specified {@link LookupCacheFactory}.
     */
    static AsyncLookupFunctionProvider of(
            AsyncLookupFunction asyncLookupFunction,
            LookupCacheFactory cacheBuilder,
            boolean cacheMissingKey) {
        return new AsyncLookupFunctionProvider() {
            @Override
            public AsyncLookupFunction createAsyncLookupFunction() {
                return asyncLookupFunction;
            }

            @Override
            public Optional<LookupCacheFactory> getCacheFactory() {
                return Optional.of(cacheBuilder);
            }

            @Override
            public Optional<Boolean> cacheMissingKey() {
                return Optional.of(cacheMissingKey);
            }
        };
    }

    /** Creates an {@link AsyncLookupFunction} instance. */
    AsyncLookupFunction createAsyncLookupFunction();

    /**
     * Gets the {@link LookupCacheFactory} for creating lookup cache.
     *
     * <p>This factory will be used for creating an instance of cache during runtime execution for
     * optimizing the access to external lookup table.
     *
     * @return an {@link Optional} of {@link LookupCacheFactory}, or an empty {@link Optional} if
     *     caching shouldn't be applied to the lookup table.
     */
    Optional<LookupCacheFactory> getCacheFactory();

    /**
     * Whether the missing key (key fields without any matching value rows) should be stored in the
     * cache.
     *
     * <p>Please note that this option is required if {@link #getCacheFactory()} returns a non-empty
     * instance. If the cache factory is empty, the return value of this function will be ignored.
     *
     * @return true if a null or empty value should be stored in the cache.
     */
    Optional<Boolean> cacheMissingKey();
}
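For the asynchronous path, the interplay between a cache and a future-based lookup can be sketched as below. `AsyncCachingSketch` is hypothetical, uses `String` instead of `RowData`, and fakes the external system with `CompletableFuture.supplyAsync`; it only illustrates the cache-first, then async-load pattern.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical cache-first asynchronous lookup; not the actual runner logic. */
final class AsyncCachingSketch {
    private final Map<String, Collection<String>> cache = new ConcurrentHashMap<>();
    final AtomicInteger externalCalls = new AtomicInteger();

    /** Placeholder for a non-blocking call to the external system. */
    CompletableFuture<Collection<String>> asyncLookup(String key) {
        return CompletableFuture.supplyAsync(
                () -> {
                    externalCalls.incrementAndGet();
                    return Collections.singletonList("row-for-" + key);
                });
    }

    /** Completes the given future from the cache if possible, otherwise from asyncLookup. */
    void eval(CompletableFuture<Collection<String>> result, String key) {
        Collection<String> cached = cache.get(key);
        if (cached != null) {
            result.complete(cached);
            return;
        }
        asyncLookup(key)
                .whenComplete(
                        (rows, error) -> {
                            if (error != null) {
                                result.completeExceptionally(error);
                                return;
                            }
                            cache.put(key, rows); // populate before completing the caller
                            result.complete(rows);
                        });
    }
}
```

Writing to the cache before completing the caller's future means a caller that waits for the first result and then looks up the same key again is served from the cache.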

Cache Metrics

It is important to mention that a cache implementation does not have to report all the defined metrics. However, if a cache reports a metric with the same semantics as one defined below, the implementation should follow the convention.

Name | Type | Unit | Description
numCachedRecord | Gauge | Records | The number of records in cache.
numCachedBytes | Gauge | Bytes | The number of bytes used by cache.
hitCount | Counter | | The number of cache hits.
missCount | Counter | | The number of cache misses, which might lead to loading operations.
loadCount | Counter | | The number of times to load data into cache from external system. For an LRU cache the load count should be equal to the miss count, but for an ALL cache the two would differ.
numLoadFailure | Counter | | The number of load failures.
latestLoadTime | Gauge | ms | The time spent for the latest load operation.

Here we just define fundamental metrics and let the external metric system make the aggregation to get more descriptive values such as hitRate = hitCount / (hitCount + missCount).
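As an illustration of such a derived value, the hit rate can be computed from the two counters as in the sketch below. `CacheStats` is a hypothetical helper, and returning 0.0 when no lookups have happened yet is an assumption made here to avoid a division by zero.

```java
/** Hypothetical helper deriving a hit rate from the raw counters. */
final class CacheStats {
    private CacheStats() {}

    /** hitRate = hitCount / (hitCount + missCount); 0.0 if there were no lookups (assumption). */
    static double hitRate(long hitCount, long missCount) {
        long total = hitCount + missCount;
        return total == 0 ? 0.0 : (double) hitCount / total;
    }
}
```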

Scope

The metric group for the cache would be a sub-group of the OperatorMetricGroup to which the table function belongs.

...

Compatibility, Deprecation, and Migration Plan

...