Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

...

Page properties


Discussion thread

...

JIRA: TBD

...

Vote thread
JIRA

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-28415

Release1.16


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

This FLIP is a joint work of initially proposed by Yuan Zhu (zstraw@163.com), and finished by Qingsheng Ren (partial caching part) and Alexander Smirnov (full caching part).


Table of Contents

Motivation

...

Code Block
languagejava
titleLookupFunctionLookupFunctionProvider
/** A provider for creating {@link LookupFunction}. */
@PublicEvolving
public interface LookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    static LookupFunctionProvider of(LookupFunction lookupFunction) {
        return () -> lookupFunction;
    }

    LookupFunction createLookupFunction();
}

...

Code Block
languagejava
titleLookupFunctionAsyncLookupFunctionProvider
/** A provider for creating {@link AsyncLookupFunction}. */
@PublicEvolving
public interface AsyncLookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    static AsyncLookupFunctionProvider of(AsyncLookupFunction asyncLookupFunction) {
        return () -> asyncLookupFunction;
    }

    AsyncLookupFunction createAsyncLookupFunction();
}

...

Code Block
languagejava
titlePartialCacheLookupCache
/**
 * A semi-persistent mapping from keys to values for storing entries of lookup table.
 *
 * <p>The type of the caching key is a {@link RowData} with lookup key fields packed inside. The
 * type of value is a {@link Collection} of {@link RowData}, which are rows matching lookup key
 * fields.
 *
 * <p>Cache entries are manually added using {@link #put}, and are stored in the cache until either
 * evicted or manually invalidated.
 *
 * <p>Implementations of this interface are expected to be thread-safe, and can be safely accessed
 * by multiple concurrent threads.
 */
@PublicEvolving
public interface LookupCache extends AutoClosable, Serializable {

    /**
     * Initialize the cache.
     *
     * @param metricGroup the metric group to register cache related metrics.
     */
    void open(CacheMetricGroup metricGroup);

    /**
     * Returns the value associated with key in this cache, or null if there is no cached value for
     * key.
     */
    @Nullable
    Collection<RowData> getIfPresent(RowData key);

    /**
     * Associates the specified value rows with the specified key row in the cache. If the cache
     * previously contained value associated with the key, the old value is replaced by the
     * specified value.
      *
     * @return the previous value rows associated with key, or null if there was no mapping for key.
     * @param key - key row with which the specified value is to be associated
     * @param value – value rows to be associated with the specified key
     */
    Collection<RowData> put(RowData key, Collection<RowData> value);

    /** Discards any cached value for the specified key. */
    void invalidate(RowData key);

    /** Returns the number of key-value mappings in the cache. */
    long size();
}

...

Code Block
languagejava
titleDefaultPartialCacheDefaultLookupCache
/** Default implementation of {@link LookupCache}. */
@PublicEvolving
public class DefaultLookupCache implements LookupCache {
    private final Duration expireAfterAccessDuration;
    private final Duration expireAfterWriteDuration;
    private final Long maximumSize;
    private final boolean cacheMissingKey;
    
    private DefaultLookupCache(
            Duration expireAfterAccessDuration,
            Duration expireAfterWriteDuration,
            Long maximumSize,
			boolean cacheMissingKey) {
        this.expireAfterAccessDuration = expireAfterAccessDuration;
        this.expireAfterWriteDuration = expireAfterWriteDuration;
        this.initialCapacity = initialCapacity;
        this.maximumSize = maximumSize;
		this.cacheMissingKey = cacheMissingKey;
    }
    
    public static Builder newBuilder() {
        return new Builder();
    } 

   public static class Builder {         
        private Duration expireAfterAccessDuration;
        private Duration expireAfterWriteDuration;
        private Long maximumSize;
        private Boolean cacheMissingKey;

        public Builder expireAfterAccess(Duration duration) {
            expireAfterAccessDuration = duration;
            return this;
        }

        public Builder expireAfterWrite(Duration duration) {
            expireAfterWriteDuration = duration;
            return this;
        }

        public Builder maximumSize(long maximumSize) {
            this.maximumSize = maximumSize;
            return this;
        }

        public Builder cacheMissingKey(boolean cacheMissingKey) {
            this.cacheMissingKey = cacheMissingKey;
            return this;
        }          

