...
If the lookup table is small enough to fit into memory and doesn't change frequently, it is more efficient to load all of its entries into the cache to reduce network I/O, and to refresh the table periodically or at a specified time of day. We call this use case "full cache". Logically, a reload operation is a kind of scan, so we'd like to reuse ScanRuntimeProvider so that developers can reuse the scanning logic already implemented in Source / SourceFunction / InputFormat. Because the Source API is complex, we'd like to support the SourceFunction and InputFormat APIs first. Supporting the Source API might require a new topology and will be discussed later in another FLIP.
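The following is a minimal sketch of the full-cache idea in plain Java, without any Flink classes. `FullCache` is a hypothetical class, and its `scan` supplier stands in for the scan logic a connector would expose through ScanRuntimeProvider. Each `reload()` builds a complete new snapshot and publishes it with a single volatile write, so concurrent lookups see either the old table or the new one, never a half-loaded one. This sketch does not decide when to call `reload()`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical "full cache": every lookup is answered from an in-memory snapshot. */
class FullCache<K, R> {
    private final Supplier<? extends Iterable<R>> scan; // stand-in for a full-table scan
    private final Function<R, K> keyOf;                 // extracts the lookup key from a row
    // Replaced wholesale on every reload; volatile so readers see the latest snapshot.
    private volatile Map<K, List<R>> snapshot = Collections.emptyMap();

    FullCache(Supplier<? extends Iterable<R>> scan, Function<R, K> keyOf) {
        this.scan = scan;
        this.keyOf = keyOf;
    }

    /** Scans the whole table, groups rows by key, then publishes the new snapshot. */
    void reload() {
        Map<K, List<R>> fresh = new HashMap<>();
        for (R row : scan.get()) {
            fresh.computeIfAbsent(keyOf.apply(row), k -> new ArrayList<>()).add(row);
        }
        snapshot = fresh; // single volatile write: lookups never see a half-built map
    }

    /** Answers a lookup from memory only, without any network I/O. */
    List<R> lookup(K key) {
        List<R> rows = snapshot.get(key);
        return rows == null ? Collections.emptyList() : Collections.unmodifiableList(rows);
    }
}
```

Changes in the external table only become visible after the next `reload()`. This is the trade-off the full cache makes in exchange for avoiding per-key network round trips.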
...
This interface supports the full caching strategy. It reuses ScanRuntimeProvider and defines when the cache should be reloaded.
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;

import java.io.Serializable;
import java.time.Duration;
import java.time.LocalTime;
import java.time.ZoneOffset;

/**
 * Runtime provider for fully loading and periodically reloading all entries of the lookup table and
 * storing the table locally for lookup.
 *
 * <p>Implementations should provide a {@link ScanTableSource.ScanRuntimeProvider} in order to reuse
 * the ability of scanning for loading all entries from the lookup table.
 */
@PublicEvolving
public interface FullCachingLookupProvider extends LookupTableSource.LookupRuntimeProvider {

    /**
     * Creates {@link FullCachingLookupProvider} with provided scan runtime provider and reload
     * time.
     */
    static FullCachingLookupProvider of(
            ScanTableSource.ScanRuntimeProvider scanRuntimeProvider, ReloadTime reloadTime) {
        return new FullCachingLookupProvider() {
            @Override
            public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider() {
                return scanRuntimeProvider;
            }

            @Override
            public ReloadTime getReloadTime() {
                return reloadTime;
            }
        };
    }

    /**
     * Gets the {@link org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider}
     * for executing the periodic reload.
     */
    ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();

    /** Gets the time when to reload the cache. See {@link ReloadTime}. */
    ReloadTime getReloadTime();

    /** Defines at what time the cache should be reloaded. */
    interface ReloadTime extends Serializable {

        /** Gets the interval between two reload operations. */
        Duration getReloadInterval();

        /** Gets the start time of the reload operation in UTC. */
        LocalTime getReloadStartTime();

        /** Creates reload time with periodic intervals. */
        static ReloadTime withInterval(Duration reloadInterval) {
            return new ReloadTime() {
                @Override
                public Duration getReloadInterval() {
                    return reloadInterval;
                }

                @Override
                public LocalTime getReloadStartTime() {
                    return LocalTime.now(ZoneOffset.UTC);
                }
            };
        }

        /**
         * Creates reload time with periodic intervals after initial delay up to {@code
         * reloadStartTime}.
         */
        static ReloadTime withIntervalAfterDelay(
                Duration reloadInterval, LocalTime reloadStartTime) {
            return new ReloadTime() {
                @Override
                public Duration getReloadInterval() {
                    return reloadInterval;
                }

                @Override
                public LocalTime getReloadStartTime() {
                    return reloadStartTime;
                }
            };
        }

        /** Creates reload time daily at specified {@code reloadStartTime}. */
        static ReloadTime dailyAtSpecifiedTime(LocalTime reloadStartTime) {
            return new ReloadTime() {
                @Override
                public Duration getReloadInterval() {
                    return Duration.ofDays(1);
                }

                @Override
                public LocalTime getReloadStartTime() {
                    return reloadStartTime;
                }
            };
        }
    }
}
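To make the (reload interval, reload start time) pair of ReloadTime more concrete, here is a sketch of how a runtime might turn it into a schedule with the JDK's ScheduledExecutorService. `ReloadScheduling` and its methods are hypothetical and not part of this proposal. The sketch assumes the start time is a UTC wall-clock time and that the first reload happens at its next occurrence.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: turns a reload interval plus a UTC start time into a fixed-rate schedule. */
final class ReloadScheduling {
    private ReloadScheduling() {}

    /**
     * Time from {@code now} until the next occurrence of {@code startTime} on a 24-hour clock.
     * A start time earlier than {@code now} refers to the same time tomorrow.
     */
    static Duration initialDelay(LocalTime now, LocalTime startTime) {
        Duration delay = Duration.between(now, startTime);
        return delay.isNegative() ? delay.plusDays(1) : delay;
    }

    /** Runs {@code reload} first at the next {@code startTimeUtc}, then every {@code interval}. */
    static ScheduledFuture<?> schedule(
            ScheduledExecutorService executor,
            Runnable reload,
            Duration interval,
            LocalTime startTimeUtc) {
        Duration delay = initialDelay(LocalTime.now(ZoneOffset.UTC), startTimeUtc);
        // scheduleAtFixedRate throws IllegalArgumentException if the period is not positive
        return executor.scheduleAtFixedRate(
                reload, delay.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS);
    }
}
```

For example, a daily reload at 02:00 UTC would call `schedule(executor, reload, Duration.ofDays(1), LocalTime.of(2, 0))`. This helper treats a start time that has just passed as "tomorrow". A caller that wants an immediate first reload, as `withInterval` suggests, would therefore pass a zero initial delay directly instead of a start time.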
TableFunctionProvider / AsyncTableFunctionProvider
We'd like to deprecate these two interfaces and let developers switch to the new LookupFunctionProvider / AsyncLookupFunctionProvider / FullCachingLookupProvider instead.
...
To unify cache usage across all connectors, we'd like to introduce some common table options, defined in the class LookupOptions. Note that connectors are not required to implement all of these options.
Option | Type | Description |
---|---|---|
lookup.cache | Enum of NONE, PARTIAL and FULL | The caching strategy for this lookup table. NONE: do not use a cache. PARTIAL: use partial caching mode. FULL: use full caching mode. |
lookup.async | Boolean | Whether to use asynchronous mode for the lookup table |
lookup.max-retries | Integer | The maximum number of retries if a lookup operation fails |
lookup.partial-cache.expire-after-access | Duration | Duration after which a cached entry expires, measured from its last access |
lookup.partial-cache.expire-after-write | Duration | Duration after which a cached entry expires, measured from when it was written |
lookup.partial-cache.cache-missing-key | Boolean | Whether to store an empty value in the cache if the lookup key doesn't match any rows in the table |
lookup.partial-cache.max-rows | Long | The maximum number of rows to store in the cache |
lookup.full-cache.reload-interval | Duration | Interval between two reloads of all entries from the lookup table into the cache |
lookup.full-cache.reload-start-time | String | The start time of the cache reload operation in UTC |
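The partial-cache options can be illustrated with a small in-memory cache. `PartialCache` below is hypothetical and not a Flink class. It honors lookup.partial-cache.max-rows (assuming least-recently-used eviction, which the option itself does not prescribe), lookup.partial-cache.expire-after-write and lookup.partial-cache.cache-missing-key. It leaves out expire-after-access for brevity. The `loader` function stands in for a remote lookup, and an injectable millisecond clock keeps the behavior deterministic. The sketch is not thread-safe.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical partial cache: max-rows (LRU assumed), expire-after-write, cache-missing-key. */
class PartialCache<K, V> {
    /** Cached rows for one key plus the time they were written. */
    private static final class CachedRows<T> {
        final List<T> rows;
        final long writtenAtMillis;

        CachedRows(List<T> rows, long writtenAtMillis) {
            this.rows = rows;
            this.writtenAtMillis = writtenAtMillis;
        }
    }

    private final Function<K, List<V>> loader; // stand-in for a remote lookup
    private final long expireAfterWriteMillis;
    private final boolean cacheMissingKey;
    private final LongSupplier clockMillis;
    private final LinkedHashMap<K, CachedRows<V>> entries;
    long loadCount; // how often the external system was queried

    PartialCache(
            Function<K, List<V>> loader,
            long maxRows,
            Duration expireAfterWrite,
            boolean cacheMissingKey,
            LongSupplier clockMillis) {
        this.loader = loader;
        this.expireAfterWriteMillis = expireAfterWrite.toMillis();
        this.cacheMissingKey = cacheMissingKey;
        this.clockMillis = clockMillis;
        // accessOrder = true: iteration starts at the least recently used key
        this.entries = new LinkedHashMap<K, CachedRows<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, CachedRows<V>> eldest) {
                return size() > maxRows; // evict the LRU entry once max-rows is exceeded
            }
        };
    }

    List<V> lookup(K key) {
        long now = clockMillis.getAsLong();
        CachedRows<V> cached = entries.get(key);
        if (cached != null && now - cached.writtenAtMillis < expireAfterWriteMillis) {
            return cached.rows; // cache hit
        }
        // Miss or expired entry: query the external system.
        List<V> rows = loader.apply(key);
        loadCount++;
        if (!rows.isEmpty() || cacheMissingKey) {
            entries.put(key, new CachedRows<>(rows, now));
        } else {
            entries.remove(key); // drop an expired entry for a key that no longer matches
        }
        return rows;
    }
}
```

With cache-missing-key disabled, repeated lookups of a non-existent key reach the external system every time. Enabling it trades memory for fewer round trips.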
Cache Metrics
Note that a cache implementation does not have to report all of the metrics defined below. However, if a cache reports a metric with the same semantics as one defined below, it should follow this convention.
Name | Type | Unit | Description |
---|---|---|---|
numCachedRecord | Gauge | Records | The number of records in the cache. |
numCachedBytes | Gauge | Bytes | The number of bytes used by the cache. |
hitCount | Counter | | The number of cache hits. |
missCount | Counter | | The number of cache misses, which might lead to load operations. |
loadCount | Counter | | The number of times data is loaded into the cache from the external system. For the partial cache, the load count should equal the miss count; for the full cache it differs, because loads are triggered by reloads rather than by misses. |
numLoadFailure | Counter | | The number of load failures. |
latestLoadTime | Gauge | ms | The time spent on the latest load operation. |
Here we only define fundamental metrics and leave it to the external metric system to aggregate them into more descriptive values, such as hitRate = hitCount / (hitCount + missCount).
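The sketch below shows how a cache might maintain some of the metrics above and how hitRate can be derived from the raw counters. `CacheCounters`, `timedLoad` and `hitRate` are hypothetical names. A real connector would register these values through Flink's metric system instead of keeping them in plain fields.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/** Hypothetical holder for the cache counters and gauges defined above. */
final class CacheCounters {
    final AtomicLong hitCount = new AtomicLong();
    final AtomicLong missCount = new AtomicLong();
    final AtomicLong loadCount = new AtomicLong();
    final AtomicLong numLoadFailure = new AtomicLong();
    volatile long latestLoadTimeMs; // gauge: duration of the most recent load

    /** Runs one load against the external system and records loadCount, failures and latency. */
    <T> T timedLoad(Supplier<T> load) {
        long start = System.nanoTime();
        loadCount.incrementAndGet();
        try {
            return load.get();
        } catch (RuntimeException e) {
            numLoadFailure.incrementAndGet();
            throw e;
        } finally {
            latestLoadTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        }
    }

    /** Derived value an external metric system could compute; NaN when there were no lookups. */
    static double hitRate(long hitCount, long missCount) {
        return (double) hitCount / (hitCount + missCount);
    }
}
```

Keeping only raw counters in the cache means the ratio can be recomputed over any window the metric system chooses, for example per minute instead of since startup.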
...