...

If the lookup table is small enough to fit into memory and doesn't change frequently, it is more efficient to load all entries of the lookup table into the cache to reduce network I/O, and to refresh the table periodically or at a specified time of day. We'd like to name this use case "full cache". Logically, the reload operation is a kind of scan, so we'd like to reuse ScanRuntimeProvider so that developers can reuse the scanning logic implemented in Source / SourceFunction / InputFormat. Considering the complexity of the Source API, we'd like to support the SourceFunction and InputFormat APIs first. Supporting the Source API might require a new topology and will be discussed later in another FLIP.
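The full-cache idea can be sketched in plain Java, independent of Flink: one scan loads every entry, lookups read only the local copy, and each reload replaces the whole snapshot at once. The class `FullCacheSketch` and its `Supplier`-based "scan" are hypothetical stand-ins for a real ScanRuntimeProvider, used only to illustrate the behavior; this is a sketch, not the proposed API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
 * Hypothetical illustration of a "full cache": the whole lookup table is loaded by one
 * scan, lookups are served from memory, and every reload swaps in a complete new snapshot.
 * Not part of the proposed Flink API.
 */
final class FullCacheSketch<K, V> implements AutoCloseable {
    // Stand-in for scanning the external table: returns all of its entries.
    private final Supplier<Map<K, V>> scanAll;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    // volatile so that lookups always see the most recently published snapshot.
    private volatile Map<K, V> snapshot = Collections.emptyMap();

    FullCacheSketch(Supplier<Map<K, V>> scanAll) {
        this.scanAll = scanAll;
    }

    /** One full reload: scan everything, then publish the new snapshot atomically. */
    void reload() {
        Map<K, V> fresh = new HashMap<>(scanAll.get());
        snapshot = Collections.unmodifiableMap(fresh);
    }

    /** Loads once immediately, then reloads every {@code intervalMillis} milliseconds. */
    void start(long intervalMillis) {
        reload();
        scheduler.scheduleAtFixedRate(
                this::reload, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /** Lookups never touch the external system; they only read the local snapshot. */
    Optional<V> lookup(K key) {
        return Optional.ofNullable(snapshot.get(key));
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Swapping a complete immutable map keeps lookups lock-free and guarantees that a lookup never observes a half-reloaded table, at the cost of holding two copies of the table in memory during a reload.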

...

This interface supports the full cache strategy. It reuses ScanRuntimeProvider and defines when the cache should be reloaded.

```java
import java.io.Serializable;
import java.time.Duration;
import java.time.LocalTime;
import java.time.ZoneOffset;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;

/**
 * Runtime provider for fully loading and periodically reloading all entries of the lookup table and
 * storing the table locally for lookup.
 *
 * <p>Implementations should provide a {@link ScanTableSource.ScanRuntimeProvider} in order to reuse
 * the ability of scanning for loading all entries from the lookup table.
 */
@PublicEvolving
public interface FullCachingLookupProvider extends LookupTableSource.LookupRuntimeProvider {

    /**
     * Creates {@link FullCachingLookupProvider} with the provided scan runtime provider and reload
     * time.
     */
    static FullCachingLookupProvider of(
            ScanTableSource.ScanRuntimeProvider scanRuntimeProvider, ReloadTime reloadTime) {
        return new FullCachingLookupProvider() {
            @Override
            public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider() {
                return scanRuntimeProvider;
            }

            @Override
            public ReloadTime getReloadTime() {
                return reloadTime;
            }
        };
    }

    /**
     * Gets the {@link org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider}
     * for executing the periodic reload.
     */
    ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();

    /** Gets the time when to reload the cache. See {@link ReloadTime}. */
    ReloadTime getReloadTime();

    /** Defines at what time the cache should be reloaded. */
    interface ReloadTime extends Serializable {
        /** Gets the interval between two reload operations. */
        Duration getReloadInterval();

        /** Gets the start time of the reload operation in UTC. */
        LocalTime getReloadStartTime();

        /** Creates reload time with periodic intervals, starting immediately. */
        static ReloadTime withInterval(Duration reloadInterval) {
            return new ReloadTime() {
                @Override
                public Duration getReloadInterval() {
                    return reloadInterval;
                }

                @Override
                public LocalTime getReloadStartTime() {
                    // Evaluated on every call, so the start time is always "now" in UTC.
                    return LocalTime.now(ZoneOffset.UTC);
                }
            };
        }

        /**
         * Creates reload time that first reloads at {@code reloadStartTime} (UTC) and then at
         * periodic intervals.
         */
        static ReloadTime withIntervalAfterDelay(
                Duration reloadInterval, LocalTime reloadStartTime) {
            return new ReloadTime() {
                @Override
                public Duration getReloadInterval() {
                    return reloadInterval;
                }

                @Override
                public LocalTime getReloadStartTime() {
                    return reloadStartTime;
                }
            };
        }

        /** Creates reload time that reloads once a day at the specified {@code reloadStartTime}. */
        static ReloadTime dailyAtSpecifiedTime(LocalTime reloadStartTime) {
            return new ReloadTime() {
                @Override
                public Duration getReloadInterval() {
                    return Duration.ofDays(1);
                }

                @Override
                public LocalTime getReloadStartTime() {
                    return reloadStartTime;
                }
            };
        }
    }
}
```
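ReloadTime only describes when to reload; a runtime still has to turn a UTC start time and an interval into a schedule. The helper below is a sketch under one assumption: the first reload happens at the next occurrence of the start time (later today if it hasn't passed yet, otherwise tomorrow), and subsequent reloads follow every reload interval. The class `ReloadScheduleSketch` and its method are hypothetical, not part of the proposal.

```java
import java.time.Duration;
import java.time.LocalTime;

/** Hypothetical scheduling helper; not part of the proposed API. */
final class ReloadScheduleSketch {
    private ReloadScheduleSketch() {}

    /**
     * Returns how long to wait from {@code now} until the next occurrence of
     * {@code reloadStartTime}. Both values are interpreted as UTC wall-clock times.
     */
    static Duration initialDelay(LocalTime now, LocalTime reloadStartTime) {
        Duration delay = Duration.between(now, reloadStartTime);
        // If the start time has already passed today, the first reload happens tomorrow.
        return delay.isNegative() ? delay.plusDays(1) : delay;
    }
}
```

For example, with `ReloadTime.dailyAtSpecifiedTime(LocalTime.of(2, 0))` and a current UTC time of 23:30, the initial delay is 2 hours 30 minutes, after which the cache would be reloaded every 24 hours. Note that a start time of "now" (as returned by `withInterval`) that was read slightly before `now` is sampled would round up to the next day, so a real implementation would need to treat that case specially.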

TableFunctionProvider / AsyncTableFunctionProvider

We'd like to deprecate these two interfaces and let developers switch to the new LookupFunctionProvider / AsyncLookupFunctionProvider / FullCachingLookupProvider instead.

...

In order to unify the usage of caching across all connectors, we'd like to introduce some common table options, which are defined in the class LookupOptions. Note that not all connectors are required to implement these options.

| Option | Type | Description |
| --- | --- | --- |
| lookup.cache | Enum of NONE, PARTIAL and FULL | The caching strategy for this lookup table. NONE: do not use cache. PARTIAL: use partial caching mode. FULL: use full caching mode. |
| lookup.async | Boolean | Whether to use asynchronous mode for the lookup table |
| lookup.max-retries | Integer | The maximum number of retries allowed if a lookup operation fails |
| lookup.partial-cache.expire-after-access | Duration | Duration after which a cache entry expires, measured from its last access |
| lookup.partial-cache.expire-after-write | Duration | Duration after which a cache entry expires, measured from when it was written |
| lookup.partial-cache.cache-missing-key | Boolean | Whether to store an empty value in the cache if the lookup key doesn't match any rows in the table |
| lookup.partial-cache.max-rows | Long | The maximum number of rows to store in the cache |
| lookup.full-cache.reload-interval | Duration | Interval between two reloads of all entries from the lookup table into the cache |
| lookup.full-cache.reload-start-time | String | The start time of the cache reload operation in UTC |
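As an illustration of how a connector might consume these options, the sketch below reads them from a plain string map. It assumes several things the table does not specify: `lookup.cache` defaults to NONE and is matched case-insensitively, durations are written in ISO-8601 form such as "PT1H" (Flink's own duration syntax differs), and the start time uses `LocalTime` syntax such as "02:00". The class `LookupOptionsSketch` is hypothetical.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical reader for the common lookup options. Assumptions: values are plain strings,
 * durations use ISO-8601 (e.g. "PT1H"), and lookup.cache defaults to NONE when absent.
 */
final class LookupOptionsSketch {
    enum CacheStrategy { NONE, PARTIAL, FULL }

    private LookupOptionsSketch() {}

    static CacheStrategy cacheStrategy(Map<String, String> options) {
        String value = options.getOrDefault("lookup.cache", "NONE");
        // valueOf throws IllegalArgumentException for anything but NONE, PARTIAL or FULL.
        return CacheStrategy.valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    static Optional<Duration> fullCacheReloadInterval(Map<String, String> options) {
        return Optional.ofNullable(options.get("lookup.full-cache.reload-interval"))
                .map(Duration::parse);
    }

    static Optional<LocalTime> fullCacheReloadStartTime(Map<String, String> options) {
        // LocalTime.parse accepts ISO local times such as "02:00" or "02:00:30".
        return Optional.ofNullable(options.get("lookup.full-cache.reload-start-time"))
                .map(LocalTime::parse);
    }

    static Optional<Long> partialCacheMaxRows(Map<String, String> options) {
        return Optional.ofNullable(options.get("lookup.partial-cache.max-rows"))
                .map(Long::parseLong);
    }
}
```

Returning `Optional` for each option leaves it to the connector to decide which options are mandatory for the chosen strategy, which matches the note that connectors are not required to implement all of them.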

Cache Metrics

Note that a cache implementation does not have to report all of the metrics defined below. However, if a cache reports a metric with the same semantics as one defined below, the implementation should follow this convention.

| Name | Type | Unit | Description |
| --- | --- | --- | --- |
| numCachedRecord | Gauge | Records | The number of records in the cache |
| numCachedBytes | Gauge | Bytes | The number of bytes used by the cache |
| hitCount | Counter | | The number of cache hits |
| missCount | Counter | | The number of cache misses, which might lead to load operations |
| loadCount | Counter | | The number of times data was loaded into the cache from the external system. For partial cache the load count should equal the miss count, but for full cache it would differ. |
| numLoadFailure | Counter | | The number of load failures |
| latestLoadTime | Gauge | ms | The time spent on the latest load operation |

Here we only define fundamental metrics and let the external metric system perform the aggregation to derive more descriptive values, such as hitRate = hitCount / (hitCount + missCount).
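The hitRate formula can be made concrete with a small sketch of what an external metric system might compute. Two assumptions are made here and are not part of the proposal: the rate is defined as 0 before any lookup has happened, and, because hitCount and missCount are cumulative counters, a rate over a time window is computed from the differences between two snapshots. The class `CacheHitRate` is hypothetical.

```java
/**
 * Hypothetical aggregation an external metric system might derive from the reported
 * hitCount and missCount counters; not part of the proposed API.
 */
final class CacheHitRate {
    private CacheHitRate() {}

    /** hitRate = hitCount / (hitCount + missCount); assumed to be 0 before any lookup. */
    static double hitRate(long hitCount, long missCount) {
        long total = hitCount + missCount;
        return total == 0 ? 0.0 : (double) hitCount / total;
    }

    /** Hit rate over a window, computed from two snapshots of the cumulative counters. */
    static double windowedHitRate(
            long hitsBefore, long missesBefore, long hitsAfter, long missesAfter) {
        return hitRate(hitsAfter - hitsBefore, missesAfter - missesBefore);
    }
}
```

For example, if the counters move from 100 hits / 100 misses to 190 hits / 110 misses, the windowed hit rate is 90 / (90 + 10) = 0.9, even though the lifetime hit rate is 190 / 300.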

...