        public DefaultLookupCache build() {
            return new DefaultLookupCache(
                    expireAfterAccessDuration,
                    expireAfterWriteDuration,
                    maximumSize,
					cacheMissingKey);
        }
    }     
}

...

Code Block
languagejava
titleLookupCacheMetricGroupCacheMetricGroup
/**
 * Pre-defined metrics for cache.
 *
 * <p>Please note that these methods should only be invoked once. Registering a metric with same
 * name for multiple times would lead to an undefined behavior.
 */
@PublicEvolving
public interface CacheMetricGroup extends MetricGroup {
    /** The number of cache hits. */
    void hitCounter(Counter hitCounter);

    /** The number of cache misses. */
    void missCounter(Counter missCounter);

    /** The number of times to load data into cache from external system. */
    void loadCounter(Counter loadCounter);

    /** The number of load failures. */
    void numLoadFailuresCounter(Counter numLoadFailuresCounter);

    /** The time spent for the latest load operation. */
    void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);

    /** The number of records in cache. */
    void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

    /** The number of bytes used by cache. */
    void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}

...

Code Block
languagejava
titleLookupFunctionProviderPartialCachingLookupProvider
/**
 * Provider for creating {@link LookupFunction} and {@link LookupCache} for storing lookup entries.
 */
@PublicEvolving
public interface PartialCachingLookupProvider extends LookupFunctionProvider {

    /**
     * Build a {@link PartialCachingLookupProvider} from the specified {@link LookupFunction} and
     * {@link LookupCache}.
     */
    static PartialCachingLookupProvider of(LookupFunction lookupFunction, LookupCache cache) {
        return new PartialCachingLookupProvider() {

            @Override
            public LookupCache getCache() {
                return cache;
            }

            @Override
            public LookupFunction createLookupFunction() {
                return lookupFunction;
            }
        };
    }

    /** Get a new instance of {@link LookupCache}. */
    LookupCache getCache();
}

...

Code Block
languagejava
titleLookupFunctionProviderPartialCachingAsyncLookupProvider
/**
 * Provider for creating {@link AsyncLookupFunction} and {@link LookupCache} for storing lookup entries.
 */
@PublicEvolving
public interface PartialCachingAsyncLookupProvider extends AsyncLookupFunctionProvider {

    /**
     * Build a {@link PartialCachingLookupProvider} from the specified {@link AsyncLookupFunction} and
     * {@link LookupCache}.
     */
    static PartialCachingLookupProvider of(AsyncLookupFunction asyncLookupFunction, LookupCache cache) {
        return new PartialCachingAsyncLookupProvider() {

            @Override
            public LookupCache getCache() {
                return cache;
            }

            @Override
            public AsyncLookupFunction createAsyncLookupFunction() {
                return asyncLookupFunction;
            }
        };
    }

    /** Get a new instance of {@link LookupCache}. */
    LookupCache getCache();
}

...

Code Block
languagejava
titleFullCachingLookupProvider
/**
 * A {@link CachingLookupProvider} that never lookup in external system on cache miss and provides a
 * cache for holding all entries in the external system. The cache will be fully reloaded from the
 * external system by the {@link ScanTableSource.ScanRuntimeProvider} and reload operations will be
 * triggered by the {@link CacheReloadTrigger}.
 */
@PublicEvolving
public interface FullCachingLookupProvider extends LookupFunctionProvider {
    static FullCachingLookupProvider of(
            ScanTableSource.ScanRuntimeProvider scanRuntimeProvider,
            CacheReloadTrigger cacheReloadTrigger) {
        return new FullCachingLookupProvider() {
            @Override
            public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider() {
                return scanRuntimeProvider;
            }

            @Override
            public CacheReloadTrigger getCacheReloadTrigger() {
                return cacheReloadTrigger;
            }

            @Override
            public LookupFunction createLookupFunction() {
                return keyRow -> null;
            }
        };
    }

    ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();

/**
    CacheReloadTrigger getCacheReloadTrigger();
}

CacheReloadTrigger

A trigger defining custom logic for triggering full cache reloading.

* Get a {@link ScanTableSource.ScanRuntimeProvider} for scanning all entries from the external
     * lookup table and load into the cache.
     */
    ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();

