Status
...
Page properties | |
---|---|
|
...
JIRA: TBD
...
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
This FLIP is a joint work: it was initially proposed by Yuan Zhu (zstraw@163.com) and finished by Qingsheng Ren (partial caching part) and Alexander Smirnov (full caching part).
Table of Contents |
---|
Motivation
...
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Runtime provider that hands out a synchronous {@link LookupFunction} for a lookup table source.
 */
@PublicEvolving
public interface LookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    /**
     * Wraps an already constructed {@link LookupFunction} into a provider.
     *
     * @param lookupFunction the function that {@link #createLookupFunction()} should return
     * @return a provider whose {@link #createLookupFunction()} always returns {@code lookupFunction}
     */
    static LookupFunctionProvider of(LookupFunction lookupFunction) {
        final LookupFunctionProvider provider = () -> lookupFunction;
        return provider;
    }

    /** Creates the {@link LookupFunction} supplied by this provider. */
    LookupFunction createLookupFunction();
} |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Runtime provider that hands out an asynchronous {@link AsyncLookupFunction} for a lookup table
 * source.
 */
@PublicEvolving
public interface AsyncLookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    /**
     * Wraps an already constructed {@link AsyncLookupFunction} into a provider.
     *
     * @param asyncLookupFunction the function that {@link #createAsyncLookupFunction()} should
     *     return
     * @return a provider whose {@link #createAsyncLookupFunction()} always returns {@code
     *     asyncLookupFunction}
     */
    static AsyncLookupFunctionProvider of(AsyncLookupFunction asyncLookupFunction) {
        final AsyncLookupFunctionProvider provider = () -> asyncLookupFunction;
        return provider;
    }

    /** Creates the {@link AsyncLookupFunction} supplied by this provider. */
    AsyncLookupFunction createAsyncLookupFunction();
} |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * A semi-persistent mapping from keys to values for storing entries of a lookup table.
 *
 * <p>The type of the caching key is a {@link RowData} with the lookup key fields packed inside.
 * The type of the value is a {@link Collection} of {@link RowData}, which are the rows matching
 * the lookup key fields.
 *
 * <p>Cache entries are manually added using {@link #put}, and are stored in the cache until they
 * are either evicted or manually invalidated.
 *
 * <p>Implementations of this interface are expected to be thread-safe, and can be safely accessed
 * by multiple concurrent threads.
 */
@PublicEvolving
public interface LookupCache extends AutoCloseable, Serializable {

    /**
     * Initializes the cache.
     *
     * @param metricGroup the metric group to register cache-related metrics on
     */
    void open(CacheMetricGroup metricGroup);

    /**
     * Returns the value associated with {@code key} in this cache, or {@code null} if there is no
     * cached value for {@code key}.
     *
     * @param key the lookup key row
     * @return the cached value rows, or {@code null} if there is no cached value for {@code key}
     */
    @Nullable
    Collection<RowData> getIfPresent(RowData key);

    /**
     * Associates the specified value rows with the specified key row in the cache. If the cache
     * previously contained a value associated with the key, the old value is replaced by the
     * specified value.
     *
     * @param key the key row with which the specified value is to be associated
     * @param value the value rows to be associated with the specified key
     * @return the previous value rows associated with {@code key}, or {@code null} if there was no
     *     mapping for {@code key}
     */
    @Nullable
    Collection<RowData> put(RowData key, Collection<RowData> value);

    /**
     * Discards any cached value for the specified key.
     *
     * @param key the key row whose cached value should be discarded
     */
    void invalidate(RowData key);

    /** Returns the number of key-value mappings in the cache. */
    long size();
} |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Default implementation of {@link LookupCache}.
 *
 * <p>Instances are created through {@link #newBuilder()}; options left unset on the builder are
 * passed to the constructor as {@code null} (durations, maximum size) or as the builder default
 * (cache-missing-key flag).
 */
@PublicEvolving
public class DefaultLookupCache implements LookupCache {
    private final Duration expireAfterAccessDuration;
    private final Duration expireAfterWriteDuration;
    private final Long maximumSize;
    private final boolean cacheMissingKey;

    private DefaultLookupCache(
            Duration expireAfterAccessDuration,
            Duration expireAfterWriteDuration,
            Long maximumSize,
            boolean cacheMissingKey) {
        this.expireAfterAccessDuration = expireAfterAccessDuration;
        this.expireAfterWriteDuration = expireAfterWriteDuration;
        // The former assignment to an undeclared "initialCapacity" field was removed: there is no
        // such field or constructor parameter.
        this.maximumSize = maximumSize;
        this.cacheMissingKey = cacheMissingKey;
    }

    /** Creates a new {@link Builder} for configuring a {@link DefaultLookupCache}. */
    public static Builder newBuilder() {
        return new Builder();
    }

    // NOTE(review): this proposal sketch does not show implementations of the LookupCache methods
    // (open, getIfPresent, put, invalidate, size, close); they must be added before this class
    // can compile as a concrete implementation.

    /** Builder of {@link DefaultLookupCache}. */
    public static class Builder {
        private Duration expireAfterAccessDuration;
        private Duration expireAfterWriteDuration;
        private Long maximumSize;
        // Primitive with a default so that build() never auto-unboxes a null into the
        // constructor's boolean parameter when cacheMissingKey(...) was not called.
        // NOTE(review): default of true should be confirmed against the
        // lookup.partial-cache.cache-missing-key option default.
        private boolean cacheMissingKey = true;

        /** Sets the duration after which an entry expires once it was last accessed. */
        public Builder expireAfterAccess(Duration duration) {
            expireAfterAccessDuration = duration;
            return this;
        }

        /** Sets the duration after which an entry expires once it was written. */
        public Builder expireAfterWrite(Duration duration) {
            expireAfterWriteDuration = duration;
            return this;
        }

        /** Sets the maximum number of entries the cache may hold. */
        public Builder maximumSize(long maximumSize) {
            this.maximumSize = maximumSize;
            return this;
        }

        /** Sets whether an empty value is stored for keys that match no rows. */
        public Builder cacheMissingKey(boolean cacheMissingKey) {
            this.cacheMissingKey = cacheMissingKey;
            return this;
        }

        /** Creates a {@link DefaultLookupCache} from the options configured on this builder. */
        public DefaultLookupCache build() {
            return new DefaultLookupCache(
                    expireAfterAccessDuration,
                    expireAfterWriteDuration,
                    maximumSize,
                    cacheMissingKey);
        }
    }
} |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Pre-defined metrics of a lookup cache.
 *
 * <p>Each of these methods should be invoked only once: registering a metric with the same name
 * multiple times leads to undefined behavior.
 */
@PublicEvolving
public interface CacheMetricGroup extends MetricGroup {

    /** Registers the counter of cache hits. */
    void hitCounter(Counter hitCounter);

    /** Registers the counter of cache misses. */
    void missCounter(Counter missCounter);

    /** Registers the counter of loads of data into the cache from the external system. */
    void loadCounter(Counter loadCounter);

    /** Registers the counter of load failures. */
    void numLoadFailuresCounter(Counter numLoadFailuresCounter);

    /** Registers the gauge of the time spent on the latest load operation. */
    void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);

    /** Registers the gauge of the number of records in the cache. */
    void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

    /** Registers the gauge of the number of bytes used by the cache. */
    void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
} |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Provider for creating a {@link LookupFunction} and a {@link LookupCache} for storing lookup
 * entries.
 */
@PublicEvolving
public interface PartialCachingLookupProvider extends LookupFunctionProvider {

    /**
     * Builds a {@link PartialCachingLookupProvider} from the specified {@link LookupFunction} and
     * {@link LookupCache}.
     *
     * @param lookupFunction the function returned by {@link #createLookupFunction()}
     * @param cache the cache returned by {@link #getCache()}
     * @return a provider that always hands out the given function and cache instances
     */
    static PartialCachingLookupProvider of(LookupFunction lookupFunction, LookupCache cache) {
        return new PartialCachingLookupProvider() {
            @Override
            public LookupCache getCache() {
                return cache;
            }

            @Override
            public LookupFunction createLookupFunction() {
                return lookupFunction;
            }
        };
    }

    /**
     * Returns the {@link LookupCache} for storing lookup entries.
     *
     * <p>The provider built by {@link #of} returns the same cache instance on every call, so this
     * method does not necessarily create a new instance.
     */
    LookupCache getCache();
} |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Provider for creating an {@link AsyncLookupFunction} and a {@link LookupCache} for storing
 * lookup entries.
 */
@PublicEvolving
public interface PartialCachingAsyncLookupProvider extends AsyncLookupFunctionProvider {

    /**
     * Builds a {@link PartialCachingAsyncLookupProvider} from the specified {@link
     * AsyncLookupFunction} and {@link LookupCache}.
     *
     * @param asyncLookupFunction the function returned by {@link #createAsyncLookupFunction()}
     * @param cache the cache returned by {@link #getCache()}
     * @return a provider that always hands out the given function and cache instances
     */
    // Return type was PartialCachingLookupProvider, which the anonymous
    // PartialCachingAsyncLookupProvider below is not assignable to.
    static PartialCachingAsyncLookupProvider of(
            AsyncLookupFunction asyncLookupFunction, LookupCache cache) {
        return new PartialCachingAsyncLookupProvider() {
            @Override
            public LookupCache getCache() {
                return cache;
            }

            @Override
            public AsyncLookupFunction createAsyncLookupFunction() {
                return asyncLookupFunction;
            }
        };
    }

    /**
     * Returns the {@link LookupCache} for storing lookup entries.
     *
     * <p>The provider built by {@link #of} returns the same cache instance on every call, so this
     * method does not necessarily create a new instance.
     */
    LookupCache getCache();
} |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Customized trigger for reloading all lookup table entries in full caching mode.
 *
 * <p>Named {@code CacheReloadTrigger} (previously {@code CachingReloadTrigger}) to match its own
 * Javadoc link and the implementations {@code PeriodicCacheReloadTrigger} and {@code
 * TimedCacheReloadTrigger}, which implement {@code CacheReloadTrigger}.
 */
@PublicEvolving
public interface CacheReloadTrigger extends AutoCloseable, Serializable {

    /**
     * Opens the trigger.
     *
     * @param context context for reading the current times and triggering reloads
     * @throws Exception if the trigger cannot be opened
     */
    void open(Context context) throws Exception;

    /**
     * Context of {@link CacheReloadTrigger} for getting information about times and triggering
     * reloads.
     */
    interface Context {

        /** Returns the current processing time. */
        long currentProcessingTime();

        /** Returns the current watermark on the main stream. */
        long currentWatermark();

        /**
         * Triggers a reload operation on the full cache.
         *
         * @return a future for the triggered reload (presumably completed once the reload
         *     finishes — confirm with the implementation)
         */
        CompletableFuture<Void> triggerReload();
    }
} |
...
PeriodicCacheReloadTrigger: an implementation of CacheReloadTrigger that triggers a reload periodically with a specified interval.
Code Block | ||||
---|---|---|---|---|
| ||||
/** A trigger that reloads all entries periodically with specified interval or delay. */ public class PeriodicCacheReloadTrigger implements CacheReloadTrigger { private final Duration reloadInterval; private final ScheduleMode scheduleMode; public PeriodicCacheReloadTrigger(Duration reloadInterval, ScheduleMode scheduleMode) { this.reloadInterval = reloadInterval; this.scheduleMode = scheduleMode; } @Override public void open(CacheReloadTrigger.Context context) { // Register periodic reload task } @Override public void close() throws Exception { // Dispose resources } public enum ScheduleMode { FIXED_DELAY, FIXED_RATE } } |
TimedCacheReloadTrigger
Code Block | ||||
---|---|---|---|---|
| ||||
/** A trigger that reloads at a specific local time and repeat for the given interval in days. */ public class TimedCacheReloadTrigger implements CacheReloadTrigger { private final LocalTime reloadTime; private final int reloadIntervalInDays; public TimedCacheReloadTrigger(LocalTime reloadTime, int reloadIntervalInDays) { this.reloadTime = reloadTime; this.reloadIntervalInDays = reloadIntervalInDays; } @Override public void open(Context context) { // Register periodic reload task } @Override public void close() throws Exception { // Dispose resources } } |
...
Option | Type | Descriptions | |
---|---|---|---|
lookup.cache | Enum of NONE, PARTIAL and FULL | The caching strategy for this lookup table. NONE: do not use a cache; PARTIAL: use partial caching mode; FULL: use full caching mode. | |
lookup.max-retries | Integer | The maximum allowed retries if a lookup operation fails | |
lookup.partial-cache.expire-after-access | Duration | Duration to expire an entry in the cache after accessing | |
lookup.partial-cache.expire-after-write | Duration | Duration to expire an entry in the cache after writing | |
lookup.partial-cache.cache-missing-key | Boolean | Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table | |
lookup.partial-cache.max-rows | Long | The maximum number of rows to store in the cache | |
lookup.full-cache.reload-strategy | Enum of PERIODIC and TIMED | The reload strategy for the full cache scenario. PERIODIC: use PeriodicCacheReloadTrigger; TIMED: use TimedCacheReloadTrigger. | |
lookup.full-cache.periodic-reload.interval | Duration | Duration to trigger reload in the PeriodicCacheReloadTrigger | |
lookup.full-cache.periodic-reload.schedule-mode | Enum of FIXED_DELAY and FIXED_RATE | The schedule mode of periodic reloading in the PeriodicCacheReloadTrigger | |
lookup.full-cache.timed-reload.iso-at-time | String (time in ISO-8601 format) | The time of day at which to trigger a reload in the TimedCacheReloadTrigger, in ISO-8601 format (see formatter ISO_TIME), e.g. 10:15:30 or 10:15:30+01:00. The time can be specified either with or without a timezone; without a timezone, the local timezone of the target JVM is used. | |
lookup.full-cache.timed-reload.interval-in-days | Integer | The interval in days to trigger the reload at the specified time | |
...