...

Code Block
languagejava
titleLookupFunction
/**
 * A wrapper class of {@link TableFunction} for synchronously looking up rows matching the lookup
 * keys from an external system.
 *
 * <p>The output type of this table function is fixed as {@link RowData}.
 */
@PublicEvolving
public abstract class LookupFunction extends TableFunction<RowData> {

    /**
     * Synchronously looks up rows matching the lookup keys.
     *
     * @param keyRow - A {@link RowData} that wraps keys to look up.
     * @return A collection of all matching rows in the lookup table.
     */
    public abstract Collection<RowData> lookup(RowData keyRow) throws IOException;

    /** Wraps the lookup keys into a key row, invokes {@link #lookup} and handles exceptions. */
    public final void eval(Object... keys) {
        // Pack the individual lookup keys into one key row, as lookup(RowData) expects.
        GenericRowData keyRow = GenericRowData.of(keys);
        try {
            Collection<RowData> lookupValues = lookup(keyRow);
            if (lookupValues != null) {
                lookupValues.forEach(this::collect);
            }
        } catch (IOException e) {
            throw new RuntimeException("Failed to lookup values with given key row " + keyRow, e);
        }
    }
}
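
To make the contract above concrete without a Flink dependency, the following is a minimal plain-Java sketch. It assumes `List<Object>` stands in for `RowData` and an in-memory list stands in for `TableFunction#collect`; `SketchLookupFunction` and `InMemoryLookupFunction` are hypothetical names, not part of the proposal. It shows the key point of the design: `eval` receives the keys as varargs, while implementers only ever see one packed key row.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for LookupFunction; List<Object> plays the role of RowData. */
abstract class SketchLookupFunction {
    // Rows emitted by eval(); stands in for TableFunction#collect.
    final List<List<Object>> collected = new ArrayList<>();

    /** Implementers only ever see one packed key row, never the raw varargs. */
    abstract Collection<List<Object>> lookup(List<Object> keyRow) throws IOException;

    /** Mirrors eval(Object...): pack the keys into one key row, then emit every match. */
    final void eval(Object... keys) {
        List<Object> keyRow = Arrays.asList(keys); // plays the role of GenericRowData.of(keys)
        try {
            Collection<List<Object>> rows = lookup(keyRow);
            if (rows != null) {
                collected.addAll(rows);
            }
        } catch (IOException e) {
            throw new RuntimeException("Failed to lookup values with key row " + keyRow, e);
        }
    }
}

/** Hypothetical implementation backed by an in-memory map instead of an external system. */
class InMemoryLookupFunction extends SketchLookupFunction {
    private final Map<List<Object>, List<List<Object>>> table = new HashMap<>();

    /** Adds a row whose single lookup key is {@code key}. */
    void addRow(Object key, Object... values) {
        table.computeIfAbsent(Collections.singletonList(key), k -> new ArrayList<>())
                .add(Arrays.asList(values));
    }

    @Override
    Collection<List<Object>> lookup(List<Object> keyRow) {
        return table.getOrDefault(keyRow, Collections.emptyList());
    }
}
```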

...

Code Block
languagejava
titleAsyncLookupFunction
/**
 * A wrapper class of {@link AsyncTableFunction} for asynchronously looking up rows matching the
 * lookup keys from an external system.
 *
 * <p>The output type of this table function is fixed as {@link RowData}.
 */
@PublicEvolving
public abstract class AsyncLookupFunction extends AsyncTableFunction<RowData> {

    /**
     * Asynchronously looks up rows matching the lookup keys.
     *
     * @param keyRow - A {@link RowData} that wraps keys to look up.
     * @return A future of a collection of all matching rows in the lookup table.
     */
    public abstract CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow);

    /** Wraps the lookup keys into a key row, invokes {@link #asyncLookup} and chains futures. */
    public final void eval(CompletableFuture<Collection<RowData>> future, Object... keys) {
        // Pack the individual lookup keys into one key row, as asyncLookup(RowData) expects.
        asyncLookup(GenericRowData.of(keys))
                .whenCompleteAsync(
                        (result, exception) -> {
                            if (exception != null) {
                                // Propagate the failure to the future handed in by the framework.
                                future.completeExceptionally(exception);
                                return;
                            }
                            future.complete(result);
                        });
    }
}
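
A similarly hedged sketch of the asynchronous path, using only `java.util.concurrent`. Again `List<Object>` stands in for `RowData`, and `SketchAsyncLookupFunction` / `DemoAsyncLookupFunction` are hypothetical names. It shows how `eval` forwards either the result or the failure of `asyncLookup` to the future supplied by the caller.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for AsyncLookupFunction; List<Object> plays the role of RowData. */
abstract class SketchAsyncLookupFunction {

    abstract CompletableFuture<Collection<List<Object>>> asyncLookup(List<Object> keyRow);

    /** Mirrors eval(CompletableFuture, Object...): forwards result or failure to the caller. */
    final void eval(CompletableFuture<Collection<List<Object>>> future, Object... keys) {
        asyncLookup(Arrays.asList(keys))
                .whenCompleteAsync(
                        (result, exception) -> {
                            if (exception != null) {
                                future.completeExceptionally(exception);
                                return;
                            }
                            future.complete(result);
                        });
    }
}

/** Hypothetical implementation: "looks up" a row on another thread, failing for negative keys. */
class DemoAsyncLookupFunction extends SketchAsyncLookupFunction {
    @Override
    CompletableFuture<Collection<List<Object>>> asyncLookup(List<Object> keyRow) {
        return CompletableFuture.supplyAsync(
                () -> {
                    int id = (Integer) keyRow.get(0);
                    if (id < 0) {
                        throw new IllegalArgumentException("negative key " + id);
                    }
                    // A single matching row: (id, "user-" + id).
                    Collection<List<Object>> rows =
                            Collections.singletonList(Arrays.<Object>asList(id, "user-" + id));
                    return rows;
                });
    }
}
```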

...

Code Block
languagejava
titleLookupCache
/**
 * A semi-persistent mapping from keys to values for storing entries of lookup table.
 *
 * <p>The type of the caching key is a {@link RowData} with lookup key fields packed inside. The
 * type of value is a {@link Collection} of {@link RowData}, which are rows matching lookup key
 * fields.
 *
 * <p>Cache entries are manually added using {@link #put}, and are stored in the cache until either
 * evicted or manually invalidated.
 *
 * <p>Implementations of this interface are expected to be thread-safe, and can be safely accessed
 * by multiple concurrent threads.
 */
@PublicEvolving
public interface LookupCache {

    /**
     * Returns the value associated with key in this cache, or null if there is no cached value for
     * key.
     */
    @Nullable
    Collection<RowData> getIfPresent(RowData key);

    /**
     * Associates the specified value rows with the specified key row in the cache. If the cache
     * previously contained a value associated with the key, the old value is replaced by the
     * specified value.
     *
     * @param key - key row with which the specified value is to be associated
     * @param value - value rows to be associated with the specified key
     * @return the previous value rows associated with key, or null if there was no mapping for key.
     */
    @Nullable
    Collection<RowData> put(RowData key, Collection<RowData> value);

    /** Discards any cached value for the specified key. */
    void invalidate(RowData key);

    /** Returns the number of key-value mappings in the cache. */
    long size();
}
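
The thread-safety requirement above can be met, for example, by delegating to a `ConcurrentHashMap`. The sketch below assumes `List<Object>` in place of `RowData` and mirrors the proposed methods in a hypothetical `SketchLookupCache` interface. It has no eviction, so it only illustrates the contract, not the default cache.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

/** Mirror of the proposed LookupCache contract, with List<Object> standing in for RowData. */
interface SketchLookupCache {
    Collection<List<Object>> getIfPresent(List<Object> key);

    Collection<List<Object>> put(List<Object> key, Collection<List<Object>> value);

    void invalidate(List<Object> key);

    long size();
}

/** Unbounded, thread-safe sketch: entries stay until they are invalidated. */
class ConcurrentMapLookupCache implements SketchLookupCache {
    // ConcurrentHashMap rejects null values, so a "missing key" must be cached as an empty
    // collection rather than as null.
    private final ConcurrentHashMap<List<Object>, Collection<List<Object>>> entries =
            new ConcurrentHashMap<>();

    @Override
    public Collection<List<Object>> getIfPresent(List<Object> key) {
        return entries.get(key); // null when nothing is cached for the key
    }

    @Override
    public Collection<List<Object>> put(List<Object> key, Collection<List<Object>> value) {
        return entries.put(key, value); // the replaced value, or null
    }

    @Override
    public void invalidate(List<Object> key) {
        entries.remove(key);
    }

    @Override
    public long size() {
        return entries.size();
    }
}
```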

LookupCacheFactory

Because the cache should be instantiated during runtime execution to avoid serializing and deserializing it, a factory is required for creating the cache.

Code Block
languagejava
titleLookupCacheFactory

/** Factory for creating an instance of {@link LookupCache}. */
@PublicEvolving
public interface LookupCacheFactory extends Serializable {

    /**
     * Creates a {@link LookupCache}.
     *
     * @param metricGroup - The lookup cache metric group in which the cache registers predefined
     *     and custom metrics.
     */
    LookupCache createCache(LookupCacheMetricGroup metricGroup);
}
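
The reasoning behind the factory can be illustrated with plain Java serialization: only the serializable configuration travels with the job, while the cache itself, which holds live state, is created on the task. All class names below are hypothetical, and `Shipping.roundTrip` merely simulates shipping an object to a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical runtime cache: holds live state and is deliberately NOT Serializable. */
class RuntimeCache {
    final Map<Object, Object> entries = new ConcurrentHashMap<>();
    final long maximumSize;

    RuntimeCache(long maximumSize) {
        this.maximumSize = maximumSize;
    }
}

/** Hypothetical factory: only configuration is serialized, the cache is built on the task. */
class RuntimeCacheFactory implements Serializable {
    private static final long serialVersionUID = 1L;
    private final long maximumSize;

    RuntimeCacheFactory(long maximumSize) {
        this.maximumSize = maximumSize;
    }

    RuntimeCache createCache() {
        return new RuntimeCache(maximumSize);
    }
}

/** Simulates shipping an object to a worker by round-tripping it through Java serialization. */
final class Shipping {
    private Shipping() {}

    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }
}
```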

DefaultLookupCacheFactory

To simplify usage for developers, we provide a default factory for building a default cache.

Code Block
languagejava
titleDefaultLookupCacheFactory
/** Factory for creating an instance of {@link DefaultLookupCache}. */
@PublicEvolving
public class DefaultLookupCacheFactory implements LookupCacheFactory {
    private final Duration expireAfterAccessDuration;
    private final Duration expireAfterWriteDuration;
    private final Integer initialCapacity;
    private final Long maximumSize;

    public static DefaultLookupCacheFactory.Builder newBuilder() {
        return new Builder();
    }

    private DefaultLookupCacheFactory(
            Duration expireAfterAccessDuration,
            Duration expireAfterWriteDuration,
            Integer initialCapacity,
            Long maximumSize) {
        // Validation
        this.expireAfterAccessDuration = expireAfterAccessDuration;
        this.expireAfterWriteDuration = expireAfterWriteDuration;
        this.initialCapacity = initialCapacity;
        this.maximumSize = maximumSize;
    }

    @Override
    public LookupCache createCache(LookupCacheMetricGroup metricGroup) {
        // Create and return an instance of DefaultLookupCache configured with the options above
    }

    /** Builder of {@link DefaultLookupCacheFactory}. */
    public static class Builder {
        private Duration expireAfterAccessDuration;
        private Duration expireAfterWriteDuration;
        private Integer initialCapacity;
        private Long maximumSize;

        public DefaultLookupCacheFactory.Builder expireAfterAccess(Duration duration) {
            expireAfterAccessDuration = duration;
            return this;
        }

        public DefaultLookupCacheFactory.Builder expireAfterWrite(Duration duration) {
            expireAfterWriteDuration = duration;
            return this;
        }

        public DefaultLookupCacheFactory.Builder initialCapacity(int initialCapacity) {
            this.initialCapacity = initialCapacity;
            return this;
        }

        public DefaultLookupCacheFactory.Builder maximumSize(long maximumSize) {
            this.maximumSize = maximumSize;
            return this;
        }

        public DefaultLookupCacheFactory build() {
            return new DefaultLookupCacheFactory(
                    expireAfterAccessDuration,
                    expireAfterWriteDuration,
                    initialCapacity,
                    maximumSize);
        }
    }
}
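
The proposal leaves the internals of `DefaultLookupCache` open. As a rough illustration of what the `maximumSize` and `expireAfterWrite` options could mean at runtime, the hypothetical `BoundedExpiringCache` below evicts the least recently accessed entry once the size bound is exceeded and treats entries older than the write TTL as absent. It takes an injectable clock so its behavior is easy to test; a real implementation might instead delegate to an existing caching library.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical bounded cache illustrating maximumSize + expireAfterWrite semantics. */
class BoundedExpiringCache<K, V> {
    /** A value together with the time it was written. */
    private static final class Timestamped<T> {
        final T value;
        final long writtenAtNanos;

        Timestamped(T value, long writtenAtNanos) {
            this.value = value;
            this.writtenAtNanos = writtenAtNanos;
        }
    }

    private final long expireAfterWriteNanos;
    private final LongSupplier clockNanos;
    private final LinkedHashMap<K, Timestamped<V>> entries;

    BoundedExpiringCache(long maximumSize, Duration expireAfterWrite, LongSupplier clockNanos) {
        this.expireAfterWriteNanos = expireAfterWrite.toNanos();
        this.clockNanos = clockNanos;
        // accessOrder = true keeps the least recently accessed entry first.
        this.entries =
                new LinkedHashMap<K, Timestamped<V>>(16, 0.75f, true) {
                    @Override
                    protected boolean removeEldestEntry(Map.Entry<K, Timestamped<V>> eldest) {
                        return size() > maximumSize; // evict the LRU entry once over the bound
                    }
                };
    }

    synchronized V getIfPresent(K key) {
        Timestamped<V> entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (clockNanos.getAsLong() - entry.writtenAtNanos >= expireAfterWriteNanos) {
            entries.remove(key); // expired: behave as if the key was never cached
            return null;
        }
        return entry.value;
    }

    synchronized V put(K key, V value) {
        Timestamped<V> previous = entries.put(key, new Timestamped<>(value, clockNanos.getAsLong()));
        return previous == null ? null : previous.value;
    }

    /** Number of stored entries, including expired ones that have not been purged yet. */
    synchronized long size() {
        return entries.size();
    }
}
```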

LookupCacheMetricGroup

An interface defining all cache-related metrics:

Code Block
languagejava
titleCacheMetricGroup
/**
 * Pre-defined metrics for {@code LookupCache}.
 *
 * <p>Please note that these methods should only be invoked once. Registering a metric with the same
 * name multiple times would lead to undefined behavior.
 */
@PublicEvolving
public interface LookupCacheMetricGroup extends MetricGroup {
    /** The number of cache hits. */
    void hitCounter(Counter hitCounter);

    /** The number of cache misses. */
    void missCounter(Counter missCounter);

    /** The number of times data was loaded into the cache from the external system. */
    void loadCounter(Counter loadCounter);

    /** The number of load failures. */
    void numLoadFailuresCounter(Counter numLoadFailuresCounter);

    /** The time spent on the latest load operation. */
    void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);

    /** The number of records in the cache. */
    void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

    /** The number of bytes used by the cache. */
    void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}
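
To illustrate the intended semantics of the counters and gauges, here is a hypothetical instrumented cache in plain Java. `AtomicLong` stands in for Flink's `Counter` and `LongSupplier` for `Gauge<Long>`; in the proposal these objects would instead be handed to `hitCounter(...)`, `numCachedRecordsGauge(...)` and so on. Recording the load time in nanoseconds is purely an assumption of this sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical instrumented cache; AtomicLong stands in for Counter, LongSupplier for Gauge. */
class InstrumentedCache<K, V> {
    final AtomicLong hitCounter = new AtomicLong();
    final AtomicLong missCounter = new AtomicLong();
    final AtomicLong loadCounter = new AtomicLong();
    final AtomicLong numLoadFailuresCounter = new AtomicLong();
    final AtomicLong latestLoadTimeNanos = new AtomicLong();

    private final Map<K, V> entries = new ConcurrentHashMap<>();
    private final Function<K, V> loader; // e.g. a call to the external system

    /** Would be registered via numCachedRecordsGauge(...) in the proposed metric group. */
    final LongSupplier numCachedRecordsGauge = () -> entries.size();

    InstrumentedCache(Function<K, V> loader) {
        this.loader = loader;
    }

    V get(K key) {
        V cached = entries.get(key);
        if (cached != null) {
            hitCounter.incrementAndGet();
            return cached;
        }
        missCounter.incrementAndGet();
        loadCounter.incrementAndGet(); // every load attempt counts, including failed ones
        long start = System.nanoTime();
        try {
            V loaded = loader.apply(key);
            entries.put(key, loaded);
            return loaded;
        } catch (RuntimeException e) {
            numLoadFailuresCounter.incrementAndGet();
            throw e;
        } finally {
            latestLoadTimeNanos.set(System.nanoTime() - start);
        }
    }
}
```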

LookupFunctionProvider

This is the API between the table framework and the user's table source. Implementations define how to create a lookup function and whether to use a cache.

Code Block
languagejava
titleLookupFunctionProvider
/**
 * Provider for creating {@link LookupFunction} and {@link LookupCacheFactory} if caching should be
 * enabled for the lookup table.
 */
@PublicEvolving
public interface LookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    /** Creates a builder of {@link LookupFunctionProvider}. */
    static Builder newBuilder() {
        return new Builder();
    }

    /** Creates a {@link LookupFunction} instance. */
    LookupFunction createLookupFunction();

    /**
     * Gets the {@link LookupCacheFactory} for creating lookup cache.
     *
     * <p>This factory will be used for creating an instance of cache during runtime execution for
     * optimizing the access to external lookup table.
     *
     * @return an {@link Optional} of {@link LookupCacheFactory}, or an empty {@link Optional} if
     *     caching shouldn't be applied to the lookup table.
     */
    Optional<LookupCacheFactory> getCacheFactory();

    /**
     * Whether the missing key (key fields without any matching value rows) should be stored in the
     * cache.
     *
     * <p>Please note that this option is required if {@link #getCacheFactory()} returns a non-empty
     * instance. If the cache factory is empty, the return value of this function will be ignored.
     *
     * @return true if a null or empty value should be stored in the cache.
     */
    Optional<Boolean> cacheMissingKey();

    /** Builder class for {@link LookupFunctionProvider}. */
    class Builder {

        private LookupFunction lookupFunction;
        private LookupCacheFactory cacheFactory;
        private Boolean enableCacheMissingKey;

        /** Sets lookup function. */
        public Builder withLookupFunction(LookupFunction lookupFunction) {
            this.lookupFunction = lookupFunction;
            return this;
        }

        /** Enables caching and sets the cache factory. */
        public Builder withCacheFactory(LookupCacheFactory cacheFactory) {
            this.cacheFactory = cacheFactory;
            return this;
        }

        /**
         * Enables storing missing keys in the cache.
         *
         * <p>See {@link LookupFunctionProvider#cacheMissingKey()} for more details.
         */
        public Builder enableCacheMissingKey() {
            this.enableCacheMissingKey = true;
            return this;
        }

        public LookupFunctionProvider build() {
            // Build LookupFunctionProvider
        }
    }
}
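
The provider only declares whether and how to cache; the runtime decides how to use that information. The following hypothetical sketch (not Flink's actual runtime) shows one plausible lookup path when caching is enabled: consult the cache first, fall back to the lookup function on a miss, and store an empty result only if `cacheMissingKey` is true. `List<Object>` again stands in for `RowData`, and `CachingLookupRunner` is an invented name.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical runtime wrapper combining a lookup function with its cache. */
class CachingLookupRunner {
    private final Function<List<Object>, Collection<List<Object>>> lookupFunction;
    private final Map<List<Object>, Collection<List<Object>>> cache = new ConcurrentHashMap<>();
    private final boolean cacheMissingKey;

    CachingLookupRunner(
            Function<List<Object>, Collection<List<Object>>> lookupFunction,
            boolean cacheMissingKey) {
        this.lookupFunction = lookupFunction;
        this.cacheMissingKey = cacheMissingKey;
    }

    Collection<List<Object>> lookup(List<Object> keyRow) {
        Collection<List<Object>> cached = cache.get(keyRow);
        if (cached != null) {
            return cached; // hit, including a cached "missing key" (empty collection)
        }
        Collection<List<Object>> rows = lookupFunction.apply(keyRow);
        if (rows == null) {
            rows = Collections.emptyList();
        }
        // Non-empty results are always cached; empty ones only when cacheMissingKey is set.
        if (!rows.isEmpty() || cacheMissingKey) {
            cache.put(keyRow, rows);
        }
        return rows;
    }
}
```

With `cacheMissingKey` disabled, repeated lookups of a key that has no match keep hitting the external system; enabling it trades that I/O for possibly stale "not found" answers.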

AsyncLookupFunctionProvider

Code Block
languagejava
titleAsyncLookupFunctionProvider
@PublicEvolving
public interface AsyncLookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    /** Creates an {@link AsyncLookupFunction} instance. */
    AsyncLookupFunction createAsyncLookupFunction();

    /**
     * Gets the {@link LookupCacheFactory} for creating lookup cache.
     *
     * <p>This factory will be used for creating an instance of cache during runtime execution for
     * optimizing the access to external lookup table.
     *
     * @return an {@link Optional} of {@link LookupCacheFactory}, or an empty {@link Optional} if
     *     caching shouldn't be applied to the lookup table.
     */
    Optional<LookupCacheFactory> getCacheFactory();

    /**
     * Whether the missing key (key fields without any matching value rows) should be stored in the
     * cache.
     *
     * <p>Please note that this option is required if {@link #getCacheFactory()} returns a non-empty
     * instance. If the cache factory is empty, the return value of this function will be ignored.
     *
     * @return true if a null or empty value should be stored in the cache.
     */
    Optional<Boolean> cacheMissingKey();

    /** Builder class for {@link AsyncLookupFunctionProvider}. */
    class Builder {

        private AsyncLookupFunction asyncLookupFunction;
        private LookupCacheFactory cacheFactory;
        private Boolean enableCacheMissingKey;

        /** Sets lookup function. */
        public Builder withAsyncLookupFunction(AsyncLookupFunction asyncLookupFunction) {
            this.asyncLookupFunction = asyncLookupFunction;
            return this;
        }

        /** Enables caching and sets the cache factory. */
        public Builder withCacheFactory(LookupCacheFactory cacheFactory) {
            this.cacheFactory = cacheFactory;
            return this;
        }

        /**
         * Enables storing missing keys in the cache.
         *
         * <p>See {@link AsyncLookupFunctionProvider#cacheMissingKey()} for more details.
         */
        public Builder enableCacheMissingKey() {
            this.enableCacheMissingKey = true;
            return this;
        }

        public AsyncLookupFunctionProvider build() {
            // Build AsyncLookupFunctionProvider
        }
    }
}
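
The `build()` methods are left as placeholders in the proposal. One possible way to implement them is sketched below with simplified, hypothetical types (`SketchAsyncFunction`, `SketchCacheFactory`, `SketchProvider`, `SketchProviderBuilder`). It returns an empty `cacheMissingKey` when no cache factory is set, consistent with the note that the flag is ignored in that case.

```java
import java.util.Optional;

/** Hypothetical marker standing in for AsyncLookupFunction. */
interface SketchAsyncFunction {}

/** Hypothetical marker standing in for LookupCacheFactory. */
interface SketchCacheFactory {}

/** Simplified mirror of the AsyncLookupFunctionProvider contract. */
interface SketchProvider {
    SketchAsyncFunction createAsyncLookupFunction();

    Optional<SketchCacheFactory> getCacheFactory();

    Optional<Boolean> cacheMissingKey();
}

/** One possible builder; validation and defaults here are assumptions of this sketch. */
class SketchProviderBuilder {
    private SketchAsyncFunction asyncLookupFunction;
    private SketchCacheFactory cacheFactory;
    private boolean enableCacheMissingKey;

    SketchProviderBuilder withAsyncLookupFunction(SketchAsyncFunction function) {
        this.asyncLookupFunction = function;
        return this;
    }

    SketchProviderBuilder withCacheFactory(SketchCacheFactory factory) {
        this.cacheFactory = factory;
        return this;
    }

    SketchProviderBuilder enableCacheMissingKey() {
        this.enableCacheMissingKey = true;
        return this;
    }

    SketchProvider build() {
        if (asyncLookupFunction == null) {
            throw new IllegalStateException("An async lookup function is required");
        }
        // Capture the builder state so later builder calls cannot change the provider.
        final SketchAsyncFunction function = asyncLookupFunction;
        final Optional<SketchCacheFactory> factory = Optional.ofNullable(cacheFactory);
        // The missing-key flag is only meaningful when caching is enabled.
        final Optional<Boolean> missingKey =
                factory.isPresent() ? Optional.of(enableCacheMissingKey) : Optional.empty();
        return new SketchProvider() {
            @Override
            public SketchAsyncFunction createAsyncLookupFunction() {
                return function;
            }

            @Override
            public Optional<SketchCacheFactory> getCacheFactory() {
                return factory;
            }

            @Override
            public Optional<Boolean> cacheMissingKey() {
                return missingKey;
            }
        };
    }
}
```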

RescanRuntimeProvider

This interface supports the "all" cache strategy, in which the entire lookup table is cached locally. It reuses ScanRuntimeProvider and defines the interval between re-scans.

Code Block
languagejava
titleRescanRuntimeProvider
/**
 * Runtime provider for periodically re-scanning all entries of the lookup table and storing the
 * table locally for lookup.
 */
@PublicEvolving
public interface RescanRuntimeProvider extends LookupTableSource.LookupRuntimeProvider {

    /**
     * Gets the {@link org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider}
     * for executing the periodic re-scan.
     */
    ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();

    /** Gets the interval between two re-scans. */
    Duration getRescanInterval();
}
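
A minimal sketch of how a runtime could use the two values returned by `RescanRuntimeProvider`: a full scan (here a hypothetical `Supplier` standing in for the `ScanRuntimeProvider`) is re-run every rescan interval, and the freshly loaded table replaces the previous snapshot in a single volatile write, so lookups never observe a partially loaded table. `RescanningTable` is an invented name.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical full-table cache refreshed by periodic re-scans. */
class RescanningTable<K, V> implements AutoCloseable {
    private final Supplier<Map<K, List<V>>> fullScan; // stands in for the ScanRuntimeProvider
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private volatile Map<K, List<V>> snapshot = Collections.emptyMap();

    RescanningTable(Supplier<Map<K, List<V>>> fullScan) {
        this.fullScan = fullScan;
    }

    /** Loads the whole table, then publishes it with one volatile write. */
    void reload() {
        Map<K, List<V>> fresh = new HashMap<>(fullScan.get());
        snapshot = Collections.unmodifiableMap(fresh);
    }

    /** Performs an initial load, then re-scans after every interval (interval must be > 0). */
    void start(Duration rescanInterval) {
        reload();
        long millis = rescanInterval.toMillis();
        scheduler.scheduleWithFixedDelay(
                () -> {
                    try {
                        reload();
                    } catch (RuntimeException e) {
                        // Keep serving the previous snapshot if a re-scan fails.
                    }
                },
                millis,
                millis,
                TimeUnit.MILLISECONDS);
    }

    List<V> lookup(K key) {
        return snapshot.getOrDefault(key, Collections.emptyList());
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

The catch block matters: if a periodic task throws, `ScheduledExecutorService` suppresses all subsequent executions, so a single failed scan would otherwise stop refreshes for good.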

TableFunctionProvider / AsyncTableFunctionProvider

We'd like to deprecate these two interfaces and let developers switch to the new LookupFunctionProvider / AsyncLookupFunctionProvider instead.

Cache Metrics

Note that a cache implementation does not have to report all of the metrics defined below. However, if a cache reports a metric with the same semantics as one defined below, the implementation should follow the convention.

...

The metric group for the cache would be a sub-group of the OperatorMetricGroup to which the table function belongs.

Future Works

To further reduce network I/O with external systems and cache usage, some optimizations already implemented for scan sources, such as projection and filter pushdown, could also be applied to lookup tables. These features will be introduced separately in another FLIP.

Compatibility, Deprecation, and Migration Plan

...