Status
...
...
JIRA: TBD
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
This FLIP is a joint work, initially proposed by Yuan Zhu (zstraw@163.com) and completed by Qingsheng Ren (partial caching part) and Alexander Smirnov (full caching part).
Motivation
...
In order to clarify the semantics of lookup, we'd like to introduce some top-level APIs for general lookup operations without caching:
- LookupFunction / AsyncLookupFunction, an extended version of TableFunction that makes the API more straightforward.
- LookupFunctionProvider / AsyncLookupFunctionProvider, which serve as the creators of LookupFunction / AsyncLookupFunction in the table source.
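To make the split between the synchronous and asynchronous variants concrete, here is a small sketch that does not use Flink's classes. The hypothetical SyncLookup and AsyncLookup interfaces model a lookup as a function from a key row to the collection of matching rows, with List<Object> standing in for RowData; the async variant delivers the same result through a CompletableFuture.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical stand-ins for LookupFunction / AsyncLookupFunction; rows are List<Object>. */
final class LookupContractSketch {

    /** Synchronous contract: return all rows matching the key, or an empty collection. */
    interface SyncLookup {
        Collection<List<Object>> lookup(List<Object> keyRow);
    }

    /** Asynchronous contract: the same result, completed later through a future. */
    interface AsyncLookup {
        CompletableFuture<Collection<List<Object>>> asyncLookup(List<Object> keyRow);
    }

    /** A toy "external system" backed by an in-memory map from key to matching rows. */
    static SyncLookup fromMap(Map<List<Object>, Collection<List<Object>>> table) {
        return key -> table.getOrDefault(key, List.of());
    }

    /** Adapts a blocking lookup to the async contract by running it on the given executor. */
    static AsyncLookup toAsync(SyncLookup sync, Executor executor) {
        return key -> CompletableFuture.supplyAsync(() -> sync.lookup(key), executor);
    }
}
```

In a real connector the async variant would normally call a non-blocking client directly instead of wrapping a blocking call; the wrapper here only illustrates that both contracts return the same rows.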
...
More specifically, we'd like to provide lookup source developers with public interfaces for the two most common caching cases, which we call partial caching and full caching.
...
- FullCachingLookupProvider, for reusing the scanning ability of the source.
- CacheReloadTrigger, for customizing the reloading strategy of all entries in the full cache.
Also, we'd like to provide two default implementations of CacheReloadTrigger:
- PeriodicCacheReloadTrigger, for triggering a reload periodically with a specified interval.
- TimedCacheReloadTrigger, for triggering a reload at a specified time and repeating it at an interval given in days.
Public Interfaces
Lookup Functions
...
```java
/** A provider for creating {@link LookupFunction}. */
@PublicEvolving
public interface LookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    static LookupFunctionProvider of(LookupFunction lookupFunction) {
        return () -> lookupFunction;
    }

    LookupFunction createLookupFunction();
}
```
...
```java
/** A provider for creating {@link AsyncLookupFunction}. */
@PublicEvolving
public interface AsyncLookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    static AsyncLookupFunctionProvider of(AsyncLookupFunction asyncLookupFunction) {
        return () -> asyncLookupFunction;
    }

    AsyncLookupFunction createAsyncLookupFunction();
}
```
...
```java
/**
 * A semi-persistent mapping from keys to values for storing entries of a lookup table.
 *
 * <p>The type of the caching key is a {@link RowData} with lookup key fields packed inside. The
 * type of value is a {@link Collection} of {@link RowData}, which are rows matching lookup key
 * fields.
 *
 * <p>Cache entries are manually added using {@link #put}, and are stored in the cache until either
 * evicted or manually invalidated.
 *
 * <p>Implementations of this interface are expected to be thread-safe, and can be safely accessed
 * by multiple concurrent threads.
 */
@PublicEvolving
public interface LookupCache extends AutoCloseable, Serializable {

    /**
     * Initialize the cache.
     *
     * @param metricGroup the metric group to register cache related metrics.
     */
    void open(CacheMetricGroup metricGroup);

    /**
     * Returns the value associated with key in this cache, or null if there is no cached value for
     * key.
     */
    @Nullable
    Collection<RowData> getIfPresent(RowData key);

    /**
     * Associates the specified value rows with the specified key row in the cache. If the cache
     * previously contained a value associated with the key, the old value is replaced by the
     * specified value.
     *
     * @param key key row with which the specified value is to be associated
     * @param value value rows to be associated with the specified key
     * @return the previous value rows associated with key, or null if there was no mapping for key.
     */
    @Nullable
    Collection<RowData> put(RowData key, Collection<RowData> value);

    /** Discards any cached value for the specified key. */
    void invalidate(RowData key);

    /** Returns the number of key-value mappings in the cache. */
    long size();
}
```
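The thread-safety requirement above can be illustrated with a minimal in-memory cache. This is only a sketch of the contract, not Flink's implementation: the class name SimpleLookupCache is hypothetical, List<Object> stands in for RowData, there is no eviction, and ConcurrentHashMap supplies the thread safety.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, eviction-free sketch of the LookupCache contract (List<Object> ~ RowData). */
final class SimpleLookupCache {
    private final ConcurrentHashMap<List<Object>, Collection<List<Object>>> entries =
            new ConcurrentHashMap<>();

    /** Returns the cached rows for the key, or null if nothing is cached. */
    Collection<List<Object>> getIfPresent(List<Object> key) {
        return entries.get(key);
    }

    /** Stores the rows and returns the previously cached rows, or null if there were none. */
    Collection<List<Object>> put(List<Object> key, Collection<List<Object>> value) {
        return entries.put(key, value);
    }

    /** Discards any cached rows for the key. */
    void invalidate(List<Object> key) {
        entries.remove(key);
    }

    /** Number of key-to-rows mappings, not the total number of rows. */
    long size() {
        return entries.size();
    }
}
```

An empty collection is a valid cached value here, which is what allows a cache to remember keys that matched no rows.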
...
```java
/** Default implementation of {@link LookupCache}. */
@PublicEvolving
public class DefaultLookupCache implements LookupCache {
    private final Duration expireAfterAccessDuration;
    private final Duration expireAfterWriteDuration;
    private final Long maximumSize;
    private final boolean cacheMissingKey;

    private DefaultLookupCache(
            Duration expireAfterAccessDuration,
            Duration expireAfterWriteDuration,
            Long maximumSize,
            boolean cacheMissingKey) {
        this.expireAfterAccessDuration = expireAfterAccessDuration;
        this.expireAfterWriteDuration = expireAfterWriteDuration;
        this.maximumSize = maximumSize;
        this.cacheMissingKey = cacheMissingKey;
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    // Implementations of the LookupCache methods are omitted here.

    /** Builder for {@link DefaultLookupCache}. */
    public static class Builder {
        private Duration expireAfterAccessDuration;
        private Duration expireAfterWriteDuration;
        private Long maximumSize;
        private Boolean cacheMissingKey;

        public Builder expireAfterAccess(Duration duration) {
            expireAfterAccessDuration = duration;
            return this;
        }

        public Builder expireAfterWrite(Duration duration) {
            expireAfterWriteDuration = duration;
            return this;
        }

        public Builder maximumSize(long maximumSize) {
            this.maximumSize = maximumSize;
            return this;
        }

        public Builder cacheMissingKey(boolean cacheMissingKey) {
            this.cacheMissingKey = cacheMissingKey;
            return this;
        }

        public DefaultLookupCache build() {
            return new DefaultLookupCache(
                    expireAfterAccessDuration,
                    expireAfterWriteDuration,
                    maximumSize,
                    cacheMissingKey);
        }
    }
}
```
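The builder options expireAfterAccess, expireAfterWrite and maximumSize can be read as follows. This sketch is an assumption about their semantics, not the actual DefaultLookupCache internals: the class ExpiringLookupCache is hypothetical, it assumes least-recently-used eviction for the size bound, and it takes an injectable millisecond clock so that expiry is deterministic. List<Object> stands in for RowData.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical sketch of access/write expiry plus an LRU size bound (List<Object> ~ RowData). */
final class ExpiringLookupCache {
    private static final class Entry {
        final Collection<List<Object>> rows;
        final long writtenAt;
        long accessedAt;

        Entry(Collection<List<Object>> rows, long now) {
            this.rows = rows;
            this.writtenAt = now;
            this.accessedAt = now;
        }
    }

    private final Duration expireAfterAccess; // null: no access-based expiry
    private final Duration expireAfterWrite; // null: no write-based expiry
    private final long maximumSize;
    private final LongSupplier clockMillis;
    // accessOrder = true: iteration starts at the least recently used entry.
    private final LinkedHashMap<List<Object>, Entry> map = new LinkedHashMap<>(16, 0.75f, true);

    ExpiringLookupCache(
            Duration expireAfterAccess,
            Duration expireAfterWrite,
            long maximumSize,
            LongSupplier clockMillis) {
        this.expireAfterAccess = expireAfterAccess;
        this.expireAfterWrite = expireAfterWrite;
        this.maximumSize = maximumSize;
        this.clockMillis = clockMillis;
    }

    synchronized Collection<List<Object>> getIfPresent(List<Object> key) {
        Entry entry = map.get(key);
        if (entry == null) {
            return null;
        }
        long now = clockMillis.getAsLong();
        if (isExpired(entry, now)) {
            map.remove(key);
            return null;
        }
        entry.accessedAt = now;
        return entry.rows;
    }

    synchronized Collection<List<Object>> put(List<Object> key, Collection<List<Object>> rows) {
        long now = clockMillis.getAsLong();
        Entry previous = map.put(key, new Entry(rows, now));
        // Evict least recently used entries until the size bound holds again.
        Iterator<List<Object>> lru = map.keySet().iterator();
        while (map.size() > maximumSize && lru.hasNext()) {
            lru.next();
            lru.remove();
        }
        return previous == null || isExpired(previous, now) ? null : previous.rows;
    }

    /** Counts entries still held, including expired ones that were not cleaned up yet. */
    synchronized long size() {
        return map.size();
    }

    private boolean isExpired(Entry entry, long now) {
        boolean writeExpired =
                expireAfterWrite != null && now - entry.writtenAt >= expireAfterWrite.toMillis();
        boolean accessExpired =
                expireAfterAccess != null && now - entry.accessedAt >= expireAfterAccess.toMillis();
        return writeExpired || accessExpired;
    }
}
```

Expired entries are removed lazily on read here; a production cache would typically also clean them up in the background so that size() stays accurate.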
...
```java
/**
 * Pre-defined metrics for cache.
 *
 * <p>Please note that these methods should only be invoked once. Registering a metric with the
 * same name multiple times leads to undefined behavior.
 */
@PublicEvolving
public interface CacheMetricGroup extends MetricGroup {

    /** The number of cache hits. */
    void hitCounter(Counter hitCounter);

    /** The number of cache misses. */
    void missCounter(Counter missCounter);

    /** The number of times to load data into cache from external system. */
    void loadCounter(Counter loadCounter);

    /** The number of load failures. */
    void numLoadFailuresCounter(Counter numLoadFailuresCounter);

    /** The time spent for the latest load operation. */
    void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);

    /** The number of records in cache. */
    void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

    /** The number of bytes used by cache. */
    void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}
```
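The sketch below shows one way a cache could keep the values behind these metrics. It deliberately avoids Flink's metric classes: AtomicLong fields stand in for Counter and a LongSupplier stands in for Gauge<Long>. The class name CacheMetricsSketch is hypothetical, and two points are assumptions: every load attempt (including failed ones) increments the load counter, and the latest load time is measured in nanoseconds.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical bookkeeping behind the CacheMetricGroup metrics (AtomicLong ~ Counter). */
final class CacheMetricsSketch {
    final AtomicLong hits = new AtomicLong(); // would back hitCounter
    final AtomicLong misses = new AtomicLong(); // would back missCounter
    final AtomicLong loads = new AtomicLong(); // would back loadCounter
    final AtomicLong loadFailures = new AtomicLong(); // would back numLoadFailuresCounter
    private final AtomicLong latestLoadTimeNanos = new AtomicLong(-1); // -1: no load yet

    void recordHit() {
        hits.incrementAndGet();
    }

    void recordMiss() {
        misses.incrementAndGet();
    }

    /** Runs one load from the external system, counting every attempt and timing it. */
    <T> T timedLoad(Supplier<T> load) {
        loads.incrementAndGet();
        long start = System.nanoTime();
        try {
            return load.get();
        } catch (RuntimeException e) {
            loadFailures.incrementAndGet();
            throw e;
        } finally {
            latestLoadTimeNanos.set(System.nanoTime() - start);
        }
    }

    /** Would back latestLoadTimeGauge; evaluated lazily whenever a reporter polls it. */
    LongSupplier latestLoadTimeGauge() {
        return latestLoadTimeNanos::get;
    }
}
```

Registering each gauge as a lazily evaluated supplier once, rather than pushing values, fits the note above that each registration method should be invoked only once.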
...
```java
/**
 * Provider for creating {@link LookupFunction} and {@link LookupCache} for storing lookup entries.
 */
@PublicEvolving
public interface PartialCachingLookupProvider extends LookupFunctionProvider {

    /**
     * Build a {@link PartialCachingLookupProvider} from the specified {@link LookupFunction} and
     * {@link LookupCache}.
     */
    static PartialCachingLookupProvider of(LookupFunction lookupFunction, LookupCache cache) {
        return new PartialCachingLookupProvider() {

            @Override
            public LookupCache getCache() {
                return cache;
            }

            @Override
            public LookupFunction createLookupFunction() {
                return lookupFunction;
            }
        };
    }

    /** Get a new instance of {@link LookupCache}. */
    LookupCache getCache();
}
```
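A runtime combining the lookup function with the partial cache could follow the flow sketched below: serve hits from the cache, query the external system on a miss, and store the result. The class PartialCachingSketch is hypothetical, a Function stands in for LookupFunction, and List<Object> stands in for RowData. The cacheMissingKey flag mirrors the lookup.partial-cache.cache-missing-key option: when it is enabled, an empty result is cached so that repeated lookups of a non-existent key no longer reach the external system.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical cache-through lookup with the cache-missing-key switch (List<Object> ~ RowData). */
final class PartialCachingSketch {
    private final Map<List<Object>, Collection<List<Object>>> cache = new ConcurrentHashMap<>();
    private final Function<List<Object>, Collection<List<Object>>> lookupFunction;
    private final boolean cacheMissingKey;

    PartialCachingSketch(
            Function<List<Object>, Collection<List<Object>>> lookupFunction,
            boolean cacheMissingKey) {
        this.lookupFunction = lookupFunction;
        this.cacheMissingKey = cacheMissingKey;
    }

    Collection<List<Object>> lookup(List<Object> key) {
        Collection<List<Object>> cached = cache.get(key);
        if (cached != null) {
            return cached; // Hit, possibly an empty collection for a known-missing key.
        }
        // Miss: query the external system (assumed to return a non-null collection).
        Collection<List<Object>> rows = lookupFunction.apply(key);
        if (!rows.isEmpty() || cacheMissingKey) {
            cache.put(key, rows);
        }
        return rows;
    }
}
```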
PartialCachingAsyncLookupProvider
```java
/**
 * Provider for creating {@link AsyncLookupFunction} and {@link LookupCache} for storing lookup entries.
 */
@PublicEvolving
public interface PartialCachingAsyncLookupProvider extends AsyncLookupFunctionProvider {

    /**
     * Build a {@link PartialCachingAsyncLookupProvider} from the specified {@link
     * AsyncLookupFunction} and {@link LookupCache}.
     */
    static PartialCachingAsyncLookupProvider of(
            AsyncLookupFunction asyncLookupFunction, LookupCache cache) {
        return new PartialCachingAsyncLookupProvider() {

            @Override
            public LookupCache getCache() {
                return cache;
            }

            @Override
            public AsyncLookupFunction createAsyncLookupFunction() {
                return asyncLookupFunction;
            }
        };
    }

    /** Get a new instance of {@link LookupCache}. */
    LookupCache getCache();
}
```
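The asynchronous variant can populate the cache once the external lookup completes. In the hypothetical sketch below, a Function returning a CompletableFuture stands in for AsyncLookupFunction and List<Object> stands in for RowData. For brevity it caches empty results as well (as if cache-missing-key were enabled), failed lookups are not cached, and two concurrent misses for the same key may both reach the external system.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical async cache-through lookup (List<Object> ~ RowData). */
final class AsyncPartialCachingSketch {
    private final Map<List<Object>, Collection<List<Object>>> cache = new ConcurrentHashMap<>();
    private final Function<List<Object>, CompletableFuture<Collection<List<Object>>>> asyncLookup;

    AsyncPartialCachingSketch(
            Function<List<Object>, CompletableFuture<Collection<List<Object>>>> asyncLookup) {
        this.asyncLookup = asyncLookup;
    }

    CompletableFuture<Collection<List<Object>>> lookup(List<Object> key) {
        Collection<List<Object>> cached = cache.get(key);
        if (cached != null) {
            return CompletableFuture.completedFuture(cached); // Hit: no external call.
        }
        // Miss: store the rows when the future completes; an exceptional future skips thenApply.
        return asyncLookup.apply(key).thenApply(rows -> {
            cache.put(key, rows);
            return rows;
        });
    }
}
```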
FullCachingLookupProvider
This interface supports the full caching strategy. It reuses the source's ScanRuntimeProvider to load all entries into the cache and relies on a CacheReloadTrigger to decide when to reload them.
```java
/**
 * A {@link LookupFunctionProvider} that never looks up in the external system on cache miss and
 * provides a cache for holding all entries in the external system. The cache will be fully
 * reloaded from the external system by the {@link ScanTableSource.ScanRuntimeProvider} and reload
 * operations will be triggered by the {@link CacheReloadTrigger}.
 */
@PublicEvolving
public interface FullCachingLookupProvider extends LookupFunctionProvider {

    static FullCachingLookupProvider of(
            ScanTableSource.ScanRuntimeProvider scanRuntimeProvider,
            CacheReloadTrigger cacheReloadTrigger) {
        return new FullCachingLookupProvider() {

            @Override
            public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider() {
                return scanRuntimeProvider;
            }

            @Override
            public CacheReloadTrigger getCacheReloadTrigger() {
                return cacheReloadTrigger;
            }

            @Override
            public LookupFunction createLookupFunction() {
                // Not used: in full caching mode every lookup is served from the cache.
                return null;
            }
        };
    }

    /**
     * Get a {@link ScanTableSource.ScanRuntimeProvider} for scanning all entries from the external
     * lookup table and loading them into the cache.
     */
    ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();

    /** Get a {@link CacheReloadTrigger} for triggering the reload operation. */
    CacheReloadTrigger getCacheReloadTrigger();
}
```
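The full caching behaviour described above (load everything, never query the external system on a miss, rebuild on reload) can be sketched as follows. The class FullCacheSketch is hypothetical: a Supplier of rows stands in for the ScanRuntimeProvider, a key extractor stands in for projecting the lookup key fields, and List<Object> stands in for RowData. Each reload builds a fresh map and swaps it in through a volatile field, so readers keep seeing the old snapshot until the new one is complete.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical full cache: scan all rows, group by lookup key, swap atomically on reload. */
final class FullCacheSketch {
    private final Supplier<Iterable<List<Object>>> scanAll; // Stand-in for ScanRuntimeProvider.
    private final Function<List<Object>, List<Object>> keyExtractor; // Projects lookup keys.
    private volatile Map<List<Object>, Collection<List<Object>>> snapshot = Collections.emptyMap();

    FullCacheSketch(
            Supplier<Iterable<List<Object>>> scanAll,
            Function<List<Object>, List<Object>> keyExtractor) {
        this.scanAll = scanAll;
        this.keyExtractor = keyExtractor;
    }

    /** Rebuilds the whole cache from a full scan and publishes it in one step. */
    void reload() {
        Map<List<Object>, Collection<List<Object>>> fresh = new HashMap<>();
        for (List<Object> row : scanAll.get()) {
            fresh.computeIfAbsent(keyExtractor.apply(row), k -> new ArrayList<>()).add(row);
        }
        snapshot = Collections.unmodifiableMap(fresh);
    }

    /** A miss means "no matching row"; the external system is never queried. */
    Collection<List<Object>> lookup(List<Object> key) {
        return snapshot.getOrDefault(key, Collections.emptyList());
    }
}
```

The trade-off is memory: for the duration of a reload, both the old and the new snapshot are held at once.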
CacheReloadTrigger
A trigger defining custom logic for triggering full cache reloading.
```java
/** Customized trigger for reloading all lookup table entries in full caching mode. */
@PublicEvolving
public interface CacheReloadTrigger extends AutoCloseable, Serializable {

    /** Open the trigger. */
    void open(Context context) throws Exception;

    /**
     * Context of {@link CacheReloadTrigger} for getting information about times and triggering
     * reload.
     */
    interface Context {

        /** Get current processing time. */
        long currentProcessingTime();

        /** Get current watermark on the main stream. */
        long currentWatermark();

        /** Trigger a reload operation on the full cache. */
        CompletableFuture<Void> triggerReload();
    }
}
```
...
An implementation of CacheReloadTrigger that triggers a reload with a specified interval.
```java
/** A trigger that reloads all entries periodically with specified interval or delay. */
public class PeriodicCacheReloadTrigger implements CacheReloadTrigger {
    private final Duration reloadInterval;
    private final ScheduleMode scheduleMode;

    public PeriodicCacheReloadTrigger(Duration reloadInterval, ScheduleMode scheduleMode) {
        this.reloadInterval = reloadInterval;
        this.scheduleMode = scheduleMode;
    }

    @Override
    public void open(CacheReloadTrigger.Context context) {
        // Register periodic reload task
    }

    @Override
    public void close() throws Exception {
        // Dispose resources
    }

    public enum ScheduleMode {
        FIXED_DELAY,
        FIXED_RATE
    }
}
```
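The two schedule modes map naturally onto java.util.concurrent.ScheduledExecutorService, as in the sketch below. The class PeriodicReloadSketch is hypothetical and a plain Runnable stands in for Context#triggerReload. It assumes the initial load happens when the cache is opened, so the first scheduled reload fires one interval later. FIXED_DELAY waits one interval after the previous reload finishes; FIXED_RATE starts reloads at fixed intervals, and per the JDK contract a slow run delays the next one but never overlaps with it.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical mapping of the two schedule modes onto a ScheduledExecutorService. */
final class PeriodicReloadSketch implements AutoCloseable {
    enum ScheduleMode {
        FIXED_DELAY,
        FIXED_RATE
    }

    private final Duration reloadInterval;
    private final ScheduleMode scheduleMode;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    PeriodicReloadSketch(Duration reloadInterval, ScheduleMode scheduleMode) {
        this.reloadInterval = reloadInterval;
        this.scheduleMode = scheduleMode;
    }

    /** triggerReload stands in for Context#triggerReload. */
    void open(Runnable triggerReload) {
        long millis = reloadInterval.toMillis();
        if (scheduleMode == ScheduleMode.FIXED_DELAY) {
            // The next reload starts one interval after the previous reload has finished.
            scheduler.scheduleWithFixedDelay(triggerReload, millis, millis, TimeUnit.MILLISECONDS);
        } else {
            // Reloads start every interval; a slow reload delays, but never overlaps, the next one.
            scheduler.scheduleAtFixedRate(triggerReload, millis, millis, TimeUnit.MILLISECONDS);
        }
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Note that with ScheduledExecutorService an uncaught exception in a run suppresses all later runs, so a real trigger would need to handle reload failures explicitly.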
TimedCacheReloadTrigger
```java
/** A trigger that reloads at a specific local time and repeats for the given interval in days. */
public class TimedCacheReloadTrigger implements CacheReloadTrigger {
    private final LocalTime reloadTime;
    private final int reloadIntervalInDays;

    public TimedCacheReloadTrigger(LocalTime reloadTime, int reloadIntervalInDays) {
        this.reloadTime = reloadTime;
        this.reloadIntervalInDays = reloadIntervalInDays;
    }

    @Override
    public void open(Context context) {
        // Register periodic reload task
    }

    @Override
    public void close() throws Exception {
        // Dispose resources
    }
}
```
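The scheduling arithmetic that a timed trigger needs can be written as a few pure functions, shown below. The helper class TimedReloadSketch is hypothetical, and the rule for the first reload is an assumption: it uses today's occurrence of reloadTime if that is still ahead, otherwise tomorrow's, and later reloads follow every reloadIntervalInDays days at the same local time. LocalDateTime arithmetic ignores daylight-saving transitions, which a real implementation working with a time zone would have to handle.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.LocalTime;

/** Hypothetical scheduling helpers for a reload at a fixed local time every N days. */
final class TimedReloadSketch {

    /** Next occurrence of reloadTime: later today if still ahead, otherwise tomorrow. */
    static LocalDateTime firstReload(LocalDateTime now, LocalTime reloadTime) {
        LocalDateTime today = now.toLocalDate().atTime(reloadTime);
        return today.isAfter(now) ? today : today.plusDays(1);
    }

    /** The reload following the previous one, reloadIntervalInDays days later. */
    static LocalDateTime nextReload(LocalDateTime previous, int reloadIntervalInDays) {
        if (reloadIntervalInDays < 1) {
            throw new IllegalArgumentException("reloadIntervalInDays must be at least 1");
        }
        return previous.plusDays(reloadIntervalInDays);
    }

    /** Delay to hand to a scheduler for the given target time. */
    static Duration delayUntil(LocalDateTime now, LocalDateTime target) {
        return Duration.between(now, target);
    }
}
```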
...
Option | Type | Description
---|---|---
lookup.cache | Enum of NONE, PARTIAL and FULL | The caching strategy for this lookup table. NONE: do not use a cache. PARTIAL: use partial caching mode. FULL: use full caching mode.
lookup.max-retries | Integer | The maximum number of retries if a lookup operation fails.
lookup.partial-cache.expire-after-access | Duration | Duration after which an entry in the cache expires once it was last accessed.
lookup.partial-cache.expire-after-write | Duration | Duration after which an entry in the cache expires once it was written.
lookup.partial-cache.cache-missing-key | Boolean | Whether to store an empty value in the cache if the lookup key doesn't match any rows in the table.
lookup.partial-cache.max-rows | Long | The maximum number of rows to store in the cache.
lookup.full-cache.reload-strategy | Enum of PERIODIC and TIMED | The reload strategy for the full cache scenario. PERIODIC: use PeriodicCacheReloadTrigger. TIMED: use TimedCacheReloadTrigger.
lookup.full-cache.periodic-reload.interval | Duration | Interval between reloads in PeriodicCacheReloadTrigger.
lookup.full-cache.periodic-reload.schedule-mode | Enum of FIXED_DELAY and FIXED_RATE | The scheduling mode of periodic reloading in PeriodicCacheReloadTrigger.
lookup.full-cache.timed-reload.iso-at-time | String | Time in ISO-8601 format at which the cache is reloaded. The time can be specified with or without a time zone; without one, the local time zone of the target JVM is used. See formatter ISO_TIME.
lookup.full-cache.timed-reload.interval-in-days | Integer | The interval in days to trigger the reload at the specified time.
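The timed-reload time option accepts an ISO-8601 time with or without an offset. One way to interpret it is with java.time's DateTimeFormatter.ISO_TIME, whose offset part is optional, and parseBest, which tries OffsetTime first and falls back to LocalTime. In the sketch below, the class ReloadTimeOption is hypothetical and the zone is passed in explicitly (in practice it would be the JVM default zone). Resolving the zone's offset at the current instant is an assumption that glosses over daylight-saving changes.

```java
import java.time.Instant;
import java.time.LocalTime;
import java.time.OffsetTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;

/** Hypothetical parser for the timed-reload time option, with or without an offset. */
final class ReloadTimeOption {

    static OffsetTime parse(String value, ZoneId localZone) {
        // ISO_TIME accepts e.g. "10:15", "10:15:30" and "10:15:30+01:00".
        TemporalAccessor parsed =
                DateTimeFormatter.ISO_TIME.parseBest(value, OffsetTime::from, LocalTime::from);
        if (parsed instanceof OffsetTime) {
            return (OffsetTime) parsed;
        }
        // No offset given: use the local zone's current offset.
        return ((LocalTime) parsed).atOffset(localZone.getRules().getOffset(Instant.now()));
    }
}
```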
...