...
Code Block |
---|
language | java |
---|
title | LookupFunction |
---|
|
/**
 * A wrapper class of {@link TableFunction} for synchronously looking up rows matching the lookup
 * keys from an external system.
 *
 * <p>The output type of this table function is fixed as {@link RowData}.
 */
@PublicEvolving
public abstract class LookupFunction extends TableFunction<RowData> {

    /**
     * Synchronously looks up rows matching the lookup keys.
     *
     * @param keyRow - A {@link RowData} that wraps keys to lookup.
     * @return A collection of all matching rows in the lookup table.
     */
    public abstract Collection<RowData> lookup(RowData keyRow) throws IOException;

    /** Invokes {@link #lookup} and handles exceptions. */
    public final void eval(Object... keys) {
        try {
            // Wrap the raw key fields into a row before delegating to lookup(RowData).
            lookup(GenericRowData.of(keys)).forEach(this::collect);
        } catch (IOException e) {
            throw new RuntimeException("Failed to lookup values with given key", e);
        }
    }
} |
...
Code Block |
---|
language | java |
---|
title | AsyncLookupFunction |
---|
|
/**
 * A wrapper class of {@link AsyncTableFunction} for asynchronously looking up rows matching the
 * lookup keys from an external system.
 *
 * <p>The output type of this table function is fixed as {@link RowData}.
 */
@PublicEvolving
public abstract class AsyncLookupFunction extends AsyncTableFunction<RowData> {

    /**
     * Asynchronously looks up rows matching the lookup keys.
     *
     * @param keyRow - A {@link RowData} that wraps keys to lookup.
     * @return A collection of all matching rows in the lookup table.
     */
    public abstract CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow);

    /** Invokes {@link #asyncLookup} and chains futures. */
    public final void eval(CompletableFuture<Collection<RowData>> future, Object... keys) {
        // Wrap the raw key fields into a row before delegating to asyncLookup(RowData).
        asyncLookup(GenericRowData.of(keys))
                .whenCompleteAsync(
                        (result, exception) -> {
                            if (exception != null) {
                                future.completeExceptionally(exception);
                                return;
                            }
                            future.complete(result);
                        });
    }
} |
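The future chaining done by eval can be tried outside of Flink. The following is only a sketch under stated assumptions: AsyncChainingSketch, its asyncLookup method, and the use of String keys and rows in place of RowData are hypothetical stand-ins, and only java.util.concurrent is used.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

// Stand-in sketch: String keys and rows replace RowData; nothing here is Flink API.
class AsyncChainingSketch {

    // Hypothetical asynchronous lookup against an "external system".
    static CompletableFuture<Collection<String>> asyncLookup(String key) {
        if (key.isEmpty()) {
            CompletableFuture<Collection<String>> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalArgumentException("empty key"));
            return failed;
        }
        return CompletableFuture.supplyAsync(() -> Collections.singletonList(key + "-row"));
    }

    // Same shape as eval(): forward either the rows or the failure to the
    // future that is owned by the caller.
    static void eval(CompletableFuture<Collection<String>> future, String key) {
        asyncLookup(key)
                .whenCompleteAsync(
                        (result, exception) -> {
                            if (exception != null) {
                                future.completeExceptionally(exception);
                                return;
                            }
                            future.complete(result);
                        });
    }
}
```

The caller only ever observes its own future, so a failed lookup surfaces as an exceptional completion rather than as an exception thrown from eval.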
...
Code Block |
---|
language | java |
---|
title | LookupCache |
---|
|
/**
 * A semi-persistent mapping from keys to values for storing entries of the lookup table.
 *
 * <p>The type of the caching key is a {@link RowData} with lookup key fields packed inside. The
 * type of value is a {@link Collection} of {@link RowData}, which are rows matching lookup key
 * fields.
 *
 * <p>Cache entries are manually added using {@link #put}, and are stored in the cache until either
 * evicted or manually invalidated.
 *
 * <p>Implementations of this interface are expected to be thread-safe, and can be safely accessed
 * by multiple concurrent threads.
 */
@PublicEvolving
public interface LookupCache {

    /**
     * Returns the value associated with key in this cache, or null if there is no cached value for
     * key.
     */
    @Nullable
    Collection<RowData> getIfPresent(RowData key);

    /**
     * Associates the specified value rows with the specified key row in the cache. If the cache
     * previously contained a value associated with the key, the old value is replaced by the
     * specified value.
     *
     * @param key - key row with which the specified value is to be associated
     * @param value - value rows to be associated with the specified key
     * @return the previous value rows associated with key, or null if there was no mapping for key.
     */
    Collection<RowData> put(RowData key, Collection<RowData> value);

    /** Discards any cached value for the specified key. */
    void invalidate(RowData key);

    /** Returns the number of key-value mappings in the cache. */
    long size();
} |
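To make the contract above concrete, here is a minimal sketch of a thread-safe cache with the same four operations. It is an illustration under assumptions, not the proposed default cache: InMemoryLookupCacheSketch is a hypothetical name, List<Object> stands in for the RowData key, String stands in for a row, and there is no eviction.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in sketch: List<Object> replaces the RowData key, String replaces a row.
class InMemoryLookupCacheSketch {
    // ConcurrentHashMap provides the thread safety that the interface asks for.
    private final ConcurrentHashMap<List<Object>, Collection<String>> entries =
            new ConcurrentHashMap<>();

    /** Returns the cached rows, or null if the key has no entry. */
    Collection<String> getIfPresent(List<Object> key) {
        return entries.get(key);
    }

    /** Stores the rows and returns the previous rows, or null if there were none. */
    Collection<String> put(List<Object> key, Collection<String> value) {
        return entries.put(key, value);
    }

    /** Discards the entry for the key, if any. */
    void invalidate(List<Object> key) {
        entries.remove(key);
    }

    /** Returns the number of cached keys. */
    long size() {
        return entries.mappingCount();
    }
}
```

Keys are compared by value here, which is why a freshly built key list with equal fields finds or invalidates an existing entry.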
LookupCacheFactory
As the cache should be instantiated during runtime execution to avoid serialization / deserialization, a factory is required for creating the cache.
Code Block |
---|
language | java |
---|
title | LookupCacheFactory |
---|
|
/** Factory for creating an instance of {@link LookupCache}. */
@PublicEvolving
public interface LookupCacheFactory extends Serializable {

    /**
     * Create a {@link LookupCache}.
     *
     * @param metricGroup - The lookup cache metric group in which the cache register predefined and
     *     custom metrics.
     */
    LookupCache createCache(LookupCacheMetricGroup metricGroup);
} |
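The reason for shipping a factory instead of a cache can be sketched with plain Java serialization. Everything below is a stand-in assumed for illustration: CacheFactorySketch is a hypothetical class, a HashMap plays the role of the cache, and the round trip through a byte array imitates sending the factory to a task.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Stand-in sketch: only the small factory is serialized; the cache itself is
// created after deserialization, on the "runtime" side.
class CacheFactorySketch implements Serializable {
    private static final long serialVersionUID = 1L;
    private final int initialCapacity;

    CacheFactorySketch(int initialCapacity) {
        this.initialCapacity = initialCapacity;
    }

    /** Creates a fresh, empty cache for one runtime instance. */
    Map<String, String> createCache() {
        return new HashMap<>(initialCapacity);
    }

    /** Imitates shipping the factory to a task with a serialization round trip. */
    static CacheFactorySketch roundTrip(CacheFactorySketch factory) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(factory);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (CacheFactorySketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }
}
```

Only the configuration travels over the wire, and each call to createCache yields an independent, initially empty cache.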
DefaultLookupCacheFactory
To simplify usage for developers, we provide a default factory for building a default cache.
Code Block |
---|
language | java |
---|
title | DefaultLookupCacheFactory |
---|
|
/** Factory for creating instance of {@link DefaultLookupCache}. */
@PublicEvolving
public class DefaultLookupCacheFactory implements LookupCacheFactory {
    private final Duration expireAfterAccessDuration;
    private final Duration expireAfterWriteDuration;
    private final Integer initialCapacity;
    private final Long maximumSize;

    public static DefaultLookupCacheFactory.Builder newBuilder() {
        return new Builder();
    }

    private DefaultLookupCacheFactory(
            Duration expireAfterAccessDuration,
            Duration expireAfterWriteDuration,
            Integer initialCapacity,
            Long maximumSize) {
        // Validation
        this.expireAfterAccessDuration = expireAfterAccessDuration;
        this.expireAfterWriteDuration = expireAfterWriteDuration;
        this.initialCapacity = initialCapacity;
        this.maximumSize = maximumSize;
    }

    @Override
    public LookupCache createCache(LookupCacheMetricGroup metricGroup) {
        // Create instance of DefaultLookupCache
    }

    /** Builder of {@link DefaultLookupCacheFactory}. */
    public static class Builder {
        private Duration expireAfterAccessDuration;
        private Duration expireAfterWriteDuration;
        private Integer initialCapacity;
        private Long maximumSize;

        public DefaultLookupCacheFactory.Builder expireAfterAccess(Duration duration) {
            expireAfterAccessDuration = duration;
            return this;
        }

        public DefaultLookupCacheFactory.Builder expireAfterWrite(Duration duration) {
            expireAfterWriteDuration = duration;
            return this;
        }

        public DefaultLookupCacheFactory.Builder initialCapacity(int initialCapacity) {
            this.initialCapacity = initialCapacity;
            return this;
        }

        public DefaultLookupCacheFactory.Builder maximumSize(long maximumSize) {
            this.maximumSize = maximumSize;
            return this;
        }

        public DefaultLookupCacheFactory build() {
            return new DefaultLookupCacheFactory(
                    expireAfterAccessDuration,
                    expireAfterWriteDuration,
                    initialCapacity,
                    maximumSize);
        }
    }
} |
LookupCacheMetricGroup
An interface defining all cache-related metrics:
Code Block |
---|
language | java |
---|
title | CacheMetricGroup |
---|
|
/**
 * Pre-defined metrics for {@code LookupCache}.
 *
 * <p>Please note that these methods should only be invoked once. Registering a metric with the
 * same name multiple times leads to undefined behavior.
 */
@PublicEvolving
public interface LookupCacheMetricGroup extends MetricGroup {

    /** The number of cache hits. */
    void hitCounter(Counter hitCounter);

    /** The number of cache misses. */
    void missCounter(Counter missCounter);

    /** The number of times to load data into cache from external system. */
    void loadCounter(Counter loadCounter);

    /** The number of load failures. */
    void numLoadFailuresCounter(Counter numLoadFailuresCounter);

    /** The time spent for the latest load operation. */
    void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);

    /** The number of records in cache. */
    void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

    /** The number of bytes used by cache. */
    void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
} |
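How a cache might drive the four counters can be sketched without Flink. The code below is an illustration under assumptions: CacheMetricsSketch and its loader function are hypothetical, AtomicLong stands in for Flink's Counter, and counting every load attempt in loadCounter is one possible reading of the metric description.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Stand-in sketch: AtomicLong replaces Counter; String keys and values replace RowData.
class CacheMetricsSketch {
    final AtomicLong hitCounter = new AtomicLong();
    final AtomicLong missCounter = new AtomicLong();
    final AtomicLong loadCounter = new AtomicLong();
    final AtomicLong numLoadFailuresCounter = new AtomicLong();

    private final Map<String, String> entries = new HashMap<>();
    private final Function<String, String> loader; // hypothetical external lookup

    CacheMetricsSketch(Function<String, String> loader) {
        this.loader = loader;
    }

    String get(String key) {
        String cached = entries.get(key);
        if (cached != null) {
            hitCounter.incrementAndGet();
            return cached;
        }
        missCounter.incrementAndGet();
        // Assumption: every attempt to load from the external system is counted.
        loadCounter.incrementAndGet();
        try {
            String loaded = loader.apply(key);
            if (loaded != null) {
                entries.put(key, loaded);
            }
            return loaded;
        } catch (RuntimeException e) {
            numLoadFailuresCounter.incrementAndGet();
            throw e;
        }
    }
}
```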
LookupFunctionProvider
This is the API between the table framework and the user's table source. Implementations should define how to create a lookup function and whether to use a cache.
Code Block |
---|
language | java |
---|
title | LookupFunctionProvider |
---|
|
/**
 * Provider for creating {@link LookupFunction} and {@link LookupCacheFactory} if caching should be
 * enabled for the lookup table.
 */
@PublicEvolving
public interface LookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    /** Creates a builder of {@link LookupFunctionProvider}. */
    static Builder newBuilder() {
        return new Builder();
    }

    /** Creates a {@link LookupFunction} instance. */
    LookupFunction createLookupFunction();

    /**
     * Gets the {@link LookupCacheFactory} for creating lookup cache.
     *
     * <p>This factory will be used for creating an instance of cache during runtime execution for
     * optimizing the access to external lookup table.
     *
     * @return an {@link Optional} of {@link LookupCacheFactory}, or an empty {@link Optional} if
     *     caching shouldn't be applied to the lookup table.
     */
    Optional<LookupCacheFactory> getCacheFactory();

    /**
     * Whether the missing key (key fields without any matching value rows) should be stored in the
     * cache.
     *
     * <p>Please note that this option is required if {@link #getCacheFactory()} returns a non-empty
     * instance. If the cache factory is empty, the return value of this function will be ignored.
     *
     * @return true if a null or empty value should be stored in the cache.
     */
    Optional<Boolean> cacheMissingKey();

    /** Builder class for {@link LookupFunctionProvider}. */
    class Builder {
        private LookupFunction lookupFunction;
        private LookupCacheFactory cacheFactory;
        private Boolean enableCacheMissingKey;

        /** Sets lookup function. */
        public Builder withLookupFunction(LookupFunction lookupFunction) {
            this.lookupFunction = lookupFunction;
            return this;
        }

        /** Enables caching and sets the cache factory. */
        public Builder withCacheFactory(LookupCacheFactory cacheFactory) {
            this.cacheFactory = cacheFactory;
            return this;
        }

        /**
         * Enables storing missing key in the cache.
         *
         * <p>See {@link LookupFunctionProvider#cacheMissingKey()} for more details.
         */
        public Builder enableCacheMissingKey() {
            this.enableCacheMissingKey = true;
            return this;
        }

        public LookupFunctionProvider build() {
            // Build LookupFunctionProvider
        }
    }
} |
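The effect of cacheMissingKey can be illustrated with a small caching wrapper. This is a sketch of one possible reading of the option, not the framework's implementation: MissingKeyCachingSketch is a hypothetical class, String keys and rows replace RowData, and a plain HashMap replaces LookupCache.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Stand-in sketch of a caching wrapper; String keys and rows replace RowData.
class MissingKeyCachingSketch {
    private final Map<String, Collection<String>> cache = new HashMap<>();
    private final Function<String, Collection<String>> lookup; // hypothetical external lookup
    private final boolean cacheMissingKey;
    int externalLookups = 0; // counts calls that reached the "external system"

    MissingKeyCachingSketch(
            Function<String, Collection<String>> lookup, boolean cacheMissingKey) {
        this.lookup = lookup;
        this.cacheMissingKey = cacheMissingKey;
    }

    Collection<String> get(String key) {
        Collection<String> cached = cache.get(key);
        if (cached != null) {
            return cached; // hit, possibly a cached empty result
        }
        externalLookups++;
        Collection<String> rows = lookup.apply(key);
        if (rows == null) {
            rows = Collections.emptyList();
        }
        // Empty results are stored only when missing keys should be cached.
        if (!rows.isEmpty() || cacheMissingKey) {
            cache.put(key, rows);
        }
        return rows;
    }
}
```

With the option enabled, repeated lookups of an absent key stop reaching the external system after the first miss. With it disabled, every such lookup goes through.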
AsyncLookupFunctionProvider
Code Block |
---|
language | java |
---|
title | AsyncLookupFunctionProvider |
---|
|
@PublicEvolving
public interface AsyncLookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    /**
     * Creates {@link AsyncLookupFunctionProvider} with the given {@link AsyncLookupFunction} and
     * disable lookup table caching.
     */
    static AsyncLookupFunctionProvider of(AsyncLookupFunction asyncLookupFunction) {
        return new AsyncLookupFunctionProvider() {
            @Override
            public AsyncLookupFunction createAsyncLookupFunction() {
                return asyncLookupFunction;
            }

            @Override
            public Optional<LookupCacheFactory> getCacheFactory() {
                return Optional.empty();
            }

            @Override
            public Optional<Boolean> cacheMissingKey() {
                return Optional.empty();
            }
        };
    }

    /**
     * Creates {@link AsyncLookupFunctionProvider} with the given {@link AsyncLookupFunction} and
     * enable caching with specified {@link LookupCacheFactory}.
     */
    static AsyncLookupFunctionProvider of(
            AsyncLookupFunction asyncLookupFunction,
            LookupCacheFactory cacheBuilder,
            boolean cacheMissingKey) {
        return new AsyncLookupFunctionProvider() {
            @Override
            public AsyncLookupFunction createAsyncLookupFunction() {
                return asyncLookupFunction;
            }

            @Override
            public Optional<LookupCacheFactory> getCacheFactory() {
                return Optional.of(cacheBuilder);
            }

            @Override
            public Optional<Boolean> cacheMissingKey() {
                return Optional.of(cacheMissingKey);
            }
        };
    }

    /** Creates an {@link AsyncLookupFunction} instance. */
    AsyncLookupFunction createAsyncLookupFunction();

    /**
     * Gets the {@link LookupCacheFactory} for creating lookup cache.
     *
     * <p>This factory will be used for creating an instance of cache during runtime execution for
     * optimizing the access to external lookup table.
     *
     * @return an {@link Optional} of {@link LookupCacheFactory}, or an empty {@link Optional} if
     *     caching shouldn't be applied to the lookup table.
     */
    Optional<LookupCacheFactory> getCacheFactory();

    /**
     * Whether the missing key (key fields without any matching value rows) should be stored in the
     * cache.
     *
     * <p>Please note that this option is required if {@link #getCacheFactory()} returns a non-empty
     * instance. If the cache factory is empty, the return value of this function will be ignored.
     *
     * @return true if a null or empty value should be stored in the cache.
     */
    Optional<Boolean> cacheMissingKey();

    /** Builder class for {@link AsyncLookupFunctionProvider}. */
    class Builder {
        private AsyncLookupFunction asyncLookupFunction;
        private LookupCacheFactory cacheFactory;
        private Boolean enableCacheMissingKey;

        /** Sets lookup function. */
        public Builder withAsyncLookupFunction(AsyncLookupFunction asyncLookupFunction) {
            this.asyncLookupFunction = asyncLookupFunction;
            return this;
        }

        /** Enables caching and sets the cache factory. */
        public Builder withCacheFactory(LookupCacheFactory cacheFactory) {
            this.cacheFactory = cacheFactory;
            return this;
        }

        /**
         * Enables storing missing key in the cache.
         *
         * <p>See {@link AsyncLookupFunctionProvider#cacheMissingKey()} for more details.
         */
        public Builder enableCacheMissingKey() {
            this.enableCacheMissingKey = true;
            return this;
        }

        public AsyncLookupFunctionProvider build() {
            // Build AsyncLookupFunctionProvider
        }
    }
} |
RescanRuntimeProvider
This interface supports the all-cache strategy, in which all entries of the lookup table are cached locally. It reuses ScanRuntimeProvider and defines the interval between re-scans.
Code Block |
---|
language | java |
---|
title | RescanRuntimeProvider |
---|
|
/**
 * Runtime provider for periodically re-scanning all entries of the lookup table and storing the
 * table locally for lookup.
 */
@PublicEvolving
public interface RescanRuntimeProvider extends LookupTableSource.LookupRuntimeProvider {

    /**
     * Gets the {@link org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider}
     * for executing the periodic re-scan.
     */
    ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();

    /** Gets the interval between two re-scans. */
    Duration getRescanInterval();
} |
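The re-scan idea can be sketched as swapping in a fresh snapshot of the whole table. The code below is only an illustration under assumptions: RescanSketch and its scan supplier are hypothetical, String keys and values replace RowData, and the periodic trigger is left out. In a real runtime, something like a ScheduledExecutorService would call rescan() once per re-scan interval.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Stand-in sketch: the whole "table" is copied into a local snapshot on each re-scan.
class RescanSketch {
    private final Supplier<Map<String, String>> scan; // hypothetical full-table scan
    // volatile so that lookups on other threads see the latest snapshot.
    private volatile Map<String, String> snapshot = Collections.emptyMap();

    RescanSketch(Supplier<Map<String, String>> scan) {
        this.scan = scan;
    }

    /** Re-reads all entries and replaces the local snapshot in one step. */
    void rescan() {
        snapshot = Collections.unmodifiableMap(new HashMap<>(scan.get()));
    }

    /** Serves the lookup from the local snapshot only. */
    String lookup(String key) {
        return snapshot.get(key);
    }
}
```

Between two re-scans, lookups never touch the external table, so changes there become visible only after the next rescan() call.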
TableFunctionProvider / AsyncTableFunctionProvider
We'd like to deprecate these two interfaces and let developers switch to the new LookupFunctionProvider / AsyncLookupFunctionProvider instead.
Cache Metrics
A cache implementation does not have to report all of the defined metrics. However, if a cache reports a metric with the same semantics as one defined below, the implementation should follow the convention.
...
The metric group for the cache would be a sub-group of the OperatorMetricGroup to which the table function belongs.
Future Works
To further reduce network I/O with external systems and the usage of cache, some optimizations implemented for scan sources, such as projection and filter pushdown, could also be applied to the lookup table. These features will be introduced separately in another FLIP.
Compatibility, Deprecation, and Migration Plan
...