...

If the lookup table is small enough to fit into memory and doesn't change frequently, it is more efficient to load all of its entries into the cache, which reduces network I/O, and to refresh the table periodically. We call this use case "all cache". Logically, the reload operation is a kind of scan, so we'd like to reuse the ScanRuntimeProvider, allowing developers to reuse the scanning logic already implemented in Source / SourceFunction / InputFormat. Considering the complexity of the Source API, we'd like to support the SourceFunction and InputFormat APIs first. Supporting the Source API might require a new topology and will be discussed later in a separate FLIP.
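As a rough illustration of the "all cache" idea, the sketch below indexes the rows produced by one full scan by their lookup key, so every subsequent lookup is a local map access instead of a network round trip. `AllCacheSnapshot`, `load` and `keyExtractor` are hypothetical names chosen for this example and are not part of any Flink API; the scan result is modeled as a plain `Iterable` rather than a real ScanRuntimeProvider.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical in-memory copy of a lookup table, built from a single full scan. */
final class AllCacheSnapshot<K, V> {
    private final Map<K, List<V>> rowsByKey;

    private AllCacheSnapshot(Map<K, List<V>> rowsByKey) {
        this.rowsByKey = rowsByKey;
    }

    /** Groups every scanned row under its lookup key (one key may match several rows). */
    static <K, V> AllCacheSnapshot<K, V> load(Iterable<V> scannedRows, Function<V, K> keyExtractor) {
        Map<K, List<V>> index = new HashMap<>();
        for (V row : scannedRows) {
            index.computeIfAbsent(keyExtractor.apply(row), k -> new ArrayList<>()).add(row);
        }
        return new AllCacheSnapshot<>(index);
    }

    /** Answers a lookup purely from memory; a missing key yields an empty list. */
    List<V> lookup(K key) {
        return rowsByKey.getOrDefault(key, Collections.emptyList());
    }
}
```

Because the whole table is held locally, a key absent from the snapshot is known to be absent from the table (as of the last scan), so no external request is needed for misses either.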

We propose to introduce a new interface, RescanRuntimeProvider, to reuse this scanning capability.

Public Interfaces

Lookup Functions

...

Code block (Java): AsyncLookupFunctionProvider
@PublicEvolving
public interface AsyncLookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    /**
     * Creates {@link AsyncLookupFunctionProvider} with the given {@link AsyncLookupFunction} and
     * disables lookup table caching.
     */
    static AsyncLookupFunctionProvider of(AsyncLookupFunction asyncLookupFunction) {
        return new AsyncLookupFunctionProvider() {
            @Override
            public AsyncLookupFunction createAsyncLookupFunction() {
                return asyncLookupFunction;
            }

            @Override
            public Optional<LookupCacheFactory> getCacheFactory() {
                return Optional.empty();
            }

            @Override
            public Optional<Boolean> cacheMissingKey() {
                return Optional.empty();
            }
        };
    }

    /**
     * Creates {@link AsyncLookupFunctionProvider} with the given {@link AsyncLookupFunction} and
     * enables caching with the specified {@link LookupCacheFactory}.
     */
    static AsyncLookupFunctionProvider of(
            AsyncLookupFunction asyncLookupFunction,
            LookupCacheFactory cacheFactory,
            boolean cacheMissingKey) {
        return new AsyncLookupFunctionProvider() {
            @Override
            public AsyncLookupFunction createAsyncLookupFunction() {
                return asyncLookupFunction;
            }

            @Override
            public Optional<LookupCacheFactory> getCacheFactory() {
                return Optional.of(cacheFactory);
            }

            @Override
            public Optional<Boolean> cacheMissingKey() {
                return Optional.of(cacheMissingKey);
            }
        };
    }

    /** Creates an {@link AsyncLookupFunction} instance. */
    AsyncLookupFunction createAsyncLookupFunction();

    /**
     * Gets the {@link LookupCacheFactory} for creating the lookup cache.
     *
     * <p>This factory will be used to create a cache instance during runtime execution, in order
     * to optimize access to the external lookup table.
     *
     * @return an {@link Optional} of {@link LookupCacheFactory}, or an empty {@link Optional} if
     *     caching shouldn't be applied to the lookup table.
     */
    Optional<LookupCacheFactory> getCacheFactory();

    /**
     * Whether the missing key (key fields without any matching value rows) should be stored in the
     * cache.
     *
     * <p>Please note that this option is required if {@link #getCacheFactory()} returns a non-empty
     * instance. If the cache factory is empty, the return value of this function will be ignored.
     *
     * @return an {@link Optional} containing true if a null or empty value should be stored in
     *     the cache.
     */
    Optional<Boolean> cacheMissingKey();
}
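To make the `cacheMissingKey` contract concrete, here is a hypothetical caching wrapper around an asynchronous lookup. It is not Flink's runtime implementation: `CachingAsyncLookup` and its `externalLookup` function are invented for illustration, and the external lookup is modeled as a plain `CompletableFuture`-returning function rather than the actual `AsyncLookupFunction` signature. When `cacheMissingKey` is true, an empty result is cached as well, so repeated lookups of a key with no matching rows do not hit the external system again.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical cache in front of an asynchronous external lookup. */
final class CachingAsyncLookup<K, V> {
    private final Function<K, CompletableFuture<Collection<V>>> externalLookup;
    private final boolean cacheMissingKey;
    private final Map<K, Collection<V>> cache = new ConcurrentHashMap<>();
    private final AtomicInteger externalCalls = new AtomicInteger();

    CachingAsyncLookup(
            Function<K, CompletableFuture<Collection<V>>> externalLookup, boolean cacheMissingKey) {
        this.externalLookup = externalLookup;
        this.cacheMissingKey = cacheMissingKey;
    }

    CompletableFuture<Collection<V>> lookup(K key) {
        Collection<V> cached = cache.get(key);
        if (cached != null) {
            // Cache hit: complete immediately without touching the external system.
            return CompletableFuture.completedFuture(cached);
        }
        externalCalls.incrementAndGet();
        return externalLookup.apply(key).thenApply(rows -> {
            // Non-empty results are always cached; empty ones only if cacheMissingKey is true.
            if (!rows.isEmpty() || cacheMissingKey) {
                cache.put(key, rows);
            }
            return rows;
        });
    }

    /** Number of lookups that were forwarded to the external system. */
    int externalCalls() {
        return externalCalls.get();
    }
}
```

With `cacheMissingKey` set to false, a key that never matches is looked up externally every time; this keeps negative entries out of the cache at the cost of extra I/O.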

RescanRuntimeProvider

This interface supports the "all cache" strategy. It reuses ScanRuntimeProvider and defines the interval between two re-scans.

Code block (Java): RescanRuntimeProvider
/**
 * Runtime provider for periodically re-scanning all entries of the lookup table and storing the
 * table locally for lookup.
 */
@PublicEvolving
public interface RescanRuntimeProvider extends LookupTableSource.LookupRuntimeProvider {

    /**
     * Gets the {@link org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider}
     * for executing the periodic re-scan.
     */
    ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();

    /** Gets the interval between two re-scans. */
    Duration getRescanInterval();
}
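The following sketch shows one way a runtime could honor `getRescanInterval()`: perform an initial full load, then re-run the scan on a fixed delay and publish each new table with a single volatile write, so lookups always see a complete snapshot. `PeriodicRescanner` is hypothetical, and the `Supplier` named `fullScan` stands in for executing the scan obtained from `getScanRuntimeProvider()`; how the planner actually wires this up is not specified here.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical holder that re-scans the whole lookup table at a fixed interval. */
final class PeriodicRescanner<K, V> implements AutoCloseable {
    private final Supplier<Map<K, V>> fullScan; // stand-in for running the ScanRuntimeProvider
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private volatile Map<K, V> snapshot;

    PeriodicRescanner(Supplier<Map<K, V>> fullScan, Duration rescanInterval) {
        long intervalMillis = rescanInterval.toMillis();
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("rescan interval must be positive");
        }
        this.fullScan = fullScan;
        // Load the table once before serving any lookups.
        this.snapshot = Map.copyOf(fullScan.get());
        scheduler.scheduleWithFixedDelay(
                this::reload, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /** Runs a full scan and atomically replaces the previous snapshot. */
    void reload() {
        try {
            // Build the complete new table first, then publish it with one volatile write.
            snapshot = Map.copyOf(fullScan.get());
        } catch (RuntimeException e) {
            // Keep serving the previous snapshot; the next scheduled run retries.
            // (An exception escaping a scheduled task would cancel all future runs.)
        }
    }

    /** Serves a lookup from the current in-memory snapshot. */
    V lookup(K key) {
        return snapshot.get(key);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Building the new map completely before swapping it in means a lookup never observes a half-loaded table; the trade-off is that two copies of the table may be held in memory while a reload is in progress.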


Cache Metrics

Note that a cache implementation does not have to report all of the metrics defined below. However, if a cache reports a metric with the same semantics as one defined below, the implementation should follow the convention given here.

...