
...

  • LookupFunction / AsyncLookupFunction, extended versions of TableFunction / AsyncTableFunction that clarify the semantics of lookup.
  • LookupCache, defining the cache used in lookup tables.
  • DefaultLookupCache, a default implementation of a cache that is suitable for most use cases.
  • CacheMetricGroup, defining the metrics that should be reported by the lookup cache.
  • PartialCachingLookupProvider / AsyncPartialCachingLookupProvider, as the API interacting with the table source to get the LookupFunction / AsyncLookupFunction and the LookupCache.

...

If the lookup table is small enough to fit into memory and doesn't change frequently, it is more efficient to load all entries of the lookup table into the cache, which avoids network I/O on every lookup, and to refresh the table periodically. We'd like to name this use case "full cache". Logically the reload operation is a kind of scan, so we'd like to reuse the ScanRuntimeProvider so that developers can reuse the scanning logic already implemented in Source / SourceFunction / InputFormat. Considering the complexity of the Source API, we'd like to support the SourceFunction and InputFormat APIs first. Supporting the Source API might require a new topology and will be discussed later in another FLIP.
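The core idea of the full cache can be sketched without any Flink dependencies. This is a minimal illustration under stated assumptions, not the proposed runtime. A `Supplier` stands in for the scan performed through ScanRuntimeProvider, rows are plain (key, value) string pairs, and the class name `FullCacheSketch` is hypothetical. Each reload scans the whole table into a fresh map and then swaps it in atomically. Concurrent lookups therefore see either the old snapshot or the new one, never a half-loaded table.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical full cache: a scan loads every row, and each reload swaps in a new snapshot. */
class FullCacheSketch {

    // Stands in for the scan executed through ScanTableSource.ScanRuntimeProvider;
    // each entry is one row as (lookup key, value).
    private final Supplier<Collection<Map.Entry<String, String>>> scan;

    // Current snapshot of the whole table; lookups only ever read a complete snapshot.
    private final AtomicReference<Map<String, List<String>>> snapshot =
            new AtomicReference<>(Collections.emptyMap());

    FullCacheSketch(Supplier<Collection<Map.Entry<String, String>>> scan) {
        this.scan = scan;
    }

    /** Scans all rows into a fresh map, then publishes it in a single atomic step. */
    void reload() {
        Map<String, List<String>> fresh = new HashMap<>();
        for (Map.Entry<String, String> row : scan.get()) {
            fresh.computeIfAbsent(row.getKey(), k -> new ArrayList<>()).add(row.getValue());
        }
        snapshot.set(Collections.unmodifiableMap(fresh));
    }

    /** Returns all values for the key from the local snapshot, without any network I/O. */
    List<String> lookup(String key) {
        return snapshot.get().getOrDefault(key, Collections.emptyList());
    }
}
```

Changes in the external table become visible only after the next `reload()`. This is the trade-off the full caching mode accepts in exchange for local lookups.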

We propose to introduce several new interfaces for the full caching mode:

  • FullCachingLookupProvider, for reusing the scanning ability of the table source.

...

  • FullCachingReloadTrigger, for customizing reloading strategies of all entries in the full cache.

Public Interfaces

Lookup Functions

...

Code Block
languagejava
titleAsyncLookupFunction
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AsyncTableFunction;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;

/**
 * A wrapper class of {@link AsyncTableFunction} for asynchronously looking up rows matching the
 * lookup keys from an external system.
 *
 * <p>The output type of this table function is fixed as {@link RowData}.
 */
@PublicEvolving
public abstract class AsyncLookupFunction extends AsyncTableFunction<RowData> {

    /**
     * Asynchronously looks up rows matching the lookup keys.
     *
     * @param keyRow - A {@link RowData} that wraps the keys to look up.
     * @return A collection of all matching rows in the lookup table.
     */
    public abstract CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow);

    /** Invokes {@link #asyncLookup} and chains futures. */
    public final void eval(CompletableFuture<Collection<RowData>> future, Object... keys) {
        asyncLookup(GenericRowData.of(keys))
                .whenComplete(
                        (result, exception) -> {
                            if (exception != null) {
                                // Propagate the lookup failure to the caller's future.
                                future.completeExceptionally(exception);
                                return;
                            }
                            future.complete(result);
                        });
    }
}
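The `eval` method above adapts the future returned by `asyncLookup` to the result future passed in by the caller. The pattern can be tried in isolation with plain `CompletableFuture`, using string keys and values instead of RowData. `AsyncLookupSketch` and its in-memory `lookupTable` are hypothetical stand-ins for an AsyncLookupFunction and an external system.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for AsyncLookupFunction, using String keys instead of RowData. */
class AsyncLookupSketch {

    // Stands in for the external system being looked up.
    private final Map<String, List<String>> lookupTable;

    AsyncLookupSketch(Map<String, List<String>> lookupTable) {
        this.lookupTable = lookupTable;
    }

    /** Plays the role of asyncLookup: completes with all rows matching the key. */
    CompletableFuture<Collection<String>> asyncLookup(String key) {
        if (key == null) {
            // Simulates a failure reported by the external system.
            return CompletableFuture.failedFuture(new IllegalArgumentException("null key"));
        }
        return CompletableFuture.supplyAsync(
                () -> lookupTable.getOrDefault(key, Collections.emptyList()));
    }

    /** Plays the role of eval: forwards either the result or the failure to the given future. */
    void eval(CompletableFuture<Collection<String>> future, String key) {
        asyncLookup(key)
                .whenComplete(
                        (result, exception) -> {
                            if (exception != null) {
                                future.completeExceptionally(exception);
                                return;
                            }
                            future.complete(result);
                        });
    }
}
```

`whenComplete` runs the callback on the thread that completes the lookup future. If that future is already complete, it runs on the calling thread. Either way, forwarding the result needs no extra thread pool.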

...

LookupCache

Considering there might be custom caching strategies and optimizations, we'd like to expose the cache interface as a public API so that developers can plug in their own cache implementations.

Code Block
languagejava
titleLookupCache
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.data.RowData;

import javax.annotation.Nullable;

import java.util.Collection;

/**
 * A semi-persistent mapping from keys to values for storing entries of lookup table.
 *
 * <p>The type of the caching key is a {@link RowData} with lookup key fields packed inside. The
 * type of value is a {@link Collection} of {@link RowData}, which are rows matching lookup key
 * fields.
 *
 * <p>Cache entries are manually added using {@link #put}, and are stored in the cache until either
 * evicted or manually invalidated.
 *
 * <p>Implementations of this interface are expected to be thread-safe, and can be safely accessed
 * by multiple concurrent threads.
 */
@PublicEvolving
public interface LookupCache extends AutoCloseable {

    /**
     * Initializes the cache.
     *
     * @param metricGroup the metric group to register cache related metrics.
     */
    void open(CacheMetricGroup metricGroup);

    /**
     * Returns the value associated with key in this cache, or null if there is no cached value for
     * key.
     */
    @Nullable
    Collection<RowData> getIfPresent(RowData key);

    /**
     * Associates the specified value rows with the specified key row in the cache. If the cache
     * previously contained a value associated with the key, the old value is replaced by the
     * specified value.
     *
     * @param key - key row with which the specified value is to be associated
     * @param value - value rows to be associated with the specified key
     * @return the previous value rows associated with key, or null if there was no mapping for key.
     */
    Collection<RowData> put(RowData key, Collection<RowData> value);

    /** Discards any cached value for the specified key. */
    void invalidate(RowData key);

    /** Returns the number of key-value mappings in the cache. */
    long size();

    /** Closes the cache and releases any resources it holds. */
    @Override
    void close();
}
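The contract above is as follows. `put` returns the previous value, `invalidate` discards an entry, and `size` counts mappings. All of these must be safe under concurrent access. The sketch below is a hypothetical, minimal in-memory implementation of that contract. It uses generic keys and rows instead of RowData so that it runs without Flink. It also leaves out `open(CacheMetricGroup)` and any eviction policy.

```java
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Hypothetical unbounded cache that follows the LookupCache contract. K models the packed lookup
 * key and V models a row; open(CacheMetricGroup) and eviction are intentionally left out.
 */
class InMemoryLookupCacheSketch<K, V> implements AutoCloseable {

    // ConcurrentHashMap keeps every operation safe for concurrent callers. It rejects null
    // values, so a missing key would have to be cached as an empty collection.
    private final ConcurrentMap<K, Collection<V>> entries = new ConcurrentHashMap<>();

    /** Returns the cached rows for the key, or null if nothing is cached. */
    Collection<V> getIfPresent(K key) {
        return entries.get(key);
    }

    /** Replaces any rows cached for the key; returns the previous rows, or null if none. */
    Collection<V> put(K key, Collection<V> value) {
        return entries.put(key, value);
    }

    /** Discards any rows cached for the key. */
    void invalidate(K key) {
        entries.remove(key);
    }

    /** Returns the number of key-to-rows mappings. */
    long size() {
        return entries.size();
    }

    @Override
    public void close() {
        entries.clear();
    }
}
```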

...

DefaultLookupCache

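DefaultLookupCache is a default implementation of LookupCache that is suitable for most use cases. It is created through a builder that configures expiration after access, expiration after write and a maximum size. Like any LookupCache, it is initialized by LookupCache#open during runtime execution.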

Code Block
languagejava
titleDefaultLookupCache
import org.apache.flink.annotation.PublicEvolving;

import java.time.Duration;

/** Default implementation of {@link LookupCache}. */
@PublicEvolving
public class DefaultLookupCache implements LookupCache {
    private final Duration expireAfterAccessDuration;
    private final Duration expireAfterWriteDuration;
    private final Long maximumSize;

    private DefaultLookupCache(
            Duration expireAfterAccessDuration,
            Duration expireAfterWriteDuration,
            Long maximumSize) {
        this.expireAfterAccessDuration = expireAfterAccessDuration;
        this.expireAfterWriteDuration = expireAfterWriteDuration;
        this.maximumSize = maximumSize;
    }

    // Overrides of the LookupCache methods are omitted here.

    /** Creates a builder of {@link DefaultLookupCache}. */
    public static Builder newBuilder() {
        return new Builder();
    }

    /** Builder class for {@link DefaultLookupCache}. */
    public static class Builder {
        private Duration expireAfterAccessDuration;
        private Duration expireAfterWriteDuration;
        private Long maximumSize;

        /** Sets the duration after which an entry expires once it was last accessed. */
        public DefaultLookupCache.Builder expireAfterAccess(Duration duration) {
            expireAfterAccessDuration = duration;
            return this;
        }

        /** Sets the duration after which an entry expires once it was written. */
        public DefaultLookupCache.Builder expireAfterWrite(Duration duration) {
            expireAfterWriteDuration = duration;
            return this;
        }

        /** Sets the maximum size of the cache. */
        public DefaultLookupCache.Builder maximumSize(long maximumSize) {
            this.maximumSize = maximumSize;
            return this;
        }

        /** Builds a {@link DefaultLookupCache} with the configured options. */
        public DefaultLookupCache build() {
            return new DefaultLookupCache(
                    expireAfterAccessDuration, expireAfterWriteDuration, maximumSize);
        }
    }
}
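The builder options only describe policies. How DefaultLookupCache enforces them is not specified here. The hypothetical sketch below shows one common way to enforce two of them: `maximumSize` evicts the least recently accessed entry, and `expireAfterWrite` drops entries older than the configured duration. It uses a `LinkedHashMap` in access order and an injectable millisecond clock so that expiration can be tested deterministically. Expire-after-access is left out for brevity.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/**
 * Hypothetical cache combining two of the policies DefaultLookupCache.Builder configures:
 * maximumSize (evicting the least recently accessed entry) and expireAfterWrite.
 */
class BoundedExpiringCacheSketch<K, V> {

    /** Cached rows plus the time they were written. */
    private static final class CachedRows<R> {
        final Collection<R> rows;
        final long writeTimeMillis;

        CachedRows(Collection<R> rows, long writeTimeMillis) {
            this.rows = rows;
            this.writeTimeMillis = writeTimeMillis;
        }
    }

    private final long maximumSize;
    private final long expireAfterWriteMillis;
    private final LongSupplier clockMillis; // injectable clock, e.g. System::currentTimeMillis
    private final LinkedHashMap<K, CachedRows<V>> entries;

    BoundedExpiringCacheSketch(long maximumSize, Duration expireAfterWrite, LongSupplier clockMillis) {
        this.maximumSize = maximumSize;
        this.expireAfterWriteMillis = expireAfterWrite.toMillis();
        this.clockMillis = clockMillis;
        // accessOrder = true: iteration runs from the least to the most recently accessed entry.
        this.entries =
                new LinkedHashMap<K, CachedRows<V>>(16, 0.75f, true) {
                    @Override
                    protected boolean removeEldestEntry(Map.Entry<K, CachedRows<V>> eldest) {
                        // Evict the least recently accessed entry once the bound is exceeded.
                        return size() > BoundedExpiringCacheSketch.this.maximumSize;
                    }
                };
    }

    /** Returns the cached rows, or null if absent or written longer than expireAfterWrite ago. */
    synchronized Collection<V> getIfPresent(K key) {
        CachedRows<V> cached = entries.get(key); // also marks the entry as recently accessed
        if (cached == null) {
            return null;
        }
        if (clockMillis.getAsLong() - cached.writeTimeMillis >= expireAfterWriteMillis) {
            entries.remove(key); // expired: behave as if the key was never cached
            return null;
        }
        return cached.rows;
    }

    /** Stores the rows with the current write time; returns the previous rows, or null. */
    synchronized Collection<V> put(K key, Collection<V> rows) {
        CachedRows<V> previous = entries.put(key, new CachedRows<>(rows, clockMillis.getAsLong()));
        return previous == null ? null : previous.rows;
    }

    synchronized int size() {
        return entries.size();
    }
}
```

For example, with a maximum size of 2, reading "a" after inserting "a" and "b" makes "b" the least recently accessed entry. Inserting "c" then evicts "b".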

...

CacheMetricGroup

An interface defining all cache-related metrics:

Code Block
languagejava
titleCacheMetricGroup
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

/**
 * Pre-defined metrics for cache.
 *
 * <p>Please note that these methods should only be invoked once. Registering a metric with the
 * same name multiple times leads to undefined behavior.
 */
@PublicEvolving
public interface CacheMetricGroup extends MetricGroup {
    /** The number of cache hits. */
    void hitCounter(Counter hitCounter);

    /** The number of cache misses. */
    void missCounter(Counter missCounter);

    /** The number of times data was loaded into the cache from the external system. */
    void loadCounter(Counter loadCounter);

    /** The number of load failures. */
    void numLoadFailuresCounter(Counter numLoadFailuresCounter);

    /** The time spent on the latest load operation. */
    void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);

    /** The number of records in cache. */
    void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

    /** The number of bytes used by cache. */
    void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}
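To show what the counters above are meant to count, the hypothetical read-through cache below updates matching values in its lookup path. It uses `AtomicLong` in place of Flink's `Counter` and a `Function` as the loader, so it runs standalone. A real cache would presumably register its metrics once through the metric group in `open`. The latest-load-time and cached-bytes gauges are omitted.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/**
 * Hypothetical read-through cache that maintains values matching the semantics of the
 * CacheMetricGroup counters and the numCachedRecords gauge. AtomicLong stands in for Counter.
 */
class MeteredCacheSketch<K, V> {

    final AtomicLong hitCounter = new AtomicLong();
    final AtomicLong missCounter = new AtomicLong();
    final AtomicLong loadCounter = new AtomicLong();
    final AtomicLong numLoadFailuresCounter = new AtomicLong();

    private final Map<K, Collection<V>> entries = new ConcurrentHashMap<>();
    private final Function<K, Collection<V>> loader; // stands in for the external system

    MeteredCacheSketch(Function<K, Collection<V>> loader) {
        this.loader = loader;
    }

    Collection<V> lookup(K key) {
        Collection<V> cached = entries.get(key);
        if (cached != null) {
            hitCounter.incrementAndGet();
            return cached;
        }
        missCounter.incrementAndGet();
        // Every attempt to load from the external system counts, whether or not it succeeds.
        loadCounter.incrementAndGet();
        try {
            Collection<V> loaded = loader.apply(key);
            entries.put(key, loaded);
            return loaded;
        } catch (RuntimeException e) {
            numLoadFailuresCounter.incrementAndGet();
            throw e;
        }
    }

    /** Value a numCachedRecordsGauge could report: rows held across all keys. */
    long numCachedRecords() {
        return entries.values().stream().mapToLong(Collection::size).sum();
    }
}
```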

...

PartialCachingLookupProvider

This is the API between the table framework and the user's table source. Implementations should define how to create a lookup function and whether to use a cache.

Code Block
languagejava
titlePartialCachingLookupProvider
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.connector.source.LookupTableSource;

import java.util.Optional;

/**
 * Provider for creating {@link LookupFunction} and {@link LookupCache} if caching should be
 * enabled for the lookup table.
 */
@PublicEvolving
public interface PartialCachingLookupProvider extends LookupTableSource.LookupRuntimeProvider {

    /** Creates a builder of {@link PartialCachingLookupProvider}. */
    static Builder newBuilder() {
        return new Builder();
    }

    /** Creates a {@link LookupFunction} instance. */
    LookupFunction createLookupFunction();

    /**
     * Gets the {@link LookupCache}.
     *
     * <p>This cache will be initialized by {@link LookupCache#open} during runtime execution and
     * used for optimizing the access to the external lookup table.
     *
     * @return an {@link Optional} of {@link LookupCache}, or an empty {@link Optional} if caching
     *     shouldn't be applied to the lookup table.
     */
    Optional<LookupCache> getCache();

    /**
     * Whether the missing key (key fields without any matching value rows) should be stored in the
     * cache.
     *
     * <p>Please note that this option is required if {@link #getCache} returns a non-empty
     * instance. If the cache is empty, the return value of this function will be ignored.
     *
     * @return true if a null or empty value should be stored in the cache.
     */
    Optional<Boolean> cacheMissingKey();

    /** Builder class for {@link PartialCachingLookupProvider}. */
    class Builder {

        private LookupFunction lookupFunction;
        private LookupCache cache;
        private Boolean cacheMissingKey;

        /** Sets the lookup function. */
        public Builder withLookupFunction(LookupFunction lookupFunction) {
            this.lookupFunction = lookupFunction;
            return this;
        }

        /** Enables caching, sets the cache and whether to store missing keys in the cache. */
        public Builder withCache(LookupCache cache, boolean cacheMissingKey) {
            this.cache = cache;
            this.cacheMissingKey = cacheMissingKey;
            return this;
        }

        /** Builds a {@link PartialCachingLookupProvider} from the configured fields. */
        public PartialCachingLookupProvider build() {
            // Copy the fields so that later builder calls don't affect the built provider.
            final LookupFunction function = lookupFunction;
            final LookupCache lookupCache = cache;
            final Boolean missingKey = cacheMissingKey;
            return new PartialCachingLookupProvider() {
                @Override
                public LookupFunction createLookupFunction() {
                    return function;
                }

                @Override
                public Optional<LookupCache> getCache() {
                    return Optional.ofNullable(lookupCache);
                }

                @Override
                public Optional<Boolean> cacheMissingKey() {
                    return Optional.ofNullable(missingKey);
                }
            };
        }
    }
}
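A provider with a cache implies a simple read-through flow at runtime. The operator consults the cache, falls back to the lookup function on a miss, and stores the result. A result with no matching rows is stored only if `cacheMissingKey` is true. The sketch below models that flow in plain Java. `CachingLookupRunnerSketch` is hypothetical, a `Function` stands in for LookupFunction and a `Map` for LookupCache. It is not the planner's actual lookup join operator.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/**
 * Hypothetical model of the read-through flow implied by a PartialCachingLookupProvider with a
 * cache: a Function stands in for LookupFunction and a Map for LookupCache.
 */
class CachingLookupRunnerSketch<K, V> {

    private final Function<K, Collection<V>> lookupFunction;
    private final boolean cacheMissingKey;
    private final Map<K, Collection<V>> cache = new ConcurrentHashMap<>();

    CachingLookupRunnerSketch(Function<K, Collection<V>> lookupFunction, boolean cacheMissingKey) {
        this.lookupFunction = lookupFunction;
        this.cacheMissingKey = cacheMissingKey;
    }

    Collection<V> lookup(K key) {
        Collection<V> cached = cache.get(key);
        if (cached != null) {
            return cached; // hit: no call to the external system
        }
        Collection<V> rows = lookupFunction.apply(key);
        // A key without matching rows is only remembered when cacheMissingKey is true;
        // otherwise every lookup of that key goes to the external system again.
        if (!rows.isEmpty() || cacheMissingKey) {
            cache.put(key, rows);
        }
        return rows;
    }
}
```

Enabling `cacheMissingKey` saves repeated remote calls for keys that never match. The cost is that rows added later for such a key stay invisible until the entry is evicted or expires.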

...

AsyncPartialCachingLookupProvider

Code Block
languagejava
titleAsyncPartialCachingLookupProvider
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.connector.source.LookupTableSource;

import java.util.Optional;

/**
 * Provider for creating {@link AsyncLookupFunction} and {@link LookupCache} if caching should be
 * enabled for the lookup table.
 */
@PublicEvolving
public interface AsyncPartialCachingLookupProvider extends LookupTableSource.LookupRuntimeProvider {

    /** Creates a builder of {@link AsyncPartialCachingLookupProvider}. */
    static AsyncPartialCachingLookupProvider.Builder newBuilder() {
        return new AsyncPartialCachingLookupProvider.Builder();
    }

    /** Creates an {@link AsyncLookupFunction} instance. */
    AsyncLookupFunction createAsyncLookupFunction();

    /**
     * Gets the {@link LookupCache}.
     *
     * <p>This cache will be initialized by {@link LookupCache#open} during runtime execution and
     * used for optimizing the access to the external lookup table.
     *
     * @return an {@link Optional} of {@link LookupCache}, or an empty {@link Optional} if caching
     *     shouldn't be applied to the lookup table.
     */
    Optional<LookupCache> getCache();

    /**
     * Whether the missing key (key fields without any matching value rows) should be stored in the
     * cache.
     *
     * <p>Please note that this option is required if {@link #getCache} returns a non-empty
     * instance. If the cache is empty, the return value of this function will be ignored.
     *
     * @return true if a null or empty value should be stored in the cache.
     */
    Optional<Boolean> cacheMissingKey();

    /** Builder class for {@link AsyncPartialCachingLookupProvider}. */
    class Builder {

        private AsyncLookupFunction asyncLookupFunction;
        private LookupCache cache;
        private Boolean cacheMissingKey;

        /** Sets the asynchronous lookup function. */
        public AsyncPartialCachingLookupProvider.Builder withLookupFunction(
                AsyncLookupFunction asyncLookupFunction) {
            this.asyncLookupFunction = asyncLookupFunction;
            return this;
        }

        /** Enables caching, sets the cache and whether to store missing keys in the cache. */
        public AsyncPartialCachingLookupProvider.Builder withCache(
                LookupCache cache, boolean cacheMissingKey) {
            this.cache = cache;
            this.cacheMissingKey = cacheMissingKey;
            return this;
        }

        /** Builds an {@link AsyncPartialCachingLookupProvider} from the configured fields. */
        public AsyncPartialCachingLookupProvider build() {
            // Copy the fields so that later builder calls don't affect the built provider.
            final AsyncLookupFunction function = asyncLookupFunction;
            final LookupCache lookupCache = cache;
            final Boolean missingKey = cacheMissingKey;
            return new AsyncPartialCachingLookupProvider() {
                @Override
                public AsyncLookupFunction createAsyncLookupFunction() {
                    return function;
                }

                @Override
                public Optional<LookupCache> getCache() {
                    return Optional.ofNullable(lookupCache);
                }

                @Override
                public Optional<Boolean> cacheMissingKey() {
                    return Optional.ofNullable(missingKey);
                }
            };
        }
    }
}
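The asynchronous provider implies the same read-through flow as the synchronous one. The difference is that rows are stored in the cache only once the lookup future completes. The hypothetical sketch below models this with `CompletableFuture`. A `Function` returning a future stands in for `AsyncLookupFunction#asyncLookup`, and a `Map` stands in for LookupCache.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/**
 * Hypothetical model of the asynchronous read-through flow: a Function returning a
 * CompletableFuture stands in for AsyncLookupFunction#asyncLookup and a Map for LookupCache.
 */
class AsyncCachingLookupRunnerSketch<K, V> {

    private final Function<K, CompletableFuture<Collection<V>>> asyncLookupFunction;
    private final boolean cacheMissingKey;
    private final Map<K, Collection<V>> cache = new ConcurrentHashMap<>();

    AsyncCachingLookupRunnerSketch(
            Function<K, CompletableFuture<Collection<V>>> asyncLookupFunction,
            boolean cacheMissingKey) {
        this.asyncLookupFunction = asyncLookupFunction;
        this.cacheMissingKey = cacheMissingKey;
    }

    CompletableFuture<Collection<V>> lookup(K key) {
        Collection<V> cached = cache.get(key);
        if (cached != null) {
            // Hit: complete immediately without calling the external system.
            return CompletableFuture.completedFuture(cached);
        }
        // Miss: store the rows once the asynchronous lookup completes. Two concurrent misses
        // on the same key may both reach the external system in this simple model.
        return asyncLookupFunction
                .apply(key)
                .thenApply(
                        rows -> {
                            if (!rows.isEmpty() || cacheMissingKey) {
                                cache.put(key, rows);
                            }
                            return rows;
                        });
    }
}
```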

FullCachingLookupProvider

This interface supports the full caching strategy. It reuses ScanRuntimeProvider for loading all entries of the lookup table and uses a FullCachingReloadTrigger to decide when to reload them.

Code Block
languagejava
titleFullCachingLookupProvider
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;

/**
 * Runtime provider for fully loading and periodically reloading all entries of the lookup table and
 * storing the table locally for lookup.
 *
 * <p>Implementations should provide a {@link ScanTableSource.ScanRuntimeProvider} in order to reuse
 * the ability of scanning for loading all entries from the lookup table.
 */
@PublicEvolving
public interface FullCachingLookupProvider extends LookupTableSource.LookupRuntimeProvider {

    /**
     * Creates a {@link FullCachingLookupProvider} with the provided scan runtime provider and
     * reload trigger.
     */
    static FullCachingLookupProvider of(
            ScanTableSource.ScanRuntimeProvider scanRuntimeProvider,
            FullCachingReloadTrigger fullCachingReloadTrigger) {
        return new FullCachingLookupProvider() {
            @Override
            public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider() {
                return scanRuntimeProvider;
            }

            @Override
            public FullCachingReloadTrigger getReloadTrigger() {
                return fullCachingReloadTrigger;
            }
        };
    }

    /**
     * Gets the {@link ScanTableSource.ScanRuntimeProvider} for executing the periodic reload.
     */
    ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();

    /** Gets the {@link FullCachingReloadTrigger} for triggering a full caching reload operation. */
    FullCachingReloadTrigger getReloadTrigger();
}

FullCachingReloadTrigger

A trigger defining custom logic for triggering full cache reloading.

Code Block
languagejava
titleFullCachingReloadTrigger
import java.io.Serializable;
import java.util.concurrent.CompletableFuture;

/** Customized trigger for reloading all lookup table entries in full caching mode. */
public interface FullCachingReloadTrigger extends AutoCloseable, Serializable {

    /** Opens the trigger. */
    void open(Context context) throws Exception;

    /**
     * Context of {@link FullCachingReloadTrigger} for getting information about times and
     * triggering reload.
     */
    interface Context {

        /** Gets the current processing time. */
        long currentProcessingTime();

        /** Gets the current watermark on the main stream. */
        long currentWatermark();

        /** Triggers a reload operation on the full cache. */
        CompletableFuture<Void> triggerReload();
    }
}
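The trigger decides when to reload, and the context performs the reload. To illustrate this split, the sketch below re-declares a minimal copy of FullCachingReloadTrigger and its Context so that it compiles without Flink. It then implements a hypothetical trigger that loads the table once when opened and never reloads afterwards. Such a trigger might suit a lookup table that stays static for the lifetime of a job. `ReloadTriggerSketch` and `LoadOnceReloadTrigger` are illustrative names only, and the sketch assumes nothing else loads the cache.

```java
import java.io.Serializable;
import java.util.concurrent.CompletableFuture;

/** Minimal stand-in for the proposed FullCachingReloadTrigger interface. */
interface ReloadTriggerSketch extends AutoCloseable, Serializable {

    void open(Context context) throws Exception;

    interface Context {
        long currentProcessingTime();

        long currentWatermark();

        CompletableFuture<Void> triggerReload();
    }
}

/** Hypothetical trigger that loads the table exactly once, when the trigger is opened. */
class LoadOnceReloadTrigger implements ReloadTriggerSketch {

    @Override
    public void open(Context context) {
        // Wait for the initial load, so open() returns only after the first load has finished.
        // join() rethrows a failed load as an unchecked CompletionException.
        context.triggerReload().join();
    }

    @Override
    public void close() {
        // Nothing to release: no background threads were started.
    }
}
```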

PeriodicFullCachingReloadTrigger

An implementation of FullCachingReloadTrigger that triggers reloads periodically with a specified interval, using either a fixed delay or a fixed rate.

Code Block
languagejava
titlePeriodicFullCachingReloadTrigger
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** A trigger that reloads all entries periodically with specified interval or delay. */
public class PeriodicFullCachingReloadTrigger implements FullCachingReloadTrigger {

    private final Duration reloadInterval;
    private final ScheduleMode scheduleMode;

    // Transient: the trigger is Serializable, the executor is not; it is created in open().
    private transient ScheduledExecutorService scheduledExecutor;

    public PeriodicFullCachingReloadTrigger(Duration reloadInterval, ScheduleMode scheduleMode) {
        this.reloadInterval = reloadInterval;
        this.scheduleMode = scheduleMode;
    }

    @Override
    public void open(FullCachingReloadTrigger.Context context) {
        scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        switch (scheduleMode) {
            case FIXED_RATE:
                // Triggers a reload without waiting for it to finish, so slow reloads
                // don't shift the schedule.
                scheduledExecutor.scheduleAtFixedRate(
                        context::triggerReload,
                        0,
                        reloadInterval.toMillis(),
                        TimeUnit.MILLISECONDS);
                break;
            case FIXED_DELAY:
                // Waits for each reload to finish, so the delay is measured from its end.
                scheduledExecutor.scheduleWithFixedDelay(
                        () -> {
                            try {
                                context.triggerReload().get();
                            } catch (Exception e) {
                                // Per ScheduledExecutorService, throwing here suppresses
                                // all subsequent reloads.
                                throw new RuntimeException(
                                        "Uncaught exception during the reload", e);
                            }
                        },
                        0,
                        reloadInterval.toMillis(),
                        TimeUnit.MILLISECONDS);
                break;
            default:
                throw new IllegalArgumentException(
                        String.format("Unrecognized schedule mode \"%s\"", scheduleMode));
        }
    }

    @Override
    public void close() throws Exception {
        if (scheduledExecutor != null) {
            scheduledExecutor.shutdownNow();
        }
    }

    /** Mode of scheduling the periodic reloads. */
    public enum ScheduleMode {
        FIXED_DELAY,
        FIXED_RATE
    }
}
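The two schedule modes differ when a scheduled task runs longer than the interval. Per the documented behavior of `ScheduledExecutorService`, FIXED_RATE aims to start run n at n × interval. A late run starts as soon as the previous one finishes, and runs never overlap. FIXED_DELAY always waits a full interval after each run finishes. In PeriodicFullCachingReloadTrigger, only the FIXED_DELAY task waits for `triggerReload()` to complete. The hypothetical helper below computes start times for given task durations under both modes, assuming an initial delay of 0 as in the trigger above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of ScheduledExecutorService timing with an initial delay of 0. */
final class ScheduleModeSketch {

    enum ScheduleMode {
        FIXED_DELAY,
        FIXED_RATE
    }

    private ScheduleModeSketch() {}

    /** Returns the start time (ms) of each run, given how long each run takes (ms). */
    static List<Long> startTimes(ScheduleMode mode, long intervalMillis, long[] runDurationsMillis) {
        List<Long> starts = new ArrayList<>();
        long previousEnd = 0;
        for (int i = 0; i < runDurationsMillis.length; i++) {
            long start;
            if (i == 0) {
                start = 0; // initial delay of 0
            } else if (mode == ScheduleMode.FIXED_RATE) {
                // Nominally due at i * interval, but never before the previous run has finished.
                start = Math.max(i * intervalMillis, previousEnd);
            } else {
                // The next run starts one full interval after the previous run finished.
                start = previousEnd + intervalMillis;
            }
            starts.add(start);
            previousEnd = start + runDurationsMillis[i];
        }
        return starts;
    }
}
```

For example, with a 10 ms interval and runs of 4, 15 and 4 ms, FIXED_RATE starts the runs at 0, 10 and 25 ms. FIXED_DELAY starts them at 0, 14 and 39 ms.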

TableFunctionProvider / AsyncTableFunctionProvider

...

| Option | Type | Description |
|---|---|---|
| lookup.cache | Enum of NONE, PARTIAL and FULL | The caching strategy for this lookup table. NONE: do not use cache. PARTIAL: use partial caching mode. FULL: use full caching mode. |
| lookup.async | Boolean | Whether to use asynchronous mode for the lookup table |
| lookup.max-retries | Integer | The maximum allowed retries if a lookup operation fails |
| lookup.partial-cache.expire-after-access | Duration | Duration to expire an entry in the cache after accessing |
| lookup.partial-cache.expire-after-write | Duration | Duration to expire an entry in the cache after writing |
| lookup.partial-cache.cache-missing-key | Boolean | Whether to store an empty value into the cache if the lookup key doesn't match any rows in the table |
| lookup.partial-cache.max-rows | Long | The maximum number of rows to store in the cache |
| lookup.full-cache.reload-interval | Duration | Interval of reloading all entries from the lookup table into cache |
| lookup.full-cache.reload-schedule-mode | Enum of FIXED_DELAY and FIXED_RATE | The periodic schedule mode of reloading |

Cache Metrics

Note that a cache implementation does not have to report all of the defined metrics. However, if a cache reports a metric with the same semantics as one defined below, the implementation should follow the convention.

...