    /** Get a {@link CacheReloadTrigger} for triggering the reload operation. */
    CacheReloadTrigger getCacheReloadTrigger();
}

CacheReloadTrigger

A trigger defining custom logic for triggering full cache reloading.

Code Block
languagejava
titleCacheReloadTrigger
/** Customized trigger for reloading all lookup table entries in full caching mode. */
Code Block
languagejava
titleFullCachingReloadTrigger
/** Customized trigger for reloading all lookup table entries in full caching mode. */
@PublicEvolving
public interface CachingReloadTrigger extends AutoCloseable, Serializable {

    /** Open the trigger. */
    void open(Context context) throws Exception;

    /**
     * Context of {@link CacheReloadTrigger} for getting information about times and
     * triggering reload.
     */
    interface Context {

        /** Get current processing time. */
        long currentProcessingTime();

        /** Get current watermark on the main stream. */
        long currentWatermark();

        /** Trigger a reload operation on the full cache. */
        CompletableFuture<Void> triggerReload();
    }
}

...

An implementation of FullCachingReloadTrigger that triggers reload with a specified interval.

Code Block
languagejava
titlePeriodicCacheReloadTrigger
/** A trigger that reloads all entries periodically with specified interval or delay. */
public class PeriodicCacheReloadTrigger implements CacheReloadTrigger {

    private final Duration reloadInterval;
    private final ScheduleMode scheduleMode;

    public PeriodicCacheReloadTrigger(Duration reloadInterval, ScheduleMode scheduleMode) {
        this.reloadInterval = reloadInterval;
        this.scheduleMode = scheduleMode;
    }

    @Override
    public void open(CacheReloadTrigger.Context context) {
        // Register periodic reload task
    }

    @Override
    public void close() throws Exception {
        // Dispose resources
    }

    public enum ScheduleMode {
        FIXED_DELAY,
        FIXED_RATE
    }
}

TimedCacheReloadTrigger

Code Block
languagejava
titleTimedCacheReloadTrigger
/** A trigger that reloads at a specific local time and repeat for the given interval in days. */ 
public class TimedCacheReloadTrigger implements CacheReloadTrigger {

    private final LocalTime reloadTime;
    private final int reloadIntervalInDays;

    public TimedCacheReloadTrigger(LocalTime reloadTime, int reloadIntervalInDays) {
        this.reloadTime = reloadTime;
        this.reloadIntervalInDays = reloadIntervalInDays;
    }

    @Override
    public void open(Context context) {
		// Register periodic reload task
    }

    @Override
    public void close() throws Exception {
		// Dispose resources
    }
}

...

OptionTypeDescriptions
lookup.cacheEnum of NONE, PARTIAL and FULL

The caching strategy for this lookup table.

NONE: Do not use cache

Partial: Use partial caching mode

FULL: Use full caching mode

lookup.max-retriesIntegerThe maximum allowed retries if a lookup operation fails
lookup.partial-cache.expire-after-accessDurationDuration to expire an entry in the cache after accessing
lookup.partial-cache.expire-after-writeDurationDuration to expire an entry in the cache after writing
lookup.partial-cache.cache-missing-keyBooleanWhether to store an empty value into the cache if the lookup key doesn't match any rows in the table
lookup.partial-cache.max-rowsLongThe maximum number of rows to store in the cache
lookup.full-cache.reload-strategyEnum of PERIODIC and TIMED

The reload strategy for the full cache scenario.

PERIODIC: Use PeriodicCacheReloadTrigger

TIMED: Use TimedCacheReloadTrigger

lookup.full-cache.periodic-reload.intervalDurationDuration to trigger reload in the PeriodicCacheReloadTrigger
lookup.full-cache.periodic-reload.schedule-modeEnum of FIXED_DELAY and FIXED_RATEThe periodically schedule mode of reloading in the PeriodicCacheReloadTrigger
lookup.full-cache.timed-reload.iso-at-timeLocalTimeString

Time in ISO-8601 format when cache needs to be reloaded. Time can be specified either with timezone or without timezone (target JVM local timezone will be used). See formatter ISO_TIME.

The local time on the target JVM to trigger a reload 

lookup.full-cache.timed-reload.interval-in-daysInteger

The interval in days to trigger the reload at the specified time

...