...

If the lookup table is small enough to fit into memory and doesn't change frequently, it is more efficient to load all entries of the lookup table into the cache to reduce network I/O, and to refresh the table periodically or at a specified time of day. We call this use case "full cache". Logically, the reload operation is a kind of scan, so we'd like to reuse ScanRuntimeProvider so that developers can reuse the scanning logic implemented in Source / SourceFunction / InputFormat. Considering the complexity of the Source API, we'd like to support the SourceFunction and InputFormat APIs first. Supporting the Source API might require a new topology and will be discussed later in another FLIP.

We propose to introduce a new interface, FullCachingLookupProvider, in order to reuse the ability of scanning.
There can be multiple strategies for when to reload the cache: at fixed intervals, at a specified time of day, or using custom scheduling strategies. The interface FullCachingLookupProvider.ReloadStrategy serves this purpose.
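
The full-cache idea described above can be sketched with plain JDK classes. This is only an illustration under stated assumptions: `FullCacheSketch`, the `Supplier`-based scan, and the `String` key and row types are hypothetical stand-ins for the scan runtime provider and `RowData`, not the proposed implementation.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in: holds a full snapshot of a lookup table and replaces it on reload.
class FullCacheSketch {
    // Scan logic returning all entries of the lookup table (assumed to come from the connector).
    private final Supplier<Map<String, List<String>>> scan;
    // Current snapshot; volatile so that lookups always see a completely loaded table.
    private volatile Map<String, List<String>> snapshot = Map.of();

    FullCacheSketch(Supplier<Map<String, List<String>>> scan) {
        this.scan = scan;
    }

    // Scans the whole table and swaps the snapshot in a single assignment.
    void reload() {
        snapshot = Map.copyOf(scan.get());
    }

    // Serves lookups from memory only; no call to the external system happens here.
    Collection<String> lookup(String key) {
        return snapshot.getOrDefault(key, List.of());
    }
}
```

A reload strategy would then only decide when `reload()` runs; lookups between two reloads keep reading the previous snapshot.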


Public Interfaces

Lookup Functions

...

Code Block
languagejava
titleAsyncLookupFunction
/**
 * A wrapper class of {@link AsyncTableFunction} for asynchronously looking up rows matching the
 * lookup keys from an external system.
 *
 * <p>The output type of this table function is fixed as {@link RowData}.
 */
@PublicEvolving
public abstract class AsyncLookupFunction extends AsyncTableFunction<RowData> {

    /**
     * Asynchronously lookup rows matching the lookup keys.
     *
     * @param keyRow - A {@link RowData} that wraps the keys to look up.
     * @return A collection of all matching rows in the lookup table.
     */
    public abstract CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow);

    /** Invokes {@link #asyncLookup} and chains futures. */
    public final void eval(CompletableFuture<Collection<RowData>> future, Object... keys) {
        asyncLookup(GenericRowData.of(keys))
                .whenCompleteAsync(
                        (result, exception) -> {
                            if (exception != null) {
                                future.completeExceptionally(exception);
                                return;
                            }
                            future.complete(result);
                        });
    }
}
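
The way eval relays the outcome of asyncLookup to the caller's future can be tried out without Flink on the classpath. The sketch below is a hypothetical stand-in: `AsyncChainingSketch` and the `String` key and row types are assumptions that replace `RowData`, and the simulated lookup is invented for illustration.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the chaining done in eval(); String replaces RowData.
class AsyncChainingSketch {

    // Simulated asynchronous lookup against an external system.
    static CompletableFuture<Collection<String>> asyncLookup(String key) {
        if (key.isEmpty()) {
            CompletableFuture<Collection<String>> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalArgumentException("empty key"));
            return failed;
        }
        return CompletableFuture.supplyAsync(() -> List.of(key + "-row"));
    }

    // Relays either the result or the failure of asyncLookup to the future handed in by the caller.
    static void eval(CompletableFuture<Collection<String>> future, String key) {
        asyncLookup(key)
                .whenCompleteAsync(
                        (result, exception) -> {
                            if (exception != null) {
                                future.completeExceptionally(exception);
                                return;
                            }
                            future.complete(result);
                        });
    }
}
```

The caller's future completes with the matching rows on success and completes exceptionally when the lookup fails, so neither outcome is lost.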

...

PartialCache

Considering there might be custom caching strategies and optimizations, we'd like to expose the cache interface as public API for developers to make the cache pluggable.

Code Block
languagejava
titlePartialCache
/**
 * A semi-persistent mapping from keys to values for storing entries of lookup table.
 *
 * <p>The type of the caching key is a {@link RowData} with lookup key fields packed inside. The
 * type of value is a {@link Collection} of {@link RowData}, which are rows matching lookup key
 * fields.
 *
 * <p>Cache entries are manually added using {@link #put}, and are stored in the cache until either
 * evicted or manually invalidated.
 *
 * <p>Implementations of this interface are expected to be thread-safe, and can be safely accessed
 * by multiple concurrent threads.
 */
@PublicEvolving
public interface PartialCache {

    /**
     * Opens the cache.
     *
     * @param metricGroup - the lookup cache metric group in which the cache registers predefined
     *     and custom metrics
     */
    void open(LookupCacheMetricGroup metricGroup);

    /**
     * Returns the value associated with key in this cache, or null if there is no cached value for
     * key.
     */
    @Nullable
    Collection<RowData> getIfPresent(RowData key);

    /**
     * Associates the specified value rows with the specified key row in the cache. If the cache
     * previously contained a value associated with the key, the old value is replaced by the
     * specified value.
     *
     * @param key - key row with which the specified value is to be associated
     * @param value - value rows to be associated with the specified key
     * @return the previous value rows associated with key, or null if there was no mapping for key.
     */
    Collection<RowData> put(RowData key, Collection<RowData> value);

    /** Discards any cached value for the specified key. */
    void invalidate(RowData key);

    /** Returns the number of key-value mappings in the cache. */
    long size();

    /** Closes the cache. */
    void close();
}
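
To make the contract of getIfPresent, put, invalidate and size concrete, here is a minimal in-memory sketch. It is not a proposed implementation: `InMemoryCacheSketch` is a hypothetical name, `String` stands in for `RowData`, and metric registration as well as eviction are left out.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified cache following the contract above; String replaces RowData.
class InMemoryCacheSketch {
    // ConcurrentHashMap keeps the cache safe for access from multiple threads.
    private final Map<String, Collection<String>> entries = new ConcurrentHashMap<>();

    // Returns the cached rows, or null if the key has no cached value.
    Collection<String> getIfPresent(String key) {
        return entries.get(key);
    }

    // Stores the rows and returns the previous rows for the key, or null if there were none.
    Collection<String> put(String key, Collection<String> value) {
        return entries.put(key, value);
    }

    // Discards any cached rows for the key.
    void invalidate(String key) {
        entries.remove(key);
    }

    // Number of key-to-rows mappings currently cached.
    long size() {
        return entries.size();
    }
}
```

A custom cache would typically add eviction on top of this and report its state through the metric group passed to open.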

...

DefaultPartialCache

To simplify usage for developers, we provide a default cache implementation that is configured through a builder. The underlying cache should still be instantiated during runtime execution to avoid serialization / deserialization.

Code Block
languagejava
titleDefaultPartialCache
/** Default implementation of {@link PartialCache}. */
@PublicEvolving
public class DefaultPartialCache implements PartialCache {

    private final Duration expireAfterAccessDuration;
    private final Duration expireAfterWriteDuration;
    private final Long maximumSize;

    private DefaultPartialCache(
            Duration expireAfterAccessDuration,
            Duration expireAfterWriteDuration,
            Long maximumSize) {
        this.expireAfterAccessDuration = expireAfterAccessDuration;
        this.expireAfterWriteDuration = expireAfterWriteDuration;
        this.maximumSize = maximumSize;
    }

    // Overrides of the PartialCache methods are omitted here.

    public static Builder newBuilder() {
        return new Builder();
    }

    /** Builder of {@link DefaultPartialCache}. */
    public static class Builder {
        private Duration expireAfterAccessDuration;
        private Duration expireAfterWriteDuration;
        private Long maximumSize;

        public DefaultPartialCache.Builder expireAfterAccess(Duration duration) {
            expireAfterAccessDuration = duration;
            return this;
        }

        public DefaultPartialCache.Builder expireAfterWrite(Duration duration) {
            expireAfterWriteDuration = duration;
            return this;
        }

        public DefaultPartialCache.Builder maximumSize(long maximumSize) {
            this.maximumSize = maximumSize;
            return this;
        }

        public DefaultPartialCache build() {
            return new DefaultPartialCache(
                    expireAfterAccessDuration,
                    expireAfterWriteDuration,
                    maximumSize);
        }
    }
}

LookupCacheMetricGroup

An interface defining all cache-related metrics:

Code Block
languagejava
titleLookupCacheMetricGroup
/**
 * Pre-defined metrics for {@code LookupCache}.
 *
 * <p>Please note that these methods should only be invoked once. Registering a metric with the
 * same name multiple times leads to undefined behavior.
 */
@PublicEvolving
public interface LookupCacheMetricGroup extends MetricGroup {
    /** The number of cache hits. */
    void hitCounter(Counter hitCounter);

    /** The number of cache misses. */
    void missCounter(Counter missCounter);

    /** The number of times to load data into cache from external system. */
    void loadCounter(Counter loadCounter);

    /** The number of load failures. */
    void numLoadFailuresCounter(Counter numLoadFailuresCounter);

    /** The time spent for the latest load operation. */
    void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);

    /** The number of records in cache. */
    void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

    /** The number of bytes used by cache. */
    void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}
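
How a caching lookup might drive the hit, miss, load and load-failure counters can be sketched as follows. Everything here is an assumption made for illustration: `CacheMetricsSketch` is a hypothetical class, `LongAdder` stands in for `Counter`, `String` for `RowData`, and counting every load attempt (rather than only successful loads) is a choice of this sketch, not something the interface prescribes.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;

// Hypothetical sketch: LongAdder stands in for Counter, String for RowData.
class CacheMetricsSketch {
    final LongAdder hitCounter = new LongAdder();
    final LongAdder missCounter = new LongAdder();
    final LongAdder loadCounter = new LongAdder();
    final LongAdder numLoadFailuresCounter = new LongAdder();

    private final Map<String, Collection<String>> cache = new ConcurrentHashMap<>();
    // Loads rows from the external system; assumed to never return null.
    private final Function<String, Collection<String>> loader;

    CacheMetricsSketch(Function<String, Collection<String>> loader) {
        this.loader = loader;
    }

    Collection<String> lookup(String key) {
        Collection<String> cached = cache.get(key);
        if (cached != null) {
            hitCounter.increment();
            return cached;
        }
        missCounter.increment();
        // Assumption: every attempt to load from the external system counts as a load.
        loadCounter.increment();
        try {
            Collection<String> loaded = loader.apply(key);
            cache.put(key, loaded);
            return loaded;
        } catch (RuntimeException e) {
            numLoadFailuresCounter.increment();
            throw e;
        }
    }
}
```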

...

Code Block
languagejava
titleLookupFunctionProvider
/**
 * Provider for creating {@link LookupFunction} and {@link PartialCache} if caching should be
 * enabled for the lookup table.
 */
@PublicEvolving
public interface LookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    /** Creates a builder of {@link LookupFunctionProvider}. */
    static Builder newBuilder() {
        return new Builder();
    }

    /** Creates a {@link LookupFunction} instance. */
    LookupFunction createLookupFunction();

    /**
     * Gets the {@link PartialCache} of the lookup table.
     *
     * <p>This cache will be used during runtime execution for optimizing the access to the
     * external lookup table.
     *
     * @return an {@link Optional} of {@link PartialCache}, or an empty {@link Optional} if
     *     caching shouldn't be applied to the lookup table.
     */
    Optional<PartialCache> getCache();

    /**
     * Whether the missing key (key fields without any matching value rows) should be stored in the
     * cache.
     *
     * <p>Please note that this option is required if {@link #getCache()} returns a non-empty
     * instance. If the cache is empty, the return value of this function will be ignored.
     *
     * @return true if a null or empty value should be stored in the cache.
     */
    Optional<Boolean> cacheMissingKey();

    /** Builder class for {@link LookupFunctionProvider}. */
    class Builder {

        private LookupFunction lookupFunction;
        private PartialCache partialCache;
        private Boolean enableCacheMissingKey;

        /** Sets the lookup function. */
        public Builder withLookupFunction(LookupFunction lookupFunction) {
            this.lookupFunction = lookupFunction;
            return this;
        }

        /**
         * Enables caching, sets the cache, and defines whether missing keys are stored in the
         * cache.
         *
         * <p>See {@link LookupFunctionProvider#cacheMissingKey()} for more details.
         */
        public Builder withCache(PartialCache partialCache, Boolean enableCacheMissingKey) {
            this.partialCache = partialCache;
            this.enableCacheMissingKey = enableCacheMissingKey;
            return this;
        }

        public LookupFunctionProvider build() {
            // Build LookupFunctionProvider
        }
    }
}
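
The effect of cacheMissingKey on the number of calls to the external system can be illustrated with a small, Flink-free sketch. `MissingKeySketch` is a hypothetical class: `String` stands in for `RowData`, a `HashMap` for the cache, and a plain `Function` for the lookup function.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of a cached lookup; String replaces RowData, a HashMap replaces the cache.
class MissingKeySketch {
    private final Map<String, Collection<String>> cache = new HashMap<>();
    private final Function<String, Collection<String>> lookupFunction;
    private final boolean cacheMissingKey;
    // Counts how often the external system was queried.
    int externalLookups = 0;

    MissingKeySketch(Function<String, Collection<String>> lookupFunction, boolean cacheMissingKey) {
        this.lookupFunction = lookupFunction;
        this.cacheMissingKey = cacheMissingKey;
    }

    Collection<String> lookup(String key) {
        Collection<String> cached = cache.get(key);
        if (cached != null) {
            return cached;
        }
        externalLookups++;
        Collection<String> rows = lookupFunction.apply(key);
        if (rows == null) {
            rows = Collections.emptyList();
        }
        // Keys without matching rows are only remembered when cacheMissingKey is enabled.
        if (!rows.isEmpty() || cacheMissingKey) {
            cache.put(key, rows);
        }
        return rows;
    }
}
```

With the option enabled, repeated lookups of a key that has no matching rows reach the external system only once; with it disabled, every such lookup goes out again.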

...

Code Block
languagejava
titleFullCachingLookupProvider
/**
 * Runtime provider for fully loading and periodically reloading all entries of the lookup table and
 * storing the table locally for lookup.
 *
 * <p>Implementations should provide a {@link ScanTableSource.ScanRuntimeProvider} in order to reuse
 * the ability of scanning for loading all entries from the lookup table.
 */
@PublicEvolving
public interface FullCachingLookupProvider extends LookupTableSource.LookupRuntimeProvider {

    /**
     * Creates {@link FullCachingLookupProvider} with provided scan runtime provider and reload
     * strategy.
     */
    static FullCachingLookupProvider of(
            ScanTableSource.ScanRuntimeProvider scanRuntimeProvider,
            ReloadStrategy reloadStrategy) {
        return new FullCachingLookupProvider() {
            @Override
            public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider() {
                return scanRuntimeProvider;
            }

            @Override
            public ReloadStrategy getReloadStrategy() {
                return reloadStrategy;
            }
        };
    }

    /**
     * Gets the {@link org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider}
     * for executing the periodic reload.
     */
    ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();

    /**
     * Gets the strategy for when to reload the cache. See {@link ReloadStrategy} and the default
     * implementation {@link FixedDelayReloadStrategy}.
     */
    ReloadStrategy getReloadStrategy();

    /** Defines the strategy for when to reload the cache. */
    interface ReloadStrategy extends Serializable {

        /**
         * Schedules the task that reloads the cache. This method is called just once, so the
         * implementation must create a separate thread that will run {@code reloadTask} according
         * to its own triggers.
         */
        void scheduleReload(Runnable reloadTask);

        /** Stops the task that reloads the cache. */
        void stopReloading();
    }
}
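
A custom scheduling strategy could look roughly like the sketch below, which reloads at a fixed rate. It is an assumption-laden illustration: `ReloadStrategySketch` is a local copy of the two-method contract so that the example compiles on its own, and `FixedRateReloadStrategySketch` is a hypothetical name, not part of the proposal.

```java
import java.io.Serializable;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Local copy of the two-method contract so that the sketch compiles without Flink.
interface ReloadStrategySketch extends Serializable {
    void scheduleReload(Runnable reloadTask);

    void stopReloading();
}

// Hypothetical custom strategy: runs the reload task at a fixed rate.
class FixedRateReloadStrategySketch implements ReloadStrategySketch {
    private final Duration reloadInterval;
    // Executors are not serializable, so the field is transient and created when scheduling.
    private transient ScheduledExecutorService executorService;

    FixedRateReloadStrategySketch(Duration reloadInterval) {
        this.reloadInterval = reloadInterval;
    }

    @Override
    public void scheduleReload(Runnable reloadTask) {
        // Called once; the executor thread then triggers the task on its own.
        executorService = Executors.newSingleThreadScheduledExecutor();
        executorService.scheduleAtFixedRate(
                reloadTask, 0, reloadInterval.toMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public void stopReloading() {
        if (executorService != null) {
            executorService.shutdownNow();
        }
    }
}
```

Because the strategy owns its executor, the caller only hands over the reload task once and later asks the strategy to stop.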

FixedDelayReloadStrategy

This class is the default implementation of FullCachingLookupProvider.ReloadStrategy. It schedules reloads with a fixed delay, using ScheduledExecutorService#scheduleWithFixedDelay.

Code Block
languagejava
titleFixedDelayReloadStrategy
/**
 * Default {@link FullCachingLookupProvider.ReloadStrategy} that schedules reloads with fixed delay.
 */
public class FixedDelayReloadStrategy implements FullCachingLookupProvider.ReloadStrategy {

    private final Duration reloadInterval;
    @Nullable private final LocalTime reloadStartTime;
    // Not serializable, so it is transient and only created in scheduleReload.
    private transient ScheduledExecutorService executorService;

    private FixedDelayReloadStrategy(Duration reloadInterval, @Nullable LocalTime reloadStartTime) {
        this.reloadInterval = reloadInterval;
        this.reloadStartTime = reloadStartTime;
    }

    @Override
    public void scheduleReload(Runnable reloadTask) {
        long delay = reloadInterval.toMillis();
        long initialDelay = 0;
        if (reloadStartTime != null) {
            LocalTime now = LocalTime.now(ZoneOffset.UTC);
            Duration initialDelayDuration = Duration.between(now, reloadStartTime);
            if (initialDelayDuration.isNegative()) {
                // If reloadStartTime is earlier than the current time, the reload happens the
                // next day.
                initialDelayDuration = initialDelayDuration.plus(1, ChronoUnit.DAYS);
            }
            initialDelay = initialDelayDuration.toMillis();
        }
        executorService = Executors.newSingleThreadScheduledExecutor();
        executorService.scheduleWithFixedDelay(
                reloadTask, initialDelay, delay, TimeUnit.MILLISECONDS);
    }

    @Override
    public void stopReloading() {
        if (executorService != null) {
            executorService.shutdown();
        }
    }

    /** Creates reload strategy with periodic intervals. */
    public static FixedDelayReloadStrategy withInterval(Duration reloadInterval) {
        return new FixedDelayReloadStrategy(reloadInterval, null);
    }

    /**
     * Creates reload strategy with periodic intervals after initial delay up to {@code
     * reloadStartTime}.
     */
    public static FixedDelayReloadStrategy withIntervalAfterDelay(
            Duration reloadInterval, LocalTime reloadStartTime) {
        return new FixedDelayReloadStrategy(reloadInterval, reloadStartTime);
    }

    /** Creates strategy with daily reload at specified {@code reloadStartTime}. */
    public static FixedDelayReloadStrategy dailyAtSpecifiedTime(LocalTime reloadStartTime) {
        return new FixedDelayReloadStrategy(Duration.ofDays(1), reloadStartTime);
    }
}
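
The initial-delay arithmetic used for a reload start time can be checked in isolation. The helper below is a hypothetical extraction (`InitialDelaySketch` is not part of the proposal) that takes the current time as a parameter instead of reading the UTC clock, so that the result is reproducible.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;

// Hypothetical helper that isolates the initial-delay computation for a reload start time.
class InitialDelaySketch {

    // Returns how long to wait from 'now' until the next occurrence of 'reloadStartTime'.
    static Duration initialDelay(LocalTime now, LocalTime reloadStartTime) {
        Duration delay = Duration.between(now, reloadStartTime);
        if (delay.isNegative()) {
            // The start time has already passed today, so the first reload happens tomorrow.
            delay = delay.plus(1, ChronoUnit.DAYS);
        }
        return delay;
    }
}
```

For example, with a current time of 23:00 and a reload start time of 01:00, the raw difference is minus 22 hours, and adding one day gives an initial delay of 2 hours.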

TableFunctionProvider / AsyncTableFunctionProvider

...