...
LookupFunction / AsyncLookupFunction, an extended version of TableFunction to clarify the semantics of lookup.
LookupCache, defining the cache used in the lookup table.
DefaultLookupCache, a default implementation of a cache that is suitable for most use cases.
CacheMetricGroup, defining the metrics that should be reported by the cache.
PartialCachingLookupProvider / AsyncPartialCachingLookupProvider, as the API interacting with the table source to get the LookupFunction and LookupCache.
...
If the lookup table is small enough to fit into memory and doesn't change frequently, it is more efficient to load all entries of the lookup table into the cache to reduce network I/O, and to refresh the cached table periodically. We'd like to name this use case "full cache". Logically, the reload operation is a kind of scan, so we'd like to reuse ScanRuntimeProvider so that developers can reuse the scanning logic implemented in Source / SourceFunction / InputFormat. Considering the complexity of the Source API, we'd like to support the SourceFunction and InputFormat APIs first. Supporting the Source API might require a new topology and will be discussed later in another FLIP.
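The "full cache" idea can be illustrated with a minimal sketch. It assumes a hypothetical `FullCacheSketch` class with plain strings as keys and rows and a `Supplier` standing in for the scan; none of these names are part of the proposed API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical stand-in for a full cache; not part of the proposed API. */
final class FullCacheSketch {
    // Snapshot of the whole lookup table; replaced as a whole on every reload.
    private volatile Map<String, List<String>> snapshot = Map.of();

    // Stand-in for the scan: each element is a {key, value} pair (assumption).
    private final Supplier<Collection<String[]>> scan;

    FullCacheSketch(Supplier<Collection<String[]>> scan) {
        this.scan = scan;
    }

    /** Scans the whole table, groups rows by key and swaps in the new snapshot. */
    void reload() {
        Map<String, List<String>> fresh = new HashMap<>();
        for (String[] row : scan.get()) {
            fresh.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row[1]);
        }
        snapshot = fresh;
    }

    /** Serves lookups from memory only; no network I/O happens here. */
    List<String> lookup(String key) {
        return snapshot.getOrDefault(key, List.of());
    }
}
```

Lookups only see changes of the external table after the next `reload()`, which is why this mode fits tables that change infrequently.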
We propose to introduce several new interfaces:
FullCachingLookupProvider
, for reusing the ability of scanning.
...
FullCachingReloadTrigger
, for customizing reloading strategies of all entries in the full cache.
Public Interfaces
Lookup Functions
...
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * A wrapper class of {@link AsyncTableFunction} for asynchronously looking up rows matching the
 * lookup keys from an external system.
 *
 * <p>The output type of this table function is fixed as {@link RowData}.
 */
@PublicEvolving
public abstract class AsyncLookupFunction extends AsyncTableFunction<RowData> {

    /**
     * Asynchronously looks up rows matching the lookup keys.
     *
     * @param keyRow a {@link RowData} that wraps the keys to look up.
     * @return a collection of all matching rows in the lookup table.
     */
    public abstract CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow);

    /** Invokes {@link #asyncLookup} and chains futures. */
    public final void eval(CompletableFuture<Collection<RowData>> future, Object... keys) {
        asyncLookup(GenericRowData.of(keys))
                .whenCompleteAsync(
                        (result, exception) -> {
                            if (exception != null) {
                                // Propagate the lookup failure to the caller's future.
                                future.completeExceptionally(exception);
                                return;
                            }
                            future.complete(result);
                        });
    }
}
|
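The future chaining done by eval can be tried out without Flink. The following is a minimal sketch that assumes a hypothetical `AsyncEvalSketch` class with strings in place of RowData; only the `CompletableFuture` calls are real JDK API.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in mirroring the shape of eval; not part of the proposed API. */
final class AsyncEvalSketch {

    /** Hypothetical asynchronous lookup that fails for a null key. */
    static CompletableFuture<Collection<String>> asyncLookup(String key) {
        if (key == null) {
            return CompletableFuture.failedFuture(new IllegalArgumentException("null key"));
        }
        return CompletableFuture.supplyAsync(() -> List.of("row-for-" + key));
    }

    /** Forwards either the result or the failure of the lookup to the caller's future. */
    static void eval(CompletableFuture<Collection<String>> future, String key) {
        asyncLookup(key)
                .whenComplete(
                        (result, exception) -> {
                            if (exception != null) {
                                future.completeExceptionally(exception);
                                return;
                            }
                            future.complete(result);
                        });
    }
}
```

The caller's future is always completed exactly once, either with the matching rows or with the exception raised by the lookup.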
...
LookupCache
Considering there might be custom caching strategies and optimizations, we'd like to expose the cache interface as public API for developers to make the cache pluggable.
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * A semi-persistent mapping from keys to values for storing entries of lookup table.
 *
 * <p>The type of the caching key is a {@link RowData} with lookup key fields packed inside. The
 * type of value is a {@link Collection} of {@link RowData}, which are rows matching lookup key
 * fields.
 *
 * <p>Cache entries are manually added using {@link #put}, and are stored in the cache until either
 * evicted or manually invalidated.
 *
 * <p>Implementations of this interface are expected to be thread-safe, and can be safely accessed
 * by multiple concurrent threads.
 */
@PublicEvolving
public interface LookupCache extends AutoCloseable {

    /**
     * Initialize the cache.
     *
     * @param metricGroup the metric group to register cache related metrics.
     */
    void open(CacheMetricGroup metricGroup);

    /**
     * Returns the value associated with key in this cache, or null if there is no cached value for
     * key.
     */
    @Nullable
    Collection<RowData> getIfPresent(RowData key);

    /**
     * Associates the specified value rows with the specified key row in the cache. If the cache
     * previously contained value associated with the key, the old value is replaced by the
     * specified value.
     *
     * @param key key row with which the specified value is to be associated
     * @param value value rows to be associated with the specified key
     * @return the previous value rows associated with key, or null if there was no mapping for key.
     */
    Collection<RowData> put(RowData key, Collection<RowData> value);

    /** Discards any cached value for the specified key. */
    void invalidate(RowData key);

    /** Returns the number of key-value mappings in the cache. */
    long size();

    void close();
}
|
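To make the contract of getIfPresent / put / invalidate / size concrete, here is a minimal sketch of a thread-safe cache with the same method shapes. `MapBackedCacheSketch` is a hypothetical name, strings replace RowData, and eviction and metrics are deliberately left out.

```java
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical cache with the same method shapes as the interface above; no eviction. */
final class MapBackedCacheSketch {
    // ConcurrentHashMap gives the thread safety the interface asks for.
    private final ConcurrentHashMap<String, Collection<String>> entries =
            new ConcurrentHashMap<>();

    /** Returns the cached rows, or null if the key has never been stored. */
    Collection<String> getIfPresent(String key) {
        return entries.get(key);
    }

    /** Stores the rows and returns the previous rows, or null if there were none. */
    Collection<String> put(String key, Collection<String> value) {
        return entries.put(key, value);
    }

    /** Discards any cached rows for the key. */
    void invalidate(String key) {
        entries.remove(key);
    }

    /** Returns the number of key-value mappings. */
    long size() {
        return entries.mappingCount();
    }
}
```

Note that an empty collection stored for a key is different from a null result: the former is a cached "no matching rows" answer, the latter means the key is not cached at all.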
...
DefaultLookupCache
As the cache should be instantiated during runtime execution to avoid serialization / deserialization, a factory is required for creating the cache.
Code Block | ||||
---|---|---|---|---|
| ||||
/** Default implementation of {@link LookupCache}. */
@PublicEvolving
public class DefaultLookupCache implements LookupCache {
    private final Duration expireAfterAccessDuration;
    private final Duration expireAfterWriteDuration;
    private final Long maximumSize;

    private DefaultLookupCache(
            Duration expireAfterAccessDuration,
            Duration expireAfterWriteDuration,
            Long maximumSize) {
        this.expireAfterAccessDuration = expireAfterAccessDuration;
        this.expireAfterWriteDuration = expireAfterWriteDuration;
        this.maximumSize = maximumSize;
    }

    // Overrides of the LookupCache methods are omitted.

    public static Builder newBuilder() {
        return new Builder();
    }

    public static class Builder {
        private Duration expireAfterAccessDuration;
        private Duration expireAfterWriteDuration;
        private Long maximumSize;

        public DefaultLookupCache.Builder expireAfterAccess(Duration duration) {
            expireAfterAccessDuration = duration;
            return this;
        }

        public DefaultLookupCache.Builder expireAfterWrite(Duration duration) {
            expireAfterWriteDuration = duration;
            return this;
        }

        public DefaultLookupCache.Builder maximumSize(long maximumSize) {
            this.maximumSize = maximumSize;
            return this;
        }

        public DefaultLookupCache build() {
            return new DefaultLookupCache(
                    expireAfterAccessDuration, expireAfterWriteDuration, maximumSize);
        }
    }
}
|
...
CacheMetricGroup
An interface defining all cache related metrics:
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Pre-defined metrics for cache.
 *
 * <p>Please note that these methods should only be invoked once. Registering a metric with the
 * same name multiple times would lead to undefined behavior.
 */
@PublicEvolving
public interface CacheMetricGroup extends MetricGroup {

    /** The number of cache hits. */
    void hitCounter(Counter hitCounter);

    /** The number of cache misses. */
    void missCounter(Counter missCounter);

    /** The number of times to load data into cache from external system. */
    void loadCounter(Counter loadCounter);

    /** The number of load failures. */
    void numLoadFailuresCounter(Counter numLoadFailuresCounter);

    /** The time spent for the latest load operation. */
    void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);

    /** The number of records in cache. */
    void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

    /** The number of bytes used by cache. */
    void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}
|
...
PartialCachingLookupProvider
This is the API between table framework and user's table source. Implementation should define how to create a lookup function and whether to use cache.
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Provider for creating {@link LookupFunction} and {@link LookupCache} if caching should be
 * enabled for the lookup table.
 */
@PublicEvolving
public interface PartialCachingLookupProvider extends LookupTableSource.LookupRuntimeProvider {

    /** Creates a builder of {@link PartialCachingLookupProvider}. */
    static Builder newBuilder() {
        return new Builder();
    }

    /** Creates an {@link LookupFunction} instance. */
    LookupFunction createLookupFunction();

    /**
     * Gets the {@link LookupCache}.
     *
     * <p>This cache will be initialized by {@link LookupCache#open} during runtime execution and
     * used for optimizing the access to external lookup table.
     *
     * @return an {@link Optional} of {@link LookupCache}, or an empty {@link Optional} if caching
     *     shouldn't be applied to the lookup table.
     */
    Optional<LookupCache> getCache();

    /**
     * Whether the missing key (key fields without any matching value rows) should be stored in the
     * cache.
     *
     * <p>Please note that this option is required if {@link #getCache} returns a non-empty
     * instance. If the cache is empty, the return value of this function will be ignored.
     *
     * @return true if a null or empty value should be stored in the cache.
     */
    Optional<Boolean> cacheMissingKey();

    /** Builder class for {@link PartialCachingLookupProvider}. */
    class Builder {

        private LookupFunction lookupFunction;
        private LookupCache cache;
        private Boolean cacheMissingKey;

        /** Sets lookup function. */
        public Builder withLookupFunction(LookupFunction lookupFunction) {
            this.lookupFunction = lookupFunction;
            return this;
        }

        /** Enables caching and sets the cache. */
        public Builder withCache(LookupCache cache, boolean cacheMissingKey) {
            this.cache = cache;
            this.cacheMissingKey = cacheMissingKey;
            return this;
        }

        public PartialCachingLookupProvider build() {
            // Build PartialCachingLookupProvider
            ...
        }
    }
}
|
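The effect of cacheMissingKey can be shown with a minimal sketch of a cache-aside lookup. It assumes a hypothetical `CachingLookupSketch` class, strings in place of RowData and a `Function` standing in for the external lookup; how the framework actually wires the cache is not specified here.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical cache-aside lookup; single-threaded for brevity. */
final class CachingLookupSketch {
    private final Map<String, Collection<String>> cache = new HashMap<>();
    private final Function<String, Collection<String>> externalLookup;
    private final boolean cacheMissingKey;

    // Number of calls that reached the external system (for illustration only).
    int loadCount = 0;

    CachingLookupSketch(
            Function<String, Collection<String>> externalLookup, boolean cacheMissingKey) {
        this.externalLookup = externalLookup;
        this.cacheMissingKey = cacheMissingKey;
    }

    Collection<String> lookup(String key) {
        Collection<String> cached = cache.get(key);
        if (cached != null) {
            return cached; // cache hit, possibly a cached empty result
        }
        loadCount++;
        Collection<String> rows = externalLookup.apply(key);
        // Empty results are stored only if missing keys should be cached.
        if (!rows.isEmpty() || cacheMissingKey) {
            cache.put(key, rows);
        }
        return rows;
    }
}
```

With cacheMissingKey enabled, repeated lookups of a key that has no matching rows hit the external system only once; with it disabled, every such lookup goes to the external system.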
...
AsyncPartialCachingLookupProvider
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Provider for creating {@link AsyncLookupFunction} and {@link LookupCache} if caching should be
 * enabled for the lookup table.
 */
@PublicEvolving
public interface AsyncPartialCachingLookupProvider extends LookupTableSource.LookupRuntimeProvider {

    /** Creates a builder of {@link AsyncPartialCachingLookupProvider}. */
    static AsyncPartialCachingLookupProvider.Builder newBuilder() {
        return new AsyncPartialCachingLookupProvider.Builder();
    }

    /** Creates an {@link AsyncLookupFunction} instance. */
    AsyncLookupFunction createAsyncLookupFunction();

    /**
     * Gets the {@link LookupCache}.
     *
     * <p>This cache will be used during runtime execution for optimizing the access to external
     * lookup table.
     *
     * @return an {@link Optional} of {@link LookupCache}, or an empty {@link Optional} if caching
     *     shouldn't be applied to the lookup table.
     */
    Optional<LookupCache> getCache();

    /**
     * Whether the missing key (key fields without any matching value rows) should be stored in the
     * cache.
     *
     * <p>Please note that this option is required if {@link #getCache} returns a non-empty
     * instance. If the cache is empty, the return value of this function will be ignored.
     *
     * @return true if a null or empty value should be stored in the cache.
     */
    Optional<Boolean> cacheMissingKey();

    /** Builder class for {@link AsyncPartialCachingLookupProvider}. */
    class Builder {

        private AsyncLookupFunction asyncLookupFunction;
        private LookupCache cache;
        private Boolean cacheMissingKey;

        /** Sets lookup function. */
        public AsyncPartialCachingLookupProvider.Builder withLookupFunction(
                AsyncLookupFunction asyncLookupFunction) {
            this.asyncLookupFunction = asyncLookupFunction;
            return this;
        }

        /** Enables caching and sets the cache. */
        public AsyncPartialCachingLookupProvider.Builder withCache(
                LookupCache cache, boolean cacheMissingKey) {
            this.cache = cache;
            this.cacheMissingKey = cacheMissingKey;
            return this;
        }

        public AsyncPartialCachingLookupProvider build() {
            ...
        }
    }
}
|
FullCachingLookupProvider
This interface supports the full caching strategy. It reuses ScanRuntimeProvider and defines the reload trigger.
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Runtime provider for fully loading and periodically reloading all entries of the lookup table and
 * storing the table locally for lookup.
 *
 * <p>Implementations should provide a {@link ScanTableSource.ScanRuntimeProvider} in order to reuse
 * the ability of scanning for loading all entries from the lookup table.
 */
@PublicEvolving
public interface FullCachingLookupProvider extends LookupTableSource.LookupRuntimeProvider {

    static FullCachingLookupProvider of(
            ScanTableSource.ScanRuntimeProvider scanRuntimeProvider,
            FullCachingReloadTrigger fullCachingReloadTrigger) {
        return new FullCachingLookupProvider() {
            @Override
            public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider() {
                return scanRuntimeProvider;
            }

            @Override
            public FullCachingReloadTrigger getReloadTrigger() {
                return fullCachingReloadTrigger;
            }
        };
    }

    /**
     * Gets the {@link ScanTableSource.ScanRuntimeProvider} for executing the periodic reload.
     */
    ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();

    /** Gets the {@link FullCachingReloadTrigger} for triggering a full caching reload operation. */
    FullCachingReloadTrigger getReloadTrigger();
}
|
FullCachingReloadTrigger
A trigger defining custom logic for triggering full cache reloading.
Code Block | ||||
---|---|---|---|---|
| ||||
/** Customized trigger for reloading all lookup table entries in full caching mode. */
public interface FullCachingReloadTrigger extends AutoCloseable, Serializable {

    /** Open the trigger. */
    void open(Context context) throws Exception;

    /**
     * Context of {@link FullCachingReloadTrigger} for getting information about times and
     * triggering reload.
     */
    interface Context {

        /** Get current processing time. */
        long currentProcessingTime();

        /** Get current watermark on the main stream. */
        long currentWatermark();

        /** Trigger a reload operation on the full cache. */
        CompletableFuture<Void> triggerReload();
    }
}
|
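As an illustration of a custom reloading strategy, the following minimal sketch reloads once when opened and again whenever some external signal arrives. `ReloadContextSketch` and `OnDemandReloadTriggerSketch` are hypothetical stand-ins that only mirror the triggerReload part of the context above, and the "table changed" signal is an assumption.

```java
import java.util.concurrent.CompletableFuture;

/** Stand-in mirroring only the triggerReload method of the trigger context (assumption). */
interface ReloadContextSketch {
    CompletableFuture<Void> triggerReload();
}

/** Hypothetical trigger: reloads on open and on every external change signal. */
final class OnDemandReloadTriggerSketch implements AutoCloseable {
    private ReloadContextSketch context;
    private volatile boolean closed;

    /** Remembers the context and performs the initial load. */
    void open(ReloadContextSketch context) {
        this.context = context;
        context.triggerReload();
    }

    /** Called by a hypothetical external signal, e.g. a notification that the table changed. */
    CompletableFuture<Void> onTableChanged() {
        if (closed) {
            // After close, signals are ignored and no reload is triggered.
            return CompletableFuture.completedFuture(null);
        }
        return context.triggerReload();
    }

    @Override
    public void close() {
        closed = true;
    }
}
```

The trigger itself never loads data; it only decides when to call triggerReload, which keeps the reloading strategy separate from the scanning logic.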
PeriodicFullCachingReloadTrigger
An implementation of FullCachingReloadTrigger that triggers reload with a specified interval.
Code Block |
---|
/** A trigger that reloads all entries periodically with specified interval or delay. */
public class PeriodicFullCachingReloadTrigger implements FullCachingReloadTrigger {

    private final Duration reloadInterval;
    private final ScheduleMode scheduleMode;

    private ScheduledExecutorService scheduledExecutor;

    public PeriodicFullCachingReloadTrigger(Duration reloadInterval, ScheduleMode scheduleMode) {
        this.reloadInterval = reloadInterval;
        this.scheduleMode = scheduleMode;
    }

    @Override
    public void open(FullCachingReloadTrigger.Context context) {
        scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        switch (scheduleMode) {
            case FIXED_RATE:
                scheduledExecutor.scheduleAtFixedRate(
                        context::triggerReload,
                        0,
                        reloadInterval.toMillis(),
                        TimeUnit.MILLISECONDS);
                break;
            case FIXED_DELAY:
                scheduledExecutor.scheduleWithFixedDelay(
                        () -> {
                            try {
                                // Wait for the reload to finish so that the delay is counted
                                // from the end of the reload.
                                context.triggerReload().get();
                            } catch (Exception e) {
                                throw new RuntimeException(
                                        "Uncaught exception during the reload", e);
                            }
                        },
                        0,
                        reloadInterval.toMillis(),
                        TimeUnit.MILLISECONDS);
                break;
            default:
                throw new IllegalArgumentException(
                        String.format("Unrecognized schedule mode \"%s\"", scheduleMode));
        }
    }

    @Override
    public void close() throws Exception {
        if (scheduledExecutor != null) {
            scheduledExecutor.shutdownNow();
        }
    }

    public enum ScheduleMode {
        FIXED_DELAY,
        FIXED_RATE
    }
}
|
TableFunctionProvider / AsyncTableFunctionProvider
...
Option | Type | Description
---|---|---
lookup.cache | Enum of NONE, PARTIAL and FULL | The caching strategy for this lookup table. NONE: do not use cache. PARTIAL: use partial caching mode. FULL: use full caching mode.
lookup.async | Boolean | Whether to use asynchronous mode for the lookup table
lookup.max-retries | Integer | The maximum allowed retries if a lookup operation fails
lookup.partial-cache.expire-after-access | Duration | Duration to expire an entry in the cache after accessing
lookup.partial-cache.expire-after-write | Duration | Duration to expire an entry in the cache after writing
lookup.partial-cache.cache-missing-key | Boolean | Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table
lookup.partial-cache.max-rows | Long | The maximum number of rows to store in the cache
lookup.full-cache.reload-interval | Duration | Interval of reloading all entries from the lookup table into cache
lookup.full-cache.reload-schedule-mode | Enum of FIXED_DELAY and FIXED_RATE | The periodic schedule mode of reloading
Cache Metrics
It is important to mention that a cache implementation does not have to report all the defined metrics. But if a cache reports a metric of the same semantic defined below, the implementation should follow the convention.
...