Status
...
...
JIRA: TBD
...
Vote thread |
|
---|
JIRA | Jira |
---|
server | ASF JIRA |
---|
serverId | 5aa69414-a9e9-3523-82ec-879b028fb15b |
---|
key | FLINK-28415 |
---|
|
|
---|
Release | 1.16 |
---|
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
This FLIP is a joint work: it was initially proposed by Yuan Zhu (zstraw@163.com) and finished by Qingsheng Ren (partial caching part) and Alexander Smirnov (full caching part).
Motivation
...
Code Block |
---|
language | java |
---|
title | LookupFunctionProvider |
---|
|
/** A provider for creating {@link LookupFunction}. */
@PublicEvolving
public interface LookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {
static LookupFunctionProvider of(LookupFunction lookupFunction) {
return () -> lookupFunction;
}
LookupFunction createLookupFunction();
} |
...
Code Block |
---|
language | java |
---|
title | AsyncLookupFunctionProvider |
---|
|
/** A provider for creating {@link AsyncLookupFunction}. */
@PublicEvolving
public interface AsyncLookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {
static AsyncLookupFunctionProvider of(AsyncLookupFunction asyncLookupFunction) {
return () -> asyncLookupFunction;
}
AsyncLookupFunction createAsyncLookupFunction();
} |
...
Code Block |
---|
language | java |
---|
title | LookupCache |
---|
|
/**
* A semi-persistent mapping from keys to values for storing entries of lookup table.
*
* <p>The type of the caching key is a {@link RowData} with lookup key fields packed inside. The
* type of value is a {@link Collection} of {@link RowData}, which are rows matching lookup key
* fields.
*
* <p>Cache entries are manually added using {@link #put}, and are stored in the cache until either
* evicted or manually invalidated.
*
* <p>Implementations of this interface are expected to be thread-safe, and can be safely accessed
* by multiple concurrent threads.
*/
@PublicEvolving
public interface LookupCache extends AutoClosable, Serializable {
/**
* Initialize the cache.
*
* @param metricGroup the metric group to register cache related metrics.
*/
void open(CacheMetricGroup metricGroup);
/**
* Returns the value associated with key in this cache, or null if there is no cached value for
* key.
*/
@Nullable
Collection<RowData> getIfPresent(RowData key);
/**
* Associates the specified value rows with the specified key row in the cache. If the cache
* previously contained value associated with the key, the old value is replaced by the
* specified value.
*
* @return the previous value rows associated with key, or null if there was no mapping for key.
* @param key - key row with which the specified value is to be associated
* @param value – value rows to be associated with the specified key
*/
Collection<RowData> put(RowData key, Collection<RowData> value);
/** Discards any cached value for the specified key. */
void invalidate(RowData key);
/** Returns the number of key-value mappings in the cache. */
long size();
} |
...
Code Block |
---|
language | java |
---|
title | DefaultLookupCache |
---|
|
/** Default implementation of {@link LookupCache}. */
@PublicEvolving
public class DefaultLookupCache implements LookupCache {
private final Duration expireAfterAccessDuration;
private final Duration expireAfterWriteDuration;
private final Long maximumSize;
private final boolean cacheMissingKey;
private DefaultLookupCache(
Duration expireAfterAccessDuration,
Duration expireAfterWriteDuration,
Long maximumSize,
boolean cacheMissingKey) {
this.expireAfterAccessDuration = expireAfterAccessDuration;
this.expireAfterWriteDuration = expireAfterWriteDuration;
this.initialCapacity = initialCapacity;
this.maximumSize = maximumSize;
this.cacheMissingKey = cacheMissingKey;
}
public static Builder newBuilder() {
return new Builder();
}
public static class Builder {
private Duration expireAfterAccessDuration;
private Duration expireAfterWriteDuration;
private Long maximumSize;
private Boolean cacheMissingKey;
public Builder expireAfterAccess(Duration duration) {
expireAfterAccessDuration = duration;
return this;
}
public Builder expireAfterWrite(Duration duration) {
expireAfterWriteDuration = duration;
return this;
}
public Builder maximumSize(long maximumSize) {
this.maximumSize = maximumSize;
return this;
}
public Builder cacheMissingKey(boolean cacheMissingKey) {
this.cacheMissingKey = cacheMissingKey;
return this;
}
public DefaultLookupCache build() {
return new DefaultLookupCache(
expireAfterAccessDuration,
expireAfterWriteDuration,
maximumSize,
cacheMissingKey);
}
}
} |
...
Code Block |
---|
language | java |
---|
title | CacheMetricGroup |
---|
|
/**
* Pre-defined metrics for cache.
*
* <p>Please note that these methods should only be invoked once. Registering a metric with same
* name for multiple times would lead to an undefined behavior.
*/
@PublicEvolving
public interface CacheMetricGroup extends MetricGroup {
/** The number of cache hits. */
void hitCounter(Counter hitCounter);
/** The number of cache misses. */
void missCounter(Counter missCounter);
/** The number of times to load data into cache from external system. */
void loadCounter(Counter loadCounter);
/** The number of load failures. */
void numLoadFailuresCounter(Counter numLoadFailuresCounter);
/** The time spent for the latest load operation. */
void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);
/** The number of records in cache. */
void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);
/** The number of bytes used by cache. */
void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
} |
...
Code Block |
---|
language | java |
---|
title | PartialCachingLookupProvider |
---|
|
/**
* Provider for creating {@link LookupFunction} and {@link LookupCache} for storing lookup entries.
*/
@PublicEvolving
public classinterface PartialCachingLookupProvider extends LookupFunctionProvider {
/**
private final LookupFunction lookupFunction;* Build a {@link PartialCachingLookupProvider} from the specified {@link LookupFunction} and
private * final{@link LookupCache cache;
}.
*/
publicstatic PartialCachingLookupProvider of(LookupFunction lookupFunction, LookupCache cache) {
this.lookupFunction = lookupFunction;return new PartialCachingLookupProvider() {
this.cache = cache; @Override
}
@Override
public LookupFunctionLookupCache createLookupFunctiongetCache() {
return lookupFunction;
}
public LookupCache getCache() {
return cache;
}
} |
AsyncPartialCachingLookupProvider
Code Block |
---|
language | java |
---|
title | LookupFunctionProvider |
---|
|
/**
* Provider for creating {@link AsyncLookupFunction} and {@link LookupCache} for storing lookup entries.
*/
@PublicEvolving
public class AsyncPartialCachingLookupProvider extends AsyncLookupFunctionProvider {
}
private final AsyncLookupFunction@Override
asyncLookupFunction;
private final LookupCache cache;
public AsyncPartialCachingLookupProvider(AsyncLookupFunction asyncLookupFunction, LookupCache cacheLookupFunction createLookupFunction() {
this.asyncLookupFunction = asyncLookupFunction;
this.cache = cachereturn lookupFunction;
}
@Override
public AsyncLookupFunction createAsyncLookupFunction() {}
return asyncLookupFunction};
}
public/** LookupCacheGet getCache() {
a new instance of {@link LookupCache}. */
return cache;
} LookupCache getCache();
} |
FullCachingLookupProvider
This interface is for supporting full cache strategy. It reuses ScanRuntimeProvider and defines reload time.
PartialCachingAsyncLookupProvider
Code Block |
---|
language | java |
---|
title | PartialCachingAsyncLookupProvider |
---|
|
/**
* Provider for Acreating {@link LookupFunctionProviderAsyncLookupFunction} thatand never{@link lookupLookupCache} infor externalstoring systemlookup on cache miss and provides a
* cache for holding all entries in the external system. The cache will be fully reloaded from the
* external system and reload operations will be triggered by the {@link CacheReloadTrigger}.
*/
@PublicEvolving
public class FullCachingLookupProvider implements LookupFunctionProvider {
private final ScanTableSource.ScanRuntimeProvider scanRuntimeProvider;
private final CacheReloadTrigger reloadTrigger;entries.
*/
@PublicEvolving
public interface PartialCachingAsyncLookupProvider extends AsyncLookupFunctionProvider {
/**
* Build a {@link PartialCachingLookupProvider} from the specified {@link AsyncLookupFunction} and
* {@link LookupCache}.
*/
static PartialCachingLookupProvider of(AsyncLookupFunction asyncLookupFunction, LookupCache cache) {
return new PartialCachingAsyncLookupProvider() {
public FullCachingLookupProvider(
@Override
ScanTableSource.ScanRuntimeProvider scanRuntimeProvider,
public LookupCache getCache() {
CacheReloadTrigger reloadTrigger) {
this.scanRuntimeProvider =return scanRuntimeProvidercache;
this.reloadTrigger = reloadTrigger;
}
@Override
public LookupFunction createLookupFunction() {@Override
return (keyRow) -> null;
}
public public AsyncLookupFunction createAsyncLookupFunction() {
return asyncLookupFunction;
}
};
}
/** Get a new instance of {@link LookupCache}. */
LookupCache getCache();
} |
FullCachingLookupProvider
This interface is for supporting full cache strategy. It reuses ScanRuntimeProvider and defines reload time.
Code Block |
---|
language | java |
---|
title | FullCachingLookupProvider |
---|
|
/**
* A {@link CachingLookupProvider} that never lookup in external system on cache miss and provides a
* cache for holding all entries in the external system. The cache will be fully reloaded from the
* external system by the {@link ScanTableSource.ScanRuntimeProvider} and reload operations will be
* triggered by the {@link CacheReloadTrigger}.
*/
@PublicEvolving
public interface FullCachingLookupProvider extends LookupFunctionProvider {
static FullCachingLookupProvider of(
ScanTableSource.ScanRuntimeProvider scanRuntimeProvider,
CacheReloadTrigger cacheReloadTrigger) {
return new FullCachingLookupProvider() {
@Override
public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider() {
return scanRuntimeProvider;
}
@Override
public CacheReloadTrigger getCacheReloadTrigger() {
return cacheReloadTrigger;
}
@Override
public LookupFunction createLookupFunction() {
return keyRow -> null;
}
};
}
/**
* Get a {@link ScanTableSource.ScanRuntimeProvider} for scanning all entries from the external
* lookup table and load into the cache.
*/
ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();
/** Get {
return scanRuntimeProvider;
}
publica {@link CacheReloadTrigger} for triggering the reload operation. */
CacheReloadTrigger getCacheReloadTrigger() {
return reloadTrigger);
}
} |
CacheReloadTrigger
A trigger defining custom logic for triggering full cache reloading.
Code Block |
---|
language | java |
---|
title | CacheReloadTrigger |
---|
|
/** Customized trigger for reloading all lookup table entries in full caching mode. */
@PublicEvolving
public interface CachingReloadTrigger extends AutoCloseable, Serializable {
/** Open the trigger. */
void open(Context context) throws Exception;
/**
* Context of {@link CacheReloadTrigger} for getting information about times and
* triggering reload.
*/
interface Context {
/** Get current processing time. */
long currentProcessingTime();
/** Get current watermark on the main stream. */
long currentWatermark();
/** Trigger a reload operation on the full cache. */
CompletableFuture<Void> triggerReload();
}
}
|
...
An implementation of CacheReloadTrigger that triggers reloads with a specified interval.
Code Block |
---|
language | java |
---|
title | PeriodicCacheReloadTrigger |
---|
|
/** A trigger that reloads all entries periodically with specified interval or delay. */
public class PeriodicCacheReloadTrigger implements CacheReloadTrigger {
private final Duration reloadInterval;
private final ScheduleMode scheduleMode;
public PeriodicCacheReloadTrigger(Duration reloadInterval, ScheduleMode scheduleMode) {
this.reloadInterval = reloadInterval;
this.scheduleMode = scheduleMode;
}
@Override
public void open(CacheReloadTrigger.Context context) {
// Register periodic reload task
}
@Override
public void close() throws Exception {
// Dispose resources
}
public enum ScheduleMode {
FIXED_DELAY,
FIXED_RATE
}
} |
TimedCacheReloadTrigger
Code Block |
---|
language | java |
---|
title | TimedCacheReloadTrigger |
---|
|
/** A trigger that reloads at a specific local time and repeat for the given interval in days. */
public class TimedCacheReloadTrigger implements CacheReloadTrigger {
private final LocalTime reloadTime;
private final int reloadIntervalInDays;
public TimedCacheReloadTrigger(LocalTime reloadTime, int reloadIntervalInDays) {
this.reloadTime = reloadTime;
this.reloadIntervalInDays = reloadIntervalInDays;
}
@Override
public void open(Context context) {
// Register periodic reload task
}
@Override
public void close() throws Exception {
// Dispose resources
}
} |
...
Option | Type | Descriptions |
---|
lookup.cache | Enum of NONE, PARTIAL and FULL | The caching strategy for this lookup table. NONE: do not use a cache. PARTIAL: use partial caching mode. FULL: use full caching mode. |
lookup.max-retries | Integer | The maximum allowed retries if a lookup operation fails |
lookup.partial-cache.expire-after-access | Duration | Duration to expire an entry in the cache after accessing |
lookup.partial-cache.expire-after-write | Duration | Duration to expire an entry in the cache after writing |
lookup.partial-cache.cache-missing-key | Boolean | Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table |
lookup.partial-cache.max-rows | Long | The maximum number of rows to store in the cache |
lookup.full-cache.reload-strategy | Enum of PERIODIC and TIMED | The reload strategy for the full cache scenario. PERIODIC: use PeriodicCacheReloadTrigger. TIMED: use TimedCacheReloadTrigger. |
lookup.full-cache.periodic-reload.interval | Duration | Duration to trigger reload in the PeriodicCacheReloadTrigger |
lookup.full-cache.periodic-reload.schedule-mode | Enum of FIXED_DELAY and FIXED_RATE | The schedule mode of periodic reloading in the PeriodicCacheReloadTrigger |
lookup.full-cache.timed-reload.iso-at-time | String | Time in ISO-8601 format at which the cache needs to be reloaded. The time can be specified either with or without a timezone (if omitted, the local timezone of the target JVM is used). See formatter ISO_TIME. |
lookup.full-cache.timed-reload.interval-in-days | Integer | The interval in days to trigger the reload at the specified time |
...