Status

...

Page properties


Discussion thread

...

JIRA: TBD

...

Vote thread
JIRA: FLINK-28415

Release: 1.16


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

This FLIP is a joint work, initially proposed by Yuan Zhu (zstraw@163.com) and finished by Qingsheng Ren (partial caching part) and Alexander Smirnov (full caching part).



Motivation

...

In order to clarify the semantics of lookup, we'd like to introduce some top-level APIs for general lookup operations without caching:

  • LookupFunction / AsyncLookupFunction, an extended version of TableFunction to make the API more straightforward.
  • LookupFunctionProvider / AsyncLookupFunctionProvider, serving as the creators of LookupFunction / AsyncLookupFunction in the table source.

...

More specifically, we'd like to provide public interfaces to lookup source developers for the two most common cases, which we call partial and full caching.

...

  • FullCachingLookupProvider, for reusing the ability of scanning.
  • CacheReloadTrigger, for customizing reloading strategies of all entries in the full cache.

Also we'd like to provide two default implementations of CacheReloadTrigger:

  • PeriodicCacheReloadTrigger, for triggering reloads periodically at a specified interval.
  • TimedCacheReloadTrigger, for triggering a reload at a specific time of day and repeating at an interval in days.

Public Interfaces

Lookup Functions

...

Code Block
languagejava
titleLookupFunctionProvider
/** A provider for creating {@link LookupFunction}. */
@PublicEvolving
public interface LookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    static LookupFunctionProvider of(LookupFunction lookupFunction) {
        return () -> lookupFunction;
    }

    LookupFunction createLookupFunction();
}

...

Code Block
languagejava
titleAsyncLookupFunctionProvider
/** A provider for creating {@link AsyncLookupFunction}. */
@PublicEvolving
public interface AsyncLookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    static AsyncLookupFunctionProvider of(AsyncLookupFunction asyncLookupFunction) {
        return () -> asyncLookupFunction;
    }

    AsyncLookupFunction createAsyncLookupFunction();
}

...

Code Block
languagejava
titleLookupCache
/**
 * A semi-persistent mapping from keys to values for storing entries of lookup table.
 *
 * <p>The type of the caching key is a {@link RowData} with lookup key fields packed inside. The
 * type of value is a {@link Collection} of {@link RowData}, which are rows matching lookup key
 * fields.
 *
 * <p>Cache entries are manually added using {@link #put}, and are stored in the cache until either
 * evicted or manually invalidated.
 *
 * <p>Implementations of this interface are expected to be thread-safe, and can be safely accessed
 * by multiple concurrent threads.
 */
@PublicEvolving
public interface LookupCache extends AutoCloseable, Serializable {

    /**
     * Initialize the cache.
     *
     * @param metricGroup the metric group to register cache related metrics.
     */
    void open(CacheMetricGroup metricGroup);

    /**
     * Returns the value associated with key in this cache, or null if there is no cached value for
     * key.
     */
    @Nullable
    Collection<RowData> getIfPresent(RowData key);

    /**
     * Associates the specified value rows with the specified key row in the cache. If the cache
     * previously contained a value associated with the key, the old value is replaced by the
     * specified value.
     *
     * @param key key row with which the specified value is to be associated
     * @param value value rows to be associated with the specified key
     * @return the previous value rows associated with key, or null if there was no mapping for key.
     */
    Collection<RowData> put(RowData key, Collection<RowData> value);

    /** Discards any cached value for the specified key. */
    void invalidate(RowData key);

    /** Returns the number of key-value mappings in the cache. */
    long size();
}
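
As a rough illustration of the contract above, the following sketch implements the same four operations on top of a ConcurrentHashMap. It is not Flink's implementation: SimpleCacheSketch is a hypothetical name, String keys and values stand in for RowData, and open/close, metrics and eviction are left out.

```java
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a LookupCache; String replaces RowData for brevity. */
class SimpleCacheSketch {
    // ConcurrentHashMap provides the thread-safety that the interface contract asks for.
    private final ConcurrentHashMap<String, Collection<String>> entries =
            new ConcurrentHashMap<>();

    /** Returns the cached rows for the key, or null if nothing is cached. */
    Collection<String> getIfPresent(String key) {
        return entries.get(key);
    }

    /** Stores the rows and returns the previously cached rows, or null if there were none. */
    Collection<String> put(String key, Collection<String> value) {
        return entries.put(key, value);
    }

    /** Discards any cached rows for the key. */
    void invalidate(String key) {
        entries.remove(key);
    }

    /** Returns the number of key-to-rows mappings currently held. */
    long size() {
        return entries.size();
    }
}
```

Calling put twice with the same key replaces the first value and returns it, which mirrors the "previous value rows" wording in the Javadoc of put.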

...

Code Block
languagejava
titleDefaultLookupCache
/** Default implementation of {@link LookupCache}. */
@PublicEvolving
public class DefaultLookupCache implements LookupCache {
    private final Duration expireAfterAccessDuration;
    private final Duration expireAfterWriteDuration;
    private final Long maximumSize;
    private final boolean cacheMissingKey;
    
    private DefaultLookupCache(
            Duration expireAfterAccessDuration,
            Duration expireAfterWriteDuration,
            Long maximumSize,
            boolean cacheMissingKey) {
        this.expireAfterAccessDuration = expireAfterAccessDuration;
        this.expireAfterWriteDuration = expireAfterWriteDuration;
        this.maximumSize = maximumSize;
        this.cacheMissingKey = cacheMissingKey;
    }
    
    public static Builder newBuilder() {
        return new Builder();
    } 

    public static class Builder {
        private Duration expireAfterAccessDuration;
        private Duration expireAfterWriteDuration;
        private Long maximumSize;
        private Boolean cacheMissingKey;

        public Builder expireAfterAccess(Duration duration) {
            expireAfterAccessDuration = duration;
            return this;
        }

        public Builder expireAfterWrite(Duration duration) {
            expireAfterWriteDuration = duration;
            return this;
        }

        public Builder maximumSize(long maximumSize) {
            this.maximumSize = maximumSize;
            return this;
        }

        public Builder cacheMissingKey(boolean cacheMissingKey) {
            this.cacheMissingKey = cacheMissingKey;
            return this;
        }          

        public DefaultLookupCache build() {
            return new DefaultLookupCache(
                    expireAfterAccessDuration,
                    expireAfterWriteDuration,
                    maximumSize,
                    cacheMissingKey);
        }
    }
}
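
The effect of cacheMissingKey is easiest to see on the lookup path. The sketch below is only an assumption about how a partial cache could be consulted, not the framework's actual code: CachingLookupSketch is a hypothetical class, a Function from String to a list of Strings stands in for the external lookup, and expiry, size limits and thread-safety are omitted.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical partial-caching lookup; String replaces RowData for brevity. */
class CachingLookupSketch {
    private final Map<String, List<String>> cache = new HashMap<>();
    private final Function<String, List<String>> externalLookup;
    private final boolean cacheMissingKey;

    /** Counts how often the external system was queried (for illustration only). */
    int externalCalls = 0;

    CachingLookupSketch(Function<String, List<String>> externalLookup, boolean cacheMissingKey) {
        this.externalLookup = externalLookup;
        this.cacheMissingKey = cacheMissingKey;
    }

    List<String> lookup(String key) {
        List<String> cached = cache.get(key);
        if (cached != null) {
            // Cache hit; an empty list means the key is known to be missing.
            return cached;
        }
        externalCalls++;
        List<String> rows = externalLookup.apply(key);
        if (rows == null) {
            rows = Collections.emptyList();
        }
        // Empty results are stored only when cacheMissingKey is enabled.
        if (!rows.isEmpty() || cacheMissingKey) {
            cache.put(key, rows);
        }
        return rows;
    }
}
```

With cacheMissingKey enabled, repeated lookups of a key that is absent from the table hit the external system once; with it disabled, every such lookup goes to the external system again.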

...

Code Block
languagejava
titleCacheMetricGroup
/**
 * Pre-defined metrics for cache.
 *
 * <p>Please note that these methods should only be invoked once. Registering a metric with the same
 * name multiple times leads to undefined behavior.
 */
@PublicEvolving
public interface CacheMetricGroup extends MetricGroup {
    /** The number of cache hits. */
    void hitCounter(Counter hitCounter);

    /** The number of cache misses. */
    void missCounter(Counter missCounter);

    /** The number of times to load data into cache from external system. */
    void loadCounter(Counter loadCounter);

    /** The number of load failures. */
    void numLoadFailuresCounter(Counter numLoadFailuresCounter);

    /** The time spent for the latest load operation. */
    void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);

    /** The number of records in cache. */
    void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

    /** The number of bytes used by cache. */
    void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}

...

Code Block
languagejava
titlePartialCachingLookupProvider
/**
 * Provider for creating {@link LookupFunction} and {@link LookupCache} for storing lookup entries.
 */
@PublicEvolving
public interface PartialCachingLookupProvider extends LookupFunctionProvider {

    /**
     * Build a {@link PartialCachingLookupProvider} from the specified {@link LookupFunction} and
     * {@link LookupCache}.
     */
    static PartialCachingLookupProvider of(LookupFunction lookupFunction, LookupCache cache) {
        return new PartialCachingLookupProvider() {

            @Override
            public LookupCache getCache() {
                return cache;
            }

            @Override
            public LookupFunction createLookupFunction() {
                return lookupFunction;
            }
        };
    }

    /** Get a new instance of {@link LookupCache}. */
    LookupCache getCache();
}

PartialCachingAsyncLookupProvider

Code Block
languagejava
titlePartialCachingAsyncLookupProvider
/**
 * Provider for creating {@link AsyncLookupFunction} and {@link LookupCache} for storing lookup entries.
 */
@PublicEvolving
public interface PartialCachingAsyncLookupProvider extends AsyncLookupFunctionProvider {

    /**
     * Build a {@link PartialCachingAsyncLookupProvider} from the specified {@link
     * AsyncLookupFunction} and {@link LookupCache}.
     */
    static PartialCachingAsyncLookupProvider of(
            AsyncLookupFunction asyncLookupFunction, LookupCache cache) {
        return new PartialCachingAsyncLookupProvider() {

            @Override
            public LookupCache getCache() {
                return cache;
            }

            @Override
            public AsyncLookupFunction createAsyncLookupFunction() {
                return asyncLookupFunction;
            }
        };
    }

    /** Get a new instance of {@link LookupCache}. */
    LookupCache getCache();
}

FullCachingLookupProvider

This interface supports the full caching strategy. It reuses ScanRuntimeProvider to load all entries and uses a CacheReloadTrigger to define when the cache is reloaded.

Code Block
languagejava
titleFullCachingLookupProvider
/**
 * A {@link LookupFunctionProvider} that never looks up in the external system on cache miss and
 * provides a cache for holding all entries in the external system. The cache will be fully
 * reloaded from the external system by the {@link ScanTableSource.ScanRuntimeProvider} and reload
 * operations will be triggered by the {@link CacheReloadTrigger}.
 */
@PublicEvolving
public interface FullCachingLookupProvider extends LookupFunctionProvider {
    static FullCachingLookupProvider of(
            ScanTableSource.ScanRuntimeProvider scanRuntimeProvider,
            CacheReloadTrigger cacheReloadTrigger) {
        return new FullCachingLookupProvider() {
            @Override
            public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider() {
                return scanRuntimeProvider;
            }

            @Override
            public CacheReloadTrigger getCacheReloadTrigger() {
                return cacheReloadTrigger;
            }

            @Override
            public LookupFunction createLookupFunction() {
                // Placeholder: in full caching mode the external system is never queried on a miss.
                return keyRow -> null;
            }
        };
    }

    /**
     * Get a {@link ScanTableSource.ScanRuntimeProvider} for scanning all entries from the external
     * lookup table and load into the cache.
     */
    ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();

    /** Get a {@link CacheReloadTrigger} for triggering the reload operation. */
    CacheReloadTrigger getCacheReloadTrigger();
}

CacheReloadTrigger

A trigger defining custom logic for triggering full cache reloading.

Code Block
languagejava
titleCacheReloadTrigger
/** Customized trigger for reloading all lookup table entries in full caching mode. */
@PublicEvolving
public interface CacheReloadTrigger extends AutoCloseable, Serializable {

    /** Open the trigger. */
    void open(Context context) throws Exception;

    /**
     * Context of {@link CacheReloadTrigger} for getting information about times and
     * triggering reload.
     */
    interface Context {

        /** Get current processing time. */
        long currentProcessingTime();

        /** Get current watermark on the main stream. */
        long currentWatermark();

        /** Trigger a reload operation on the full cache. */
        CompletableFuture<Void> triggerReload();
    }
}

...

An implementation of CacheReloadTrigger that triggers reload with a specified interval.

Code Block
languagejava
titlePeriodicCacheReloadTrigger
/** A trigger that reloads all entries periodically with specified interval or delay. */
public class PeriodicCacheReloadTrigger implements CacheReloadTrigger {

    private final Duration reloadInterval;
    private final ScheduleMode scheduleMode;

    public PeriodicCacheReloadTrigger(Duration reloadInterval, ScheduleMode scheduleMode) {
        this.reloadInterval = reloadInterval;
        this.scheduleMode = scheduleMode;
    }

    @Override
    public void open(CacheReloadTrigger.Context context) {
        // Register periodic reload task
    }

    @Override
    public void close() throws Exception {
        // Dispose resources
    }

    public enum ScheduleMode {
        FIXED_DELAY,
        FIXED_RATE
    }
}
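
The two schedule modes map naturally onto java.util.concurrent.ScheduledExecutorService. The sketch below shows one possible way to fill in open and close; it is an assumption, not the implementation proposed by this FLIP. ReloadContext is a hypothetical stand-in for CacheReloadTrigger.Context, and PeriodicTriggerSketch is a hypothetical class name.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for CacheReloadTrigger.Context, reduced to the reload call. */
interface ReloadContext {
    CompletableFuture<Void> triggerReload();
}

/** Hypothetical periodic trigger built on a single-threaded scheduler. */
class PeriodicTriggerSketch implements AutoCloseable {
    enum ScheduleMode { FIXED_DELAY, FIXED_RATE }

    private final Duration reloadInterval;
    private final ScheduleMode scheduleMode;
    private ScheduledExecutorService scheduler;

    PeriodicTriggerSketch(Duration reloadInterval, ScheduleMode scheduleMode) {
        this.reloadInterval = reloadInterval;
        this.scheduleMode = scheduleMode;
    }

    void open(ReloadContext context) {
        scheduler = Executors.newSingleThreadScheduledExecutor();
        long intervalMs = reloadInterval.toMillis();
        switch (scheduleMode) {
            case FIXED_RATE:
                // A reload is requested every interval, regardless of how long reloads take.
                scheduler.scheduleAtFixedRate(
                        context::triggerReload, 0, intervalMs, TimeUnit.MILLISECONDS);
                break;
            case FIXED_DELAY:
                // Waiting on the future makes the interval count from the end of each reload.
                scheduler.scheduleWithFixedDelay(
                        () -> context.triggerReload().join(), 0, intervalMs, TimeUnit.MILLISECONDS);
                break;
        }
    }

    @Override
    public void close() {
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
    }
}
```

Note that a ScheduledExecutorService stops rescheduling a task once it throws, so a real trigger would need to decide how to handle a failed reload.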

TimedCacheReloadTrigger

Code Block
languagejava
titleTimedCacheReloadTrigger
/** A trigger that reloads at a specific local time and repeats at the given interval in days. */
public class TimedCacheReloadTrigger implements CacheReloadTrigger {

    private final LocalTime reloadTime;
    private final int reloadIntervalInDays;

    public TimedCacheReloadTrigger(LocalTime reloadTime, int reloadIntervalInDays) {
        this.reloadTime = reloadTime;
        this.reloadIntervalInDays = reloadIntervalInDays;
    }

    @Override
    public void open(Context context) {
        // Register periodic reload task
    }

    @Override
    public void close() throws Exception {
        // Dispose resources
    }
}
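
Registering the timed reload comes down to computing the delay until the first reload and the period between reloads. The helper below is a minimal sketch of that arithmetic under stated assumptions: TimedReloadMath and its methods are hypothetical names, a reload time equal to the current time is scheduled for the next day, and daylight-saving shifts are ignored because only local date-times are used.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.LocalTime;

/** Hypothetical helper for the scheduling arithmetic of a timed reload trigger. */
class TimedReloadMath {

    /** Delay from now until the next occurrence of reloadTime (today, or tomorrow if passed). */
    static Duration initialDelay(LocalDateTime now, LocalTime reloadTime) {
        LocalDateTime next = now.toLocalDate().atTime(reloadTime);
        if (!next.isAfter(now)) {
            // Today's reload time has already passed (or is exactly now), so use tomorrow's.
            next = next.plusDays(1);
        }
        return Duration.between(now, next);
    }

    /** Period between two consecutive reloads. */
    static Duration repeatInterval(int reloadIntervalInDays) {
        return Duration.ofDays(reloadIntervalInDays);
    }
}
```

For example, with a reload time of 10:00, a trigger opened at 08:00 waits two hours for its first reload, while one opened at 12:00 waits 22 hours.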

...

Option | Type | Description
lookup.cache | Enum of NONE, PARTIAL and FULL | The caching strategy for this lookup table. NONE: do not use cache. PARTIAL: use partial caching mode. FULL: use full caching mode.
lookup.max-retries | Integer | The maximum allowed retries if a lookup operation fails
lookup.partial-cache.expire-after-access | Duration | Duration to expire an entry in the cache after accessing
lookup.partial-cache.expire-after-write | Duration | Duration to expire an entry in the cache after writing
lookup.partial-cache.cache-missing-key | Boolean | Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table
lookup.partial-cache.max-rows | Long | The maximum number of rows to store in the cache
lookup.full-cache.reload-strategy | Enum of PERIODIC and TIMED | The reload strategy for the full cache scenario. PERIODIC: use PeriodicCacheReloadTrigger. TIMED: use TimedCacheReloadTrigger.
lookup.full-cache.periodic-reload.interval | Duration | Duration to trigger reload in the PeriodicCacheReloadTrigger
lookup.full-cache.periodic-reload.schedule-mode | Enum of FIXED_DELAY and FIXED_RATE | The periodic schedule mode of reloading in the PeriodicCacheReloadTrigger
lookup.full-cache.timed-reload.iso-time | String | Time in ISO-8601 format when cache needs to be reloaded. Time can be specified either with timezone or without timezone (target JVM local timezone will be used). See formatter ISO_TIME.
lookup.full-cache.timed-reload.interval-in-days | Integer | The interval in days to trigger the reload at the specified time
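
Since the timed-reload time is given as ISO-8601 text with or without a timezone, a connector has to accept both shapes. The sketch below shows one way to do that with the JDK's DateTimeFormatter.ISO_TIME; IsoTimeOptionSketch is a hypothetical helper, not part of the proposed API, and how the framework actually parses the option is not specified here.

```java
import java.time.LocalTime;
import java.time.OffsetTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;

/** Hypothetical parser for an ISO-8601 time option value. */
class IsoTimeOptionSketch {

    /** Returns an OffsetTime if the text carries an offset, otherwise a LocalTime. */
    static TemporalAccessor parse(String isoTime) {
        // parseBest tries the queries in order and returns the first type that fits.
        return DateTimeFormatter.ISO_TIME.parseBest(isoTime, OffsetTime::from, LocalTime::from);
    }
}
```

A value such as 10:15:30 yields a LocalTime, to be interpreted in the JVM's local timezone, while 10:15:30+01:00 yields an OffsetTime that already carries its offset.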

...