Status
...
...
JIRA: TBD
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
This FLIP is a joint work: initially proposed by Yuan Zhu (zstraw@163.com), and finished by Qingsheng Ren (partial caching part) and Alexander Smirnov (full caching part).
Motivation
As a widely-used feature in Flink SQL jobs, the performance of the lookup table source is essential not only for users but also for source developers tuning their implementations. Most lookup table sources use a cache to achieve better performance, but some features are missing in the current design of the cache:
...
In order to address the issues above, we propose here to define a unified abstraction for lookup source cache and its related metrics.
Proposed Changes
Top-level APIs
In order to clarify the semantics of lookup, we'd like to introduce some top-level APIs for general lookup operations without caching, which simplify the work for developers implementing lookup table functions:
- LookupFunction / AsyncLookupFunction, an extended version of TableFunction / AsyncTableFunction to make the API more straightforward.
- LookupFunctionProvider / AsyncLookupFunctionProvider, serving as the creator of LookupFunction / AsyncLookupFunction in the table source.
And APIs related to the cache:
- LookupCache, defining the cache used in the lookup table.
- DefaultLookupCache, a default implementation of a cache that is suitable for most use cases.
- CacheMetricGroup, defining metrics that should be reported by the cache.
Partial and Full Caching
More specifically, we'd like to provide public interfaces for the 2 most common cases to lookup source developers, which are named partial and full caching.
Partial caching
Partial caching loads data into the cache along with access to the external system. If the key to look up does not exist in the cache, a lookup to the external system is triggered and the lookup result is stored in the cache for further loading. Users and lookup table developers are able to configure the eviction policy and the maximum size of the cache.
In order to support partial caching, we propose to introduce 2 new interfaces:
- PartialCachingLookupProvider / PartialCachingAsyncLookupProvider, as the API interacting with the table source to get the LookupFunction and the LookupCache.
The cache serves as a component in LookupJoinRunner and is pluggable by specifying it in the constructor of the provider. The planner takes over the lookup function and the cache created from the provider and passes them to the LookupJoinRunner. The cache is instantiated during runtime execution and loaded via the lookup function whenever there is a cache miss.
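The miss-then-load behavior described above can be sketched in plain Java. The names below (PartialCachingSketch, externalTable) are illustrative stand-ins for the Flink runtime classes, and a plain HashMap plays the role of the LookupCache; eviction and metrics are omitted:

```java
import java.util.*;

// Illustrative sketch of the partial-caching lookup flow. Names and types are
// simplified stand-ins, not the actual Flink runtime classes.
public class PartialCachingSketch {
    // Stand-in for LookupCache: key row -> matching rows.
    static final Map<String, Collection<String>> cache = new HashMap<>();
    // Stand-in for the external system accessed by the LookupFunction.
    static final Map<String, Collection<String>> externalTable =
            Map.of("k1", List.of("row-a"), "k2", List.of("row-b", "row-c"));

    /** On a cache miss, look up in the external system and store the result. */
    static Collection<String> lookupWithCache(String keyRow) {
        Collection<String> cached = cache.get(keyRow);
        if (cached != null) {
            return cached; // cache hit: no external I/O
        }
        // cache miss: trigger a lookup to the external system
        Collection<String> rows = externalTable.getOrDefault(keyRow, List.of());
        cache.put(keyRow, rows); // store for further loading
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(lookupWithCache("k2")); // miss: loads from external system
        System.out.println(lookupWithCache("k2")); // hit: served from cache
    }
}
```

Note that whether an empty result ("missing key") is stored is itself configurable in the proposed design via cacheMissingKey; the sketch above always stores it.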
Full Caching
If the size of the lookup table is relatively small and fits into memory, and the lookup table doesn't change frequently, it is more efficient to load all entries of the lookup table into the cache to reduce network I/O, and to refresh the table periodically. We'd like to name this use case "full caching". Logically the reload operation is a kind of scan, so we'd like to reuse the ScanRuntimeProvider so that developers can reuse the scanning logic implemented in Source / SourceFunction / InputFormat. Considering the complexity of the Source API, we'd like to support the SourceFunction and InputFormat APIs first. Supporting the Source API might require a new topology and will be discussed later in another FLIP.
We propose to introduce several new interfaces:
- FullCachingLookupProvider, for reusing the ability of scanning.
- CacheReloadTrigger, for customizing reloading strategies of all entries in the full cache.
Also we'd like to provide two default implementations of CacheReloadTrigger:
- PeriodicCacheReloadTrigger, for triggering reload periodically with a specific interval.
- TimedCacheReloadTrigger, for triggering reload at a specific time and repeating with an interval in days.
Public Interfaces
Lookup Functions
As the usage of the TableFunction interface is not quite straightforward to lookup table developers, we'd like to introduce a new interface for sync and async lookup tables. Caching will only be supported on LookupFunction / AsyncLookupFunction.
Code Block

/**
 * A wrapper class of {@link TableFunction} for synchronously looking up rows matching the lookup
 * keys from an external system.
 *
 * <p>The output type of this table function is fixed as {@link RowData}.
 */
@PublicEvolving
public abstract class LookupFunction extends TableFunction<RowData> {

    /**
     * Synchronously lookup rows matching the lookup keys.
     *
     * @param keyRow - A {@link RowData} that wraps keys to lookup.
     * @return A collection of all matching rows in the lookup table.
     */
    public abstract Collection<RowData> lookup(RowData keyRow) throws IOException;

    /** Invoke {@link #lookup} and handle exceptions. */
    public final void eval(Object... keys) {
        try {
            lookup(GenericRowData.of(keys)).forEach(this::collect);
        } catch (IOException e) {
            throw new RuntimeException("Failed to lookup values with given key", e);
        }
    }
}
...
Code Block

/**
 * A wrapper class of {@link AsyncTableFunction} for asynchronously looking up rows matching the
 * lookup keys from an external system.
 *
 * <p>The output type of this table function is fixed as {@link RowData}.
 */
@PublicEvolving
public abstract class AsyncLookupFunction extends AsyncTableFunction<RowData> {

    /**
     * Asynchronously lookup rows matching the lookup keys.
     *
     * @param keyRow - A {@link RowData} that wraps keys to lookup.
     * @return A collection of all matching rows in the lookup table.
     */
    public abstract CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow);

    /** Invokes {@link #asyncLookup} and chains futures. */
    public final void eval(CompletableFuture<Collection<RowData>> future, Object... keys) {
        asyncLookup(GenericRowData.of(keys))
                .whenCompleteAsync(
                        (result, exception) -> {
                            if (exception != null) {
                                future.completeExceptionally(exception);
                                return;
                            }
                            future.complete(result);
                        });
    }
}
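The future-chaining in eval can be illustrated with a self-contained analog. AsyncLookupSketch and its String-based rows are simplified stand-ins, not the actual Flink classes; the point is how the inner future produced by asyncLookup completes the future handed in by the runtime:

```java
import java.util.*;
import java.util.concurrent.CompletableFuture;

// Simplified analog of AsyncLookupFunction#eval: the result (or failure) of the
// inner lookup future is propagated to the future supplied by the caller.
public class AsyncLookupSketch {
    static CompletableFuture<Collection<String>> asyncLookup(Object key) {
        // A real implementation issues a non-blocking request here.
        return CompletableFuture.supplyAsync(() -> List.of("row-for-" + key));
    }

    public static void eval(CompletableFuture<Collection<String>> future, Object... keys) {
        asyncLookup(keys[0])
                .whenCompleteAsync(
                        (result, exception) -> {
                            if (exception != null) {
                                future.completeExceptionally(exception);
                                return;
                            }
                            future.complete(result);
                        });
    }

    public static void main(String[] args) {
        CompletableFuture<Collection<String>> f = new CompletableFuture<>();
        eval(f, "k1");
        System.out.println(f.join());
    }
}
```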
LookupFunctionProvider
Code Block
/** A provider for creating {@link LookupFunction}. */
@PublicEvolving
public interface LookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {
static LookupFunctionProvider of(LookupFunction lookupFunction) {
return () -> lookupFunction;
}
LookupFunction createLookupFunction();
}
AsyncLookupFunctionProvider
Code Block
/** A provider for creating {@link AsyncLookupFunction}. */
@PublicEvolving
public interface AsyncLookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {
static AsyncLookupFunctionProvider of(AsyncLookupFunction asyncLookupFunction) {
return () -> asyncLookupFunction;
}
AsyncLookupFunction createAsyncLookupFunction();
}
LookupCache
Considering there might be custom caching strategies and optimizations, we'd like to expose the cache interface as public API for developers to make the cache pluggable.
Code Block

/**
 * A semi-persistent mapping from keys to values for storing entries of lookup table.
 *
 * <p>The type of the caching key is a {@link RowData} with lookup key fields packed inside. The
 * type of value is a {@link Collection} of {@link RowData}, which are rows matching lookup key
 * fields.
 *
 * <p>Cache entries are manually added using {@link #put}, and are stored in the cache until either
 * evicted or manually invalidated.
 *
 * <p>Implementations of this interface are expected to be thread-safe, and can be safely accessed
 * by multiple concurrent threads.
 */
@PublicEvolving
public interface LookupCache extends AutoCloseable, Serializable {

    /**
     * Initialize the cache.
     *
     * @param metricGroup the metric group to register cache related metrics.
     */
    void open(CacheMetricGroup metricGroup);

    /**
     * Returns the value associated with key in this cache, or null if there is no cached value for
     * key.
     */
    @Nullable
    Collection<RowData> getIfPresent(RowData key);

    /**
     * Associates the specified value rows with the specified key row in the cache. If the cache
     * previously contained value associated with the key, the old value is replaced by the
     * specified value.
     *
     * @param key - key row with which the specified value is to be associated
     * @param value - value rows to be associated with the specified key
     * @return the previous value rows associated with key, or null if there was no mapping for key
     */
    Collection<RowData> put(RowData key, Collection<RowData> value);

    /** Discards any cached value for the specified key. */
    void invalidate(RowData key);

    /** Returns the number of key-value mappings in the cache. */
    long size();
}
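To illustrate the getIfPresent / put contract together with size-based eviction (the policy DefaultLookupCache configures via maximumSize), here is a minimal, non-thread-safe sketch built on LinkedHashMap. It illustrates the semantics of the interface, not the proposed implementation, and uses generic keys instead of RowData so it stays self-contained:

```java
import java.util.*;

// Illustrative size-bounded, access-ordered cache sketch. Not the real
// DefaultLookupCache; thread-safety, expiration and metrics are omitted.
public class LruCacheSketch<K, V> {
    private final int maximumSize;
    private final LinkedHashMap<K, V> map;

    public LruCacheSketch(int maximumSize) {
        this.maximumSize = maximumSize;
        // accessOrder=true: iteration order runs from least- to most-recently accessed.
        this.map = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                // Evict the least-recently-accessed entry once the bound is exceeded.
                return size() > LruCacheSketch.this.maximumSize;
            }
        };
    }

    public V getIfPresent(K key) { return map.get(key); }

    public V put(K key, V value) { return map.put(key, value); }

    public long size() { return map.size(); }
}
```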
...
DefaultLookupCache
In order to simplify the work of developers, we provide a default implementation of LookupCache that is suitable for most use cases, configured via its builder.
Code Block

/** Default implementation of {@link LookupCache}. */
@PublicEvolving
public class DefaultLookupCache implements LookupCache {
    private final Duration expireAfterAccessDuration;
    private final Duration expireAfterWriteDuration;
    private final Long maximumSize;
    private final boolean cacheMissingKey;

    private DefaultLookupCache(
            Duration expireAfterAccessDuration,
            Duration expireAfterWriteDuration,
            Long maximumSize,
            boolean cacheMissingKey) {
        this.expireAfterAccessDuration = expireAfterAccessDuration;
        this.expireAfterWriteDuration = expireAfterWriteDuration;
        this.maximumSize = maximumSize;
        this.cacheMissingKey = cacheMissingKey;
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    /** Builder of {@link DefaultLookupCache}. */
    public static class Builder {
        private Duration expireAfterAccessDuration;
        private Duration expireAfterWriteDuration;
        private Long maximumSize;
        private Boolean cacheMissingKey;

        public Builder expireAfterAccess(Duration duration) {
            expireAfterAccessDuration = duration;
            return this;
        }

        public Builder expireAfterWrite(Duration duration) {
            expireAfterWriteDuration = duration;
            return this;
        }

        public Builder maximumSize(long maximumSize) {
            this.maximumSize = maximumSize;
            return this;
        }

        public Builder cacheMissingKey(boolean cacheMissingKey) {
            this.cacheMissingKey = cacheMissingKey;
            return this;
        }

        public DefaultLookupCache build() {
            return new DefaultLookupCache(
                    expireAfterAccessDuration,
                    expireAfterWriteDuration,
                    maximumSize,
                    cacheMissingKey);
        }
    }
}
CacheMetricGroup
An interface defining all cache related metrics:
Code Block

/**
 * Pre-defined metrics for cache.
 *
 * <p>Please note that these methods should only be invoked once. Registering a metric with same
 * name for multiple times would lead to an undefined behavior.
 */
@PublicEvolving
public interface CacheMetricGroup extends MetricGroup {
    /** The number of cache hits. */
    void hitCounter(Counter hitCounter);

    /** The number of cache misses. */
    void missCounter(Counter missCounter);

    /** The number of times to load data into cache from external system. */
    void loadCounter(Counter loadCounter);

    /** The number of load failures. */
    void numLoadFailuresCounter(Counter numLoadFailuresCounter);

    /** The time spent for the latest load operation. */
    void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);

    /** The number of records in cache. */
    void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

    /** The number of bytes used by cache. */
    void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}
PartialCachingLookupProvider
This is the API between the table framework and the user's table source. Implementations should define how to create a lookup function and which cache to use.
Code Block

/** Provider for creating {@link LookupFunction} and {@link LookupCache} for storing lookup entries. */
@PublicEvolving
public interface PartialCachingLookupProvider extends LookupFunctionProvider {

    /**
     * Build a {@link PartialCachingLookupProvider} from the specified {@link LookupFunction} and
     * {@link LookupCache}.
     */
    static PartialCachingLookupProvider of(LookupFunction lookupFunction, LookupCache cache) {
        return new PartialCachingLookupProvider() {
            @Override
            public LookupCache getCache() {
                return cache;
            }

            @Override
            public LookupFunction createLookupFunction() {
                return lookupFunction;
            }
        };
    }

    /** Get a new instance of {@link LookupCache}. */
    LookupCache getCache();
}
PartialCachingAsyncLookupProvider
Code Block

/**
 * Provider for creating {@link AsyncLookupFunction} and {@link LookupCache} for storing lookup
 * entries.
 */
@PublicEvolving
public interface PartialCachingAsyncLookupProvider extends AsyncLookupFunctionProvider {

    /**
     * Build a {@link PartialCachingAsyncLookupProvider} from the specified {@link
     * AsyncLookupFunction} and {@link LookupCache}.
     */
    static PartialCachingAsyncLookupProvider of(
            AsyncLookupFunction asyncLookupFunction, LookupCache cache) {
        return new PartialCachingAsyncLookupProvider() {
            @Override
            public LookupCache getCache() {
                return cache;
            }

            @Override
            public AsyncLookupFunction createAsyncLookupFunction() {
                return asyncLookupFunction;
            }
        };
    }

    /** Get a new instance of {@link LookupCache}. */
    LookupCache getCache();
}
FullCachingLookupProvider
This interface supports the full caching strategy. It reuses ScanRuntimeProvider and defines how reload operations are triggered.
Code Block

/**
 * A {@link LookupFunctionProvider} that never lookup in external system on cache miss and provides a
 * cache for holding all entries in the external system. The cache will be fully reloaded from the
 * external system by the {@link ScanTableSource.ScanRuntimeProvider} and reload operations will be
 * triggered by the {@link CacheReloadTrigger}.
 */
@PublicEvolving
public interface FullCachingLookupProvider extends LookupFunctionProvider {

    static FullCachingLookupProvider of(
            ScanTableSource.ScanRuntimeProvider scanRuntimeProvider,
            CacheReloadTrigger cacheReloadTrigger) {
        return new FullCachingLookupProvider() {
            @Override
            public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider() {
                return scanRuntimeProvider;
            }

            @Override
            public CacheReloadTrigger getCacheReloadTrigger() {
                return cacheReloadTrigger;
            }

            @Override
            public LookupFunction createLookupFunction() {
                return keyRow -> null;
            }
        };
    }

    /**
     * Get a {@link ScanTableSource.ScanRuntimeProvider} for scanning all entries from the external
     * lookup table and load into the cache.
     */
    ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();

    /** Get a {@link CacheReloadTrigger} for triggering the reload operation. */
    CacheReloadTrigger getCacheReloadTrigger();
}
CacheReloadTrigger
A trigger defining custom logic for triggering full cache reloading.
Code Block

/** Customized trigger for reloading all lookup table entries in full caching mode. */
@PublicEvolving
public interface CacheReloadTrigger extends AutoCloseable, Serializable {

    /** Open the trigger. */
    void open(Context context) throws Exception;

    /**
     * Context of {@link CacheReloadTrigger} for getting information about times and triggering
     * reload.
     */
    interface Context {

        /** Get current processing time. */
        long currentProcessingTime();

        /** Get current watermark on the main stream. */
        long currentWatermark();

        /** Trigger a reload operation on the full cache. */
        CompletableFuture<Void> triggerReload();
    }
}
PeriodicCacheReloadTrigger
An implementation of CacheReloadTrigger that triggers reload with a specified interval.
Code Block

/** A trigger that reloads all entries periodically with specified interval or delay. */
public class PeriodicCacheReloadTrigger implements CacheReloadTrigger {

    private final Duration reloadInterval;
    private final ScheduleMode scheduleMode;

    public PeriodicCacheReloadTrigger(Duration reloadInterval, ScheduleMode scheduleMode) {
        this.reloadInterval = reloadInterval;
        this.scheduleMode = scheduleMode;
    }

    @Override
    public void open(CacheReloadTrigger.Context context) {
        // Register periodic reload task
    }

    @Override
    public void close() throws Exception {
        // Dispose resources
    }

    public enum ScheduleMode {
        FIXED_DELAY,
        FIXED_RATE
    }
}
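One plausible way such a trigger could register its reload task is with a ScheduledExecutorService, where the two schedule modes map onto the two JDK scheduling methods. This is a sketch under assumed semantics (class and method names here are illustrative), not the actual implementation:

```java
import java.time.Duration;
import java.util.concurrent.*;

// Sketch of mapping PeriodicCacheReloadTrigger's ScheduleMode onto a
// ScheduledExecutorService. Illustrative only, not the Flink implementation.
public class PeriodicReloadSketch implements AutoCloseable {
    public enum ScheduleMode { FIXED_DELAY, FIXED_RATE }

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    /**
     * FIXED_RATE measures the interval from the start of the previous reload,
     * FIXED_DELAY from its end.
     */
    static String scheduleMethodFor(ScheduleMode mode) {
        return mode == ScheduleMode.FIXED_RATE
                ? "scheduleAtFixedRate"
                : "scheduleWithFixedDelay";
    }

    public void open(Duration interval, ScheduleMode mode, Runnable reloadTask) {
        long millis = interval.toMillis();
        if (mode == ScheduleMode.FIXED_RATE) {
            scheduler.scheduleAtFixedRate(reloadTask, 0, millis, TimeUnit.MILLISECONDS);
        } else {
            scheduler.scheduleWithFixedDelay(reloadTask, 0, millis, TimeUnit.MILLISECONDS);
        }
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```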
TimedCacheReloadTrigger
Code Block

/** A trigger that reloads at a specific local time and repeat for the given interval in days. */
public class TimedCacheReloadTrigger implements CacheReloadTrigger {

    private final LocalTime reloadTime;
    private final int reloadIntervalInDays;

    public TimedCacheReloadTrigger(LocalTime reloadTime, int reloadIntervalInDays) {
        this.reloadTime = reloadTime;
        this.reloadIntervalInDays = reloadIntervalInDays;
    }

    @Override
    public void open(Context context) {
        // Register periodic reload task
    }

    @Override
    public void close() throws Exception {
        // Dispose resources
    }
}
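The delay arithmetic such a trigger needs (time until the next occurrence of reloadTime, then a period of reloadIntervalInDays) can be sketched with java.time. Method names here are illustrative, not part of the proposed API:

```java
import java.time.*;

// Sketch of the scheduling arithmetic behind a timed reload trigger.
public class TimedReloadSketch {

    /** Delay from {@code now} until the next occurrence of {@code reloadTime}. */
    static Duration initialDelay(LocalDateTime now, LocalTime reloadTime) {
        LocalDateTime next = now.toLocalDate().atTime(reloadTime);
        if (!next.isAfter(now)) {
            next = next.plusDays(1); // today's reload time has already passed
        }
        return Duration.between(now, next);
    }

    /** Delay between subsequent reloads, given the interval in days. */
    static Duration period(int reloadIntervalInDays) {
        return Duration.ofDays(reloadIntervalInDays);
    }
}
```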
TableFunctionProvider / AsyncTableFunctionProvider
We'd like to deprecate these two interfaces and let developers switch to the new LookupFunctionProvider / AsyncLookupFunctionProvider / FullCachingLookupProvider instead.
Table Options for Lookup Cache
In order to unify the usage of caching across all connectors, we'd like to introduce some common table options, which are defined under the class LookupOptions. Note that connectors are not required to implement all of these options.
Option | Type | Descriptions |
---|---|---|
lookup.cache | Enum of NONE, PARTIAL and FULL | The caching strategy for this lookup table. NONE: Do not use cache PARTIAL: Use partial caching mode FULL: Use full caching mode |
lookup.max-retries | Integer | The maximum allowed retries if a lookup operation fails |
lookup.partial-cache.expire-after-access | Duration | Duration to expire an entry in the cache after accessing |
lookup.partial-cache.expire-after-write | Duration | Duration to expire an entry in the cache after writing |
lookup.partial-cache.cache-missing-key | Boolean | Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table |
lookup.partial-cache.max-rows | Long | The maximum number of rows to store in the cache |
lookup.full-cache.reload-strategy | Enum of PERIODIC and TIMED | The reload strategy for the full cache scenario. PERIODIC: Use PeriodicCacheReloadTrigger TIMED: Use TimedCacheReloadTrigger |
lookup.full-cache.periodic-reload.interval | Duration | Duration to trigger reload in the PeriodicCacheReloadTrigger |
lookup.full-cache.periodic-reload.schedule-mode | Enum of FIXED_DELAY and FIXED_RATE | The periodically schedule mode of reloading in the PeriodicCacheReloadTrigger |
lookup.full-cache.timed-reload.iso-time | String | Time in ISO-8601 format when cache needs to be reloaded. Time can be specified either with timezone or without timezone (target JVM local timezone will be used). See formatter ISO_TIME. |
lookup.full-cache.timed-reload.interval-in-days | Integer | The interval in days to trigger the reload at the specified time |
Cache Metrics
It is important to mention that a cache implementation does not have to report all the defined metrics. But if a cache reports a metric with the same semantics as defined below, the implementation should follow the convention.
Name | Type | Unit | Description |
---|---|---|---|
numCachedRecord | Gauge | Records | The number of records in cache. |
numCachedBytes | Gauge | Bytes | The number of bytes used by cache. |
hitCount | Counter | | The number of cache hits |
missCount | Counter | | The number of cache misses, which might lead to loading operations |
loadCount | Counter | | The number of times to load data into cache from external system. For partial cache the load count should be equal to the miss count, but for full cache this would be different. |
numLoadFailure | Counter | | The number of load failures |
latestLoadTime | Gauge | ms | The time spent for the latest load operation |
Here we just define fundamental metrics and let the external metric system make the aggregation to get more descriptive values such as hitRate = hitCount / (hitCount + missCount).
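For instance, the hit-rate aggregation mentioned above is a one-line computation over the two counters; the helper below is a trivial sketch, not part of the proposed API:

```java
// Sketch of the aggregation left to the external metric system.
public class CacheMetricMath {

    /** hitRate = hitCount / (hitCount + missCount); defined as 0 before any request. */
    static double hitRate(long hitCount, long missCount) {
        long total = hitCount + missCount;
        return total == 0 ? 0.0 : (double) hitCount / total;
    }
}
```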
Scope
The metric group for the cache would be a sub-group of the OperatorMetricGroup to which the table function belongs.
Future Works
In order to reduce network I/O with external systems and the usage of cache further, some optimizations implemented on scan source could be also applied on the lookup table, such as projection and filter pushdown. These features will be introduced separately in another FLIP.
Compatibility, Deprecation, and Migration Plan
Currently the JDBC, Hive and HBase connectors implement lookup table sources. All existing implementations will be migrated to the current design, and the migration will be transparent to end users. Table options related to caching defined by these connectors will be migrated to the new table options defined in this FLIP above.
Test Plan
We will use unit and integration test for validating the functionality of cache implementations.
Rejected Alternatives
Add cache in TableFunction implementations
Compared with this design, adding a cache in TableFunction implementations might lead to inconsistency between sync and async table functions, and is not suitable for applying optimizations.