Status
...
...
...
...
Vote thread | |
---|
JIRA | FLINK-28415 (ASF JIRA) |
---|
Release | 1.16 |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
This FLIP is a joint work initially proposed by Yuan Zhu (zstraw@163.com) and finished by Qingsheng Ren (partial caching part) and Alexander Smirnov (full caching part).
Motivation
...
To address the issues above, we propose a unified abstraction for the lookup source cache and its related metrics.
Proposed Changes
We'd like to split the proposal into two caching modes: partial caching and full caching.
Top-level APIs
To clarify the semantics of lookup, we'd like to introduce some top-level APIs for general lookup operations without caching:
LookupFunction / AsyncLookupFunction, an extended version of TableFunction that makes the API more straightforward.
LookupFunctionProvider / AsyncLookupFunctionProvider, serving as the creator of LookupFunction / AsyncLookupFunction in the table source.
And APIs related to the cache:
LookupCache, defining the cache used in the lookup table.
DefaultLookupCache, a default implementation of a cache that is suitable for most use cases.
CacheMetricGroup, defining the metrics that should be reported by the cache.
Partial and Full Caching
More specifically, we'd like to provide public interfaces to lookup source developers for the two most common cases, named partial caching and full caching.
Partial caching
Partial caching loads data into the cache along with accesses to the external system. If the key to look up does not exist in the cache, a lookup action to the external system is triggered and the result is stored in the cache for subsequent lookups. Users and lookup table developers can configure the eviction policy and the maximum size of the cache.
To support partial caching, we propose to introduce two new interfaces, PartialCachingLookupProvider and PartialCachingAsyncLookupProvider, which extend LookupFunctionProvider / AsyncLookupFunctionProvider with a LookupCache.
The cache serves as a component in LookupJoinRunner and is pluggable: it is passed to the provider together with the lookup function, for example via PartialCachingLookupProvider.of(lookupFunction, cache). The developer of a lookup table defines such a provider in their implementation of LookupTableSource. The planner then takes over the lookup function and the cache created from the provider and passes them to the LookupJoinRunner. The cache is initialized (via LookupCache#open) during runtime execution, and entries are loaded via the lookup function whenever there is a cache miss.
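To make the read path above concrete, here is a minimal, JDK-only sketch of how a lookup join operator could consult a partial cache before calling the lookup function. It is not LookupJoinRunner's actual code: the class `PartialCachingJoinSketch` is hypothetical, plain Java objects stand in for RowData, a `Function` stands in for `LookupFunction#lookup`, and the `cacheMissingKey` flag mirrors the `lookup.partial-cache.cache-missing-key` option.

```java
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
 * Hypothetical model of the partial-caching read path: consult the cache first and call the
 * lookup function only on a miss. Plain Java keys and rows stand in for RowData.
 */
final class PartialCachingJoinSketch<K, V> {
    private final Map<K, Collection<V>> cache = new ConcurrentHashMap<>();
    private final Function<K, Collection<V>> lookupFunction; // plays the role of LookupFunction#lookup
    private final boolean cacheMissingKey; // mirrors lookup.partial-cache.cache-missing-key
    private final AtomicInteger externalLookups = new AtomicInteger();

    PartialCachingJoinSketch(Function<K, Collection<V>> lookupFunction, boolean cacheMissingKey) {
        this.lookupFunction = lookupFunction;
        this.cacheMissingKey = cacheMissingKey;
    }

    Collection<V> lookup(K key) {
        Collection<V> cached = cache.get(key);
        if (cached != null) {
            return cached; // cache hit: the external system is not accessed
        }
        externalLookups.incrementAndGet();
        Collection<V> rows = lookupFunction.apply(key);
        if (rows == null) {
            rows = Collections.emptyList();
        }
        // An empty result is only cached when missing keys should be remembered.
        if (!rows.isEmpty() || cacheMissingKey) {
            cache.put(key, rows);
        }
        return rows;
    }

    int externalLookups() {
        return externalLookups.get();
    }
}
```

With `cacheMissingKey` disabled, every lookup of an absent key goes to the external system again, which trades extra I/O for never serving a stale "no match".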
Full Caching
If the lookup table is small enough to fit into memory and doesn't change frequently, it is more efficient to load all entries of the lookup table into the cache to reduce network I/O, and to refresh the table periodically or at a specified time of day. We name this use case "full caching". Logically the reload operation is a kind of scan, so we'd like to reuse the ScanRuntimeProvider so that developers can reuse the scanning logic implemented in Source / SourceFunction / InputFormat. Considering the complexity of the Source API, we'd like to support the SourceFunction and InputFormat APIs first. Supporting the Source API might require a new topology and will be discussed later in another FLIP.
We propose to introduce several new interfaces:
FullCachingLookupProvider, for reusing the ability of scanning.
CacheReloadTrigger, for customizing the reloading strategy of all entries in the full cache.
Also we'd like to provide two default implementations of CacheReloadTrigger:
PeriodicCacheReloadTrigger, for triggering reload periodically with a specific interval.
TimedCacheReloadTrigger, for triggering reload at a specific time and repeating with an interval in days.
Public Interfaces
Lookup Functions
As the TableFunction interface is not quite straightforward for lookup table developers to use, we'd like to introduce new interfaces for sync and async lookup tables. Caching will only be supported on LookupFunction / AsyncLookupFunction.
Code Block |
---|
language | java |
---|
title | LookupFunction |
---|
|
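import java.io.IOException;
import java.util.Collection;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.TableFunction;

/**
 * A wrapper class of {@link TableFunction} for synchronously looking up rows matching the lookup
 * keys from the external system.
 *
 * <p>The output type of this table function is fixed as {@link RowData}.
 */
@PublicEvolving
public abstract class LookupFunction extends TableFunction<RowData> {
    /**
     * Synchronously lookup rows matching the lookup keys.
     *
     * @param keyRow - A {@link RowData} that wraps keys to lookup.
     * @return A collection of all matching rows in the lookup table.
     */
    public abstract Collection<RowData> lookup(RowData keyRow) throws IOException;

    /** Invoke {@link #lookup} and handle exceptions. */
    public final void eval(Object... keys) {
        GenericRowData keyRow = GenericRowData.of(keys);
        try {
            Collection<RowData> rows = lookup(keyRow);
            // A null result is treated as "no matching rows" instead of failing with an NPE.
            if (rows != null) {
                rows.forEach(this::collect);
            }
        } catch (IOException e) {
            throw new RuntimeException("Failed to lookup values with given key " + keyRow, e);
        }
    }
} |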
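The wrapper follows a template-method design: the planner calls the final `eval`, which packs the keys into a row and delegates to the connector's `lookup`. Below is a JDK-only sketch of the same pattern, assuming hypothetical classes `SimpleLookupFunction` and `InMemoryLookupFunction`, with `List<Object>` standing in for RowData and an internal list standing in for `collect`.

```java
import java.io.IOException;
import java.util.*;

/** Hypothetical JDK-only analog of LookupFunction; List<Object> stands in for RowData. */
abstract class SimpleLookupFunction {
    private final List<List<Object>> collected = new ArrayList<>(); // stands in for collect()

    /** Connector-specific lookup, mirroring LookupFunction#lookup. */
    abstract Collection<List<Object>> lookup(List<Object> keyRow) throws IOException;

    /** Mirrors LookupFunction#eval: pack keys, call lookup, emit each matching row. */
    final void eval(Object... keys) {
        List<Object> keyRow = Arrays.asList(keys);
        try {
            Collection<List<Object>> rows = lookup(keyRow);
            if (rows != null) {
                collected.addAll(rows);
            }
        } catch (IOException e) {
            throw new RuntimeException("Failed to lookup values with given key " + keyRow, e);
        }
    }

    List<List<Object>> collected() {
        return collected;
    }
}

/** Hypothetical connector backed by an in-memory map from key row to matching rows. */
final class InMemoryLookupFunction extends SimpleLookupFunction {
    private final Map<List<Object>, List<List<Object>>> table;

    InMemoryLookupFunction(Map<List<Object>, List<List<Object>>> table) {
        this.table = table;
    }

    @Override
    Collection<List<Object>> lookup(List<Object> keyRow) throws IOException {
        if (keyRow.contains(null)) {
            throw new IOException("null key"); // simulates a failing external call
        }
        return table.getOrDefault(keyRow, Collections.emptyList());
    }
}
```

Connector code only implements `lookup`; exception wrapping and row emission stay in the shared `eval`.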
Code Block |
---|
language | java |
---|
title | AsyncLookupFunction |
---|
|
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AsyncTableFunction;

/**
 * A wrapper class of {@link AsyncTableFunction} for asynchronously looking up rows matching the
 * lookup keys from the external system.
 *
 * <p>The output type of this table function is fixed as {@link RowData}.
 */
@PublicEvolving
public abstract class AsyncLookupFunction extends AsyncTableFunction<RowData> {
    /**
     * Asynchronously lookup rows matching the lookup keys.
     *
     * @param keyRow - A {@link RowData} that wraps keys to lookup.
     * @return A future of all matching rows in the lookup table.
     */
    public abstract CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow);

    /** Invokes {@link #asyncLookup} and chains futures. */
    public final void eval(CompletableFuture<Collection<RowData>> future, Object... keys) {
        // Complete the caller's future directly; no extra thread hop is needed.
        asyncLookup(GenericRowData.of(keys))
                .whenComplete(
                        (result, exception) -> {
                            if (exception != null) {
                                future.completeExceptionally(exception);
                                return;
                            }
                            future.complete(result);
                        });
    }
} |
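The chaining in `eval` forwards either the result or the failure of the connector's future into the future supplied by the caller. A JDK-only sketch of that pattern, assuming hypothetical classes `SimpleAsyncLookupFunction` and `MapBackedAsyncLookup`, with `String` keys in place of RowData:

```java
import java.util.*;
import java.util.concurrent.*;

/** Hypothetical JDK-only analog of AsyncLookupFunction with String keys and rows. */
abstract class SimpleAsyncLookupFunction {
    abstract CompletableFuture<Collection<String>> asyncLookup(String key);

    /** Mirrors AsyncLookupFunction#eval: forwards result or failure into the caller's future. */
    final void eval(CompletableFuture<Collection<String>> future, String key) {
        asyncLookup(key)
                .whenComplete(
                        (result, exception) -> {
                            if (exception != null) {
                                future.completeExceptionally(exception);
                                return;
                            }
                            future.complete(result);
                        });
    }
}

/** Hypothetical connector that serves lookups from a map on a caller-provided executor. */
final class MapBackedAsyncLookup extends SimpleAsyncLookupFunction {
    private final Map<String, Collection<String>> table;
    private final Executor executor;

    MapBackedAsyncLookup(Map<String, Collection<String>> table, Executor executor) {
        this.table = table;
        this.executor = executor;
    }

    @Override
    CompletableFuture<Collection<String>> asyncLookup(String key) {
        return CompletableFuture.supplyAsync(
                () -> {
                    if (key.isEmpty()) {
                        throw new IllegalArgumentException("empty key"); // simulated failure
                    }
                    return table.getOrDefault(key, Collections.emptyList());
                },
                executor);
    }
}
```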
LookupFunctionProvider
Code Block |
---|
language | java |
---|
title | LookupFunctionProvider |
---|
|
/** A provider for creating {@link LookupFunction}. */
@PublicEvolving
public interface LookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {
    /** Creates a provider that always returns the given lookup function. */
    static LookupFunctionProvider of(LookupFunction lookupFunction) {
        return () -> lookupFunction;
    }

    /** Creates a {@link LookupFunction} instance. */
    LookupFunction createLookupFunction();
} |
AsyncLookupFunctionProvider
Code Block |
---|
language | java |
---|
title | AsyncLookupFunctionProvider |
---|
|
/** A provider for creating {@link AsyncLookupFunction}. */
@PublicEvolving
public interface AsyncLookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {
    /** Creates a provider that always returns the given async lookup function. */
    static AsyncLookupFunctionProvider of(AsyncLookupFunction asyncLookupFunction) {
        return () -> asyncLookupFunction;
    }

    /** Creates an {@link AsyncLookupFunction} instance. */
    AsyncLookupFunction createAsyncLookupFunction();
} |
LookupCache
Considering there might be custom caching strategies and optimizations, we'd like to expose the cache interface as public API for developers to make the cache pluggable.
Code Block |
---|
language | java |
---|
title | LookupCache |
---|
|
import java.io.Serializable;
import java.util.Collection;

import javax.annotation.Nullable;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.data.RowData;

/**
 * A semi-persistent mapping from keys to values for storing entries of lookup table.
 *
 * <p>The type of the caching key is a {@link RowData} with lookup key fields packed inside. The
 * type of value is a {@link Collection} of {@link RowData}, which are rows matching lookup key
 * fields.
 *
 * <p>Cache entries are manually added using {@link #put}, and are stored in the cache until either
 * evicted or manually invalidated.
 *
 * <p>Implementations of this interface are expected to be thread-safe, and can be safely accessed
 * by multiple concurrent threads.
 */
@PublicEvolving
public interface LookupCache extends AutoCloseable, Serializable {
    /**
     * Initialize the cache.
     *
     * @param metricGroup the metric group to register cache related metrics.
     */
    void open(CacheMetricGroup metricGroup);

    /**
     * Returns the value associated with key in this cache, or null if there is no cached value for
     * key.
     */
    @Nullable
    Collection<RowData> getIfPresent(RowData key);

    /**
     * Associates the specified value rows with the specified key row in the cache. If the cache
     * previously contained value associated with the key, the old value is replaced by the
     * specified value.
     *
     * @param key - key row with which the specified value is to be associated
     * @param value - value rows to be associated with the specified key
     * @return the previous value rows associated with key, or null if there was no mapping for key.
     */
    @Nullable
    Collection<RowData> put(RowData key, Collection<RowData> value);

    /** Discards any cached value for the specified key. */
    void invalidate(RowData key);

    /** Returns the number of key-value mappings in the cache. */
    long size();
} |
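A minimal sketch of the contract above, assuming a hypothetical `MapLookupCache` backed by a `ConcurrentHashMap`, with generic keys and rows instead of RowData and an `open()` without a metric group. It highlights the distinction the interface relies on: `getIfPresent` returns null on a miss, while an empty collection is a cached entry meaning the key has no matching rows. Creating the storage in `open()` rather than in the constructor is a design choice of this sketch.

```java
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical JDK-only model of the LookupCache contract; not the Flink implementation. */
final class MapLookupCache<K, V> implements AutoCloseable {
    private Map<K, Collection<V>> store;

    /** Mirrors LookupCache#open: storage is created at runtime, not when the object is built. */
    void open() {
        store = new ConcurrentHashMap<>();
    }

    /** Returns null on a miss; an empty collection means "key known to have no rows". */
    Collection<V> getIfPresent(K key) {
        return store.get(key);
    }

    /** Returns the previously associated rows, or null if there were none. */
    Collection<V> put(K key, Collection<V> value) {
        return store.put(key, value);
    }

    void invalidate(K key) {
        store.remove(key);
    }

    long size() {
        return store.size();
    }

    @Override
    public void close() {
        store.clear();
    }
}
```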
DefaultLookupCache
In order to simplify the usage for developers, we provide a default cache implementation that can be configured with a builder.
Code Block |
---|
language | java |
---|
title | DefaultLookupCache |
---|
|
import java.time.Duration;

import org.apache.flink.annotation.PublicEvolving;

/** Default implementation of {@link LookupCache}. */
@PublicEvolving
public class DefaultLookupCache implements LookupCache {
    private final Duration expireAfterAccessDuration;
    private final Duration expireAfterWriteDuration;
    private final Long maximumSize;
    private final boolean cacheMissingKey;

    private DefaultLookupCache(
            Duration expireAfterAccessDuration,
            Duration expireAfterWriteDuration,
            Long maximumSize,
            boolean cacheMissingKey) {
        this.expireAfterAccessDuration = expireAfterAccessDuration;
        this.expireAfterWriteDuration = expireAfterWriteDuration;
        this.maximumSize = maximumSize;
        this.cacheMissingKey = cacheMissingKey;
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    // Implementations of the LookupCache methods (open, getIfPresent, put, invalidate, size,
    // close) are omitted here.

    /** Builder for {@link DefaultLookupCache}. */
    public static class Builder {
        private Duration expireAfterAccessDuration;
        private Duration expireAfterWriteDuration;
        private Long maximumSize;
        private Boolean cacheMissingKey;

        public Builder expireAfterAccess(Duration duration) {
            expireAfterAccessDuration = duration;
            return this;
        }

        public Builder expireAfterWrite(Duration duration) {
            expireAfterWriteDuration = duration;
            return this;
        }

        public Builder maximumSize(long maximumSize) {
            this.maximumSize = maximumSize;
            return this;
        }

        public Builder cacheMissingKey(boolean cacheMissingKey) {
            this.cacheMissingKey = cacheMissingKey;
            return this;
        }

        public DefaultLookupCache build() {
            return new DefaultLookupCache(
                    expireAfterAccessDuration,
                    expireAfterWriteDuration,
                    maximumSize,
                    // An unset flag is treated as true here (assumed default), which also avoids
                    // a NullPointerException when unboxing a null Boolean.
                    cacheMissingKey == null || cacheMissingKey);
        }
    }
} |
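The builder options map onto common eviction rules. The following sketch shows one way they could interact; it is an illustration under stated assumptions, not the eviction logic of DefaultLookupCache. `ExpiringLruCache` is hypothetical, uses an access-ordered `LinkedHashMap` for the size bound (least recently used entries are evicted first), checks expiry lazily on read, and takes an injectable millisecond clock so the behavior can be tested deterministically.

```java
import java.time.Duration;
import java.util.*;
import java.util.function.LongSupplier;

/** Hypothetical model of expireAfterAccess, expireAfterWrite and maximumSize. */
final class ExpiringLruCache<K, V> {
    private static final class Slot<V> {
        final V value;
        final long writeTime;
        long accessTime;

        Slot(V value, long now) {
            this.value = value;
            this.writeTime = now;
            this.accessTime = now;
        }
    }

    private final Duration expireAfterAccess; // null = no access-based expiry
    private final Duration expireAfterWrite; // null = no write-based expiry
    private final long maximumSize;
    private final LongSupplier clockMillis; // injected for testability
    // accessOrder = true keeps the least recently used entry first in iteration order
    private final LinkedHashMap<K, Slot<V>> map = new LinkedHashMap<>(16, 0.75f, true);

    ExpiringLruCache(
            Duration expireAfterAccess, Duration expireAfterWrite, long maximumSize, LongSupplier clockMillis) {
        this.expireAfterAccess = expireAfterAccess;
        this.expireAfterWrite = expireAfterWrite;
        this.maximumSize = maximumSize;
        this.clockMillis = clockMillis;
    }

    synchronized V getIfPresent(K key) {
        Slot<V> slot = map.get(key); // also marks the entry as most recently used
        if (slot == null) {
            return null;
        }
        long now = clockMillis.getAsLong();
        if (isExpired(slot, now)) {
            map.remove(key);
            return null;
        }
        slot.accessTime = now;
        return slot.value;
    }

    synchronized void put(K key, V value) {
        map.put(key, new Slot<>(value, clockMillis.getAsLong()));
        // Evict least recently used entries until the size bound holds again.
        Iterator<K> it = map.keySet().iterator();
        while (map.size() > maximumSize && it.hasNext()) {
            it.next();
            it.remove();
        }
    }

    synchronized int size() {
        return map.size();
    }

    private boolean isExpired(Slot<V> slot, long now) {
        return (expireAfterWrite != null && now - slot.writeTime >= expireAfterWrite.toMillis())
                || (expireAfterAccess != null && now - slot.accessTime >= expireAfterAccess.toMillis());
    }
}
```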
CacheMetricGroup
An interface defining all cache-related metrics:
Code Block |
---|
language | java |
---|
title | CacheMetricGroup |
---|
|
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

/**
 * Pre-defined metrics for cache.
 *
 * <p>Please note that these methods should only be invoked once. Registering a metric with the
 * same name multiple times would lead to undefined behavior.
 */
@PublicEvolving
public interface CacheMetricGroup extends MetricGroup {
    /** The number of cache hits. */
    void hitCounter(Counter hitCounter);

    /** The number of cache misses. */
    void missCounter(Counter missCounter);

    /** The number of times to load data into cache from external system. */
    void loadCounter(Counter loadCounter);

    /** The number of load failures. */
    void numLoadFailuresCounter(Counter numLoadFailuresCounter);

    /** The time spent for the latest load operation. */
    void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);

    /** The number of records in cache. */
    void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

    /** The number of bytes used by cache. */
    void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
} |
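To illustrate what the counters and gauges capture, here is a hypothetical, JDK-only `CacheMetricsSketch` that uses `AtomicLong` in place of Flink's `Counter` and a `LongSupplier` in place of `Gauge<Long>`. Counting every load attempt (including failed ones) in the load counter is an assumption of this sketch, and the hit rate is only an example of a value that could be derived from the hit and miss counters.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical stand-in for the cache metric group, using plain Java counters and gauges. */
final class CacheMetricsSketch {
    final AtomicLong hits = new AtomicLong();
    final AtomicLong misses = new AtomicLong();
    final AtomicLong loads = new AtomicLong();
    final AtomicLong loadFailures = new AtomicLong();
    volatile long latestLoadTimeMillis; // would be exposed as a gauge
    private volatile LongSupplier numCachedRecords = () -> 0L; // registered once by the cache

    void registerNumCachedRecordsGauge(LongSupplier gauge) {
        numCachedRecords = gauge;
    }

    long numCachedRecords() {
        return numCachedRecords.getAsLong();
    }

    /** Example of a value derived from the hit and miss counters. */
    double hitRate() {
        long h = hits.get();
        long m = misses.get();
        return h + m == 0 ? 0.0 : (double) h / (h + m);
    }

    /** Wraps one load from the external system and updates the load-related metrics. */
    <T> T timedLoad(Supplier<T> loader) {
        long start = System.nanoTime();
        loads.incrementAndGet(); // counts attempts, successful or not (assumption)
        try {
            return loader.get();
        } catch (RuntimeException e) {
            loadFailures.incrementAndGet();
            throw e;
        } finally {
            latestLoadTimeMillis = (System.nanoTime() - start) / 1_000_000;
        }
    }
}
```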
PartialCachingLookupProvider
This is the API between the table framework and the user's table source for partial caching. Implementations define how to create the lookup function and which cache to use.
Code Block |
---|
language | java |
---|
title | PartialCachingLookupProvider |
---|
|
/**
 * Provider for creating {@link LookupFunction} and {@link LookupCache} for storing lookup entries.
 */
@PublicEvolving
public interface PartialCachingLookupProvider extends LookupFunctionProvider {
    /**
     * Build a {@link PartialCachingLookupProvider} from the specified {@link LookupFunction} and
     * {@link LookupCache}.
     */
    static PartialCachingLookupProvider of(LookupFunction lookupFunction, LookupCache cache) {
        return new PartialCachingLookupProvider() {
            @Override
            public LookupCache getCache() {
                return cache;
            }

            @Override
            public LookupFunction createLookupFunction() {
                return lookupFunction;
            }
        };
    }

    /** Get a new instance of {@link LookupCache}. */
    LookupCache getCache();
} |
PartialCachingAsyncLookupProvider
Code Block |
---|
language | java |
---|
title | PartialCachingAsyncLookupProvider |
---|
|
/**
 * Provider for creating {@link AsyncLookupFunction} and {@link LookupCache} for storing lookup
 * entries.
 */
@PublicEvolving
public interface PartialCachingAsyncLookupProvider extends AsyncLookupFunctionProvider {
    /**
     * Build a {@link PartialCachingAsyncLookupProvider} from the specified {@link
     * AsyncLookupFunction} and {@link LookupCache}.
     */
    static PartialCachingAsyncLookupProvider of(
            AsyncLookupFunction asyncLookupFunction, LookupCache cache) {
        return new PartialCachingAsyncLookupProvider() {
            @Override
            public LookupCache getCache() {
                return cache;
            }

            @Override
            public AsyncLookupFunction createAsyncLookupFunction() {
                return asyncLookupFunction;
            }
        };
    }

    /** Get a new instance of {@link LookupCache}. */
    LookupCache getCache();
} |
FullCachingLookupProvider
This interface is for supporting the full caching strategy. It reuses ScanRuntimeProvider for loading all entries and uses a CacheReloadTrigger to decide when to reload them.
Code Block |
---|
language | java |
---|
title | FullCachingLookupProvider |
---|
|
import java.util.Collection;
import java.util.Collections;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;

/**
 * A {@link LookupFunctionProvider} that never looks up in the external system on cache miss and
 * provides a cache for holding all entries in the external system. The cache will be fully
 * reloaded from the external system by the {@link ScanTableSource.ScanRuntimeProvider}, and reload
 * operations will be triggered by the {@link CacheReloadTrigger}.
 */
@PublicEvolving
public interface FullCachingLookupProvider extends LookupFunctionProvider {
    static FullCachingLookupProvider of(
            ScanTableSource.ScanRuntimeProvider scanRuntimeProvider,
            CacheReloadTrigger cacheReloadTrigger) {
        return new FullCachingLookupProvider() {
            @Override
            public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider() {
                return scanRuntimeProvider;
            }

            @Override
            public CacheReloadTrigger getCacheReloadTrigger() {
                return cacheReloadTrigger;
            }

            @Override
            public LookupFunction createLookupFunction() {
                // LookupFunction is an abstract class, so a lambda cannot implement it. Lookups
                // are served from the full cache, so this function never accesses the external
                // system and reports no matching rows.
                return new LookupFunction() {
                    @Override
                    public Collection<RowData> lookup(RowData keyRow) {
                        return Collections.emptyList();
                    }
                };
            }
        };
    }

    /**
     * Get a {@link ScanTableSource.ScanRuntimeProvider} for scanning all entries from the external
     * lookup table and loading them into the cache.
     */
    ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();

    /** Get a {@link CacheReloadTrigger} for triggering the reload operation. */
    CacheReloadTrigger getCacheReloadTrigger();
} |
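A JDK-only sketch of the full caching model, assuming a hypothetical `FullCacheSketch`: a `Supplier` stands in for the scan, every scanned row is grouped by its lookup key, and the new index replaces the old one with a single volatile write, so concurrent lookups see either the old or the new snapshot. Lookups never reach the external system, in line with the provider above never looking up in the external system on a cache miss.

```java
import java.util.*;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical model of full caching: scan everything, index by key, swap atomically. */
final class FullCacheSketch<K, R> {
    private final Supplier<List<R>> scan; // stands in for the ScanRuntimeProvider
    private final Function<R, K> keyExtractor; // projects the lookup key fields from a row
    private volatile Map<K, List<R>> index = Collections.emptyMap();

    FullCacheSketch(Supplier<List<R>> scan, Function<R, K> keyExtractor) {
        this.scan = scan;
        this.keyExtractor = keyExtractor;
    }

    /** Called on each reload; readers keep using the old index until the final assignment. */
    void reload() {
        Map<K, List<R>> next = new HashMap<>();
        for (R row : scan.get()) {
            next.computeIfAbsent(keyExtractor.apply(row), k -> new ArrayList<>()).add(row);
        }
        index = Collections.unmodifiableMap(next);
    }

    /** Served only from memory: a miss means "no matching rows", never a remote call. */
    List<R> lookup(K key) {
        return index.getOrDefault(key, Collections.emptyList());
    }
}
```

Building the new index off to the side doubles peak memory during a reload, which is one reason full caching fits tables that are small relative to the available memory.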
CacheReloadTrigger
A trigger defining custom logic for triggering full cache reloading.
Code Block |
---|
language | java |
---|
title | CacheReloadTrigger |
---|
|
import java.io.Serializable;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.annotation.PublicEvolving;

/** Customized trigger for reloading all lookup table entries in full caching mode. */
@PublicEvolving
public interface CacheReloadTrigger extends AutoCloseable, Serializable {
    /** Open the trigger. */
    void open(Context context) throws Exception;

    /**
     * Context of {@link CacheReloadTrigger} for getting information about times and triggering
     * reload.
     */
    interface Context {
        /** Get current processing time. */
        long currentProcessingTime();

        /** Get current watermark on the main stream. */
        long currentWatermark();

        /** Trigger a reload operation on the full cache. */
        CompletableFuture<Void> triggerReload();
    }
} |
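Because `triggerReload()` returns a future, a trigger can observe when a reload finishes. The sketch below shows one possible custom trigger that skips a timer tick while the previous reload is still running. `ReloadContext` is a hypothetical JDK-only copy of `CacheReloadTrigger.Context`, `SkipIfBusyReloadTrigger` is hypothetical, and `onTick()` would normally be driven by a timer registered in `open()`; here it is called directly so the behavior is deterministic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical JDK-only copy of CacheReloadTrigger.Context for this sketch. */
interface ReloadContext {
    long currentProcessingTime();

    long currentWatermark();

    CompletableFuture<Void> triggerReload();
}

/** Hypothetical trigger: requests a reload per tick, skipping ticks while one is in flight. */
final class SkipIfBusyReloadTrigger implements AutoCloseable {
    private ReloadContext context;
    private CompletableFuture<Void> inFlight = CompletableFuture.completedFuture(null);
    final AtomicInteger skippedTicks = new AtomicInteger();

    void open(ReloadContext context) {
        this.context = context;
    }

    /** Would be invoked by a timer registered in open(); called directly here. */
    synchronized void onTick() {
        if (!inFlight.isDone()) {
            skippedTicks.incrementAndGet(); // previous reload has not finished yet
            return;
        }
        inFlight = context.triggerReload();
    }

    @Override
    public void close() {
        context = null; // a real trigger would also cancel its timer here
    }
}
```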
PeriodicCacheReloadTrigger
An implementation of CacheReloadTrigger that triggers reload with a specified interval.
Code Block |
---|
language | java |
---|
title | PeriodicCacheReloadTrigger |
---|
|
import java.time.Duration;

/** A trigger that reloads all entries periodically with specified interval or delay. */
public class PeriodicCacheReloadTrigger implements CacheReloadTrigger {
    private final Duration reloadInterval;
    private final ScheduleMode scheduleMode;

    public PeriodicCacheReloadTrigger(Duration reloadInterval, ScheduleMode scheduleMode) {
        this.reloadInterval = reloadInterval;
        this.scheduleMode = scheduleMode;
    }

    @Override
    public void open(CacheReloadTrigger.Context context) {
        // Register periodic reload task
    }

    @Override
    public void close() throws Exception {
        // Dispose resources
    }

    /** Defines how the periodic reload task is scheduled. */
    public enum ScheduleMode {
        FIXED_DELAY,
        FIXED_RATE
    }
} |
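The two schedule modes correspond closely to the JDK's `ScheduledExecutorService` methods: `FIXED_DELAY` to `scheduleWithFixedDelay` (the next reload starts one interval after the previous one finishes) and `FIXED_RATE` to `scheduleAtFixedRate` (reload starts are spaced one interval apart; a late run delays the next one rather than overlapping it). The helper below is a hypothetical sketch of that mapping, not the trigger's actual implementation, and starting the first reload immediately is an assumption.

```java
import java.time.Duration;
import java.util.concurrent.*;

/** Hypothetical helper mapping the two schedule modes onto JDK scheduling primitives. */
final class PeriodicReloadScheduler implements AutoCloseable {
    enum ScheduleMode {
        FIXED_DELAY,
        FIXED_RATE
    }

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(
                    r -> {
                        Thread t = new Thread(r, "cache-reload-timer");
                        t.setDaemon(true); // do not keep the JVM alive for the timer
                        return t;
                    });

    /** Schedules `reload` with the given interval; the first run starts immediately. */
    ScheduledFuture<?> schedule(Runnable reload, Duration interval, ScheduleMode mode) {
        long millis = interval.toMillis();
        switch (mode) {
            case FIXED_DELAY:
                return executor.scheduleWithFixedDelay(reload, 0, millis, TimeUnit.MILLISECONDS);
            case FIXED_RATE:
                return executor.scheduleAtFixedRate(reload, 0, millis, TimeUnit.MILLISECONDS);
            default:
                throw new IllegalArgumentException("Unknown schedule mode " + mode);
        }
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

FIXED_DELAY guarantees a quiet period between reloads even when a scan is slow, while FIXED_RATE keeps reload start times aligned to the interval.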
TimedCacheReloadTrigger
Code Block |
---|
language | java |
---|
title | TimedCacheReloadTrigger |
---|
|
import java.time.LocalTime;

/** A trigger that reloads at a specific local time and repeats for the given interval in days. */
public class TimedCacheReloadTrigger implements CacheReloadTrigger {
    private final LocalTime reloadTime;
    private final int reloadIntervalInDays;

    public TimedCacheReloadTrigger(LocalTime reloadTime, int reloadIntervalInDays) {
        this.reloadTime = reloadTime;
        this.reloadIntervalInDays = reloadIntervalInDays;
    }

    @Override
    public void open(Context context) {
        // Register timed reload task
    }

    @Override
    public void close() throws Exception {
        // Dispose resources
    }
} |
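Registering the timed reload task mostly comes down to computing the delay until the next occurrence of `reloadTime`. A minimal sketch with a hypothetical helper `TimedReloadMath`, assuming wall-clock arithmetic on `LocalDateTime` (so daylight-saving transitions are ignored) and a repeat period of `reloadIntervalInDays` days:

```java
import java.time.*;

/** Hypothetical helper: when should a TimedCacheReloadTrigger-like timer fire? */
final class TimedReloadMath {
    /** Delay until the next reloadTime: today if still ahead of `now`, otherwise tomorrow. */
    static Duration initialDelay(LocalDateTime now, LocalTime reloadTime) {
        LocalDateTime next = now.toLocalDate().atTime(reloadTime);
        if (!next.isAfter(now)) {
            next = next.plusDays(1);
        }
        return Duration.between(now, next);
    }

    /** Period between subsequent reloads. */
    static Duration period(int reloadIntervalInDays) {
        if (reloadIntervalInDays <= 0) {
            throw new IllegalArgumentException("reloadIntervalInDays must be positive");
        }
        return Duration.ofDays(reloadIntervalInDays);
    }
}
```

For example, at 10:00 with a reload time of 09:00 the first reload happens 23 hours later, and then every `reloadIntervalInDays` days.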
TableFunctionProvider / AsyncTableFunctionProvider
...
In order to unify the usage of caching across all connectors, we'd like to introduce some common table options, which are defined under the class LookupOptions. Note that not all connectors are required to implement these options.
Option | Type | Description |
---|
lookup.cache | Enum of NONE, PARTIAL and FULL | The caching strategy for this lookup table. NONE: do not use cache. PARTIAL: use partial caching mode. FULL: use full caching mode. |
lookup.async | Boolean | Whether to use asynchronous mode for the lookup |
lookup.max-retries | Integer | The maximum allowed retries if a lookup operation fails |
lookup.partial-cache.expire-after-access | Duration | Duration to expire an entry in the cache after accessing |
lookup.partial-cache.expire-after-write | Duration | Duration to expire an entry in the cache after writing |
lookup.partial-cache.cache-missing-key | Boolean | Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table |
lookup.partial-cache.max-rows | Long | The maximum number of rows to store in the cache |
lookup.full-cache.reload-strategy | Enum of PERIODIC and TIMED | The reload strategy for the full cache scenario. PERIODIC: use PeriodicCacheReloadTrigger. TIMED: use TimedCacheReloadTrigger. |
lookup.full-cache.periodic-reload.interval | Duration | Interval of reloading all entries from the lookup table into the cache in the PeriodicCacheReloadTrigger |
lookup.full-cache.periodic-reload.schedule-mode | Enum of FIXED_DELAY and FIXED_RATE | The schedule mode of periodic reloading in the PeriodicCacheReloadTrigger |
lookup.full-cache.timed-reload.iso-time | String | Time in ISO-8601 format when the cache needs to be reloaded. Time can be specified either with or without a timezone (if none is given, the target JVM's local timezone is used). See formatter ISO_TIME. |
lookup.full-cache.timed-reload.interval-in-days | Integer | The interval in days to trigger the reload at the specified time |
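The `lookup.full-cache.timed-reload.iso-time` option accepts a time with or without an offset. One way to parse it with the JDK is `DateTimeFormatter.ISO_TIME.parseBest`, which returns an `OffsetTime` when an offset is present and a `LocalTime` otherwise. `IsoReloadTime` is a hypothetical helper, and converting an offset time to the wall-clock time at a target offset is shown as one possible normalization.

```java
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;

/** Hypothetical parser for a 'lookup.full-cache.timed-reload.iso-time' value. */
final class IsoReloadTime {
    /** Accepts e.g. "10:15", "10:15:30" or "10:15:30+01:00". */
    static TemporalAccessor parse(String value) {
        // parseBest tries the queries in order: OffsetTime if an offset is present, else LocalTime.
        return DateTimeFormatter.ISO_TIME.parseBest(value, OffsetTime::from, LocalTime::from);
    }

    /** Wall-clock time at the target offset; values without an offset are taken as local time. */
    static LocalTime atOffset(TemporalAccessor parsed, ZoneOffset targetOffset) {
        if (parsed instanceof OffsetTime) {
            return ((OffsetTime) parsed).withOffsetSameInstant(targetOffset).toLocalTime();
        }
        return (LocalTime) parsed;
    }
}
```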
Cache Metrics
Note that a cache implementation does not have to report all of the metrics defined below. However, if a cache reports a metric with the same semantics as one defined below, the implementation should follow this convention.
...