...
Code Block |
---|
language | java |
---|
title | LookupFunctionProvider |
---|
|
/**
 * A provider for creating {@link LookupFunction}.
 *
 * <p>Use {@link #of(LookupFunction)} to wrap an already constructed function.
 */
@PublicEvolving
public interface LookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    /** Creates the {@link LookupFunction} supplied by this provider. */
    LookupFunction createLookupFunction();

    /**
     * Wraps the given function in a provider.
     *
     * @param lookupFunction the function that the returned provider hands out
     * @return a provider whose {@link #createLookupFunction()} returns {@code lookupFunction}
     */
    static LookupFunctionProvider of(LookupFunction lookupFunction) {
        return () -> lookupFunction;
    }
} |
...
Code Block |
---|
language | java |
---|
title | AsyncLookupFunctionProvider |
---|
|
/**
 * A provider for creating {@link AsyncLookupFunction}.
 *
 * <p>Use {@link #of(AsyncLookupFunction)} to wrap an already constructed function.
 */
@PublicEvolving
public interface AsyncLookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    /** Creates the {@link AsyncLookupFunction} supplied by this provider. */
    AsyncLookupFunction createAsyncLookupFunction();

    /**
     * Wraps the given asynchronous function in a provider.
     *
     * @param asyncLookupFunction the function that the returned provider hands out
     * @return a provider whose {@link #createAsyncLookupFunction()} returns
     *     {@code asyncLookupFunction}
     */
    static AsyncLookupFunctionProvider of(AsyncLookupFunction asyncLookupFunction) {
        return () -> asyncLookupFunction;
    }
} |
...
Code Block |
---|
language | java |
---|
title | LookupCache |
---|
|
/**
 * A semi-persistent mapping from keys to values for storing entries of lookup table.
 *
 * <p>The type of the caching key is a {@link RowData} with lookup key fields packed inside. The
 * type of value is a {@link Collection} of {@link RowData}, which are rows matching lookup key
 * fields.
 *
 * <p>Cache entries are manually added using {@link #put}, and are stored in the cache until either
 * evicted or manually invalidated.
 *
 * <p>Implementations of this interface are expected to be thread-safe, and can be safely accessed
 * by multiple concurrent threads.
 */
@PublicEvolving
public interface LookupCache extends AutoCloseable {

    /**
     * Initialize the cache.
     *
     * @param metricGroup the metric group to register cache related metrics.
     */
    void open(CacheMetricGroup metricGroup);

    /**
     * Returns the value associated with key in this cache, or null if there is no cached value for
     * key.
     *
     * @param key key row to look up
     * @return the cached value rows for {@code key}, or null if nothing is cached for it
     */
    @Nullable
    Collection<RowData> getIfPresent(RowData key);

    /**
     * Associates the specified value rows with the specified key row in the cache. If the cache
     * previously contained value associated with the key, the old value is replaced by the
     * specified value.
     *
     * @param key key row with which the specified value is to be associated
     * @param value value rows to be associated with the specified key
     * @return the previous value rows associated with key, or null if there was no mapping for key.
     */
    Collection<RowData> put(RowData key, Collection<RowData> value);

    /**
     * Discards any cached value for the specified key.
     *
     * @param key key row whose cached value should be dropped
     */
    void invalidate(RowData key);

    /** Returns the number of key-value mappings in the cache. */
    long size();
} |
...
Code Block |
---|
language | java |
---|
title | DefaultLookupCache |
---|
|
/**
 * Default implementation of {@link LookupCache}.
 *
 * <p>Instances are created through {@link #newBuilder()}; the constructor is private.
 *
 * <p>NOTE(review): this snippet shows only the configuration fields and the builder; the
 * {@link LookupCache} methods themselves are not shown here.
 */
@PublicEvolving
public class DefaultLookupCache implements LookupCache {
    // Each of the three settings below is null when the corresponding builder method was not called.
    private final Duration expireAfterAccessDuration;
    private final Duration expireAfterWriteDuration;
    private final Long maximumSize;
    private final boolean cacheMissingKey;

    private DefaultLookupCache(
            Duration expireAfterAccessDuration,
            Duration expireAfterWriteDuration,
            Long maximumSize,
            boolean cacheMissingKey) {
        this.expireAfterAccessDuration = expireAfterAccessDuration;
        this.expireAfterWriteDuration = expireAfterWriteDuration;
        this.maximumSize = maximumSize;
        this.cacheMissingKey = cacheMissingKey;
    }

    /** Creates a new {@link Builder} with no option set. */
    public static Builder newBuilder() {
        return new Builder();
    }

    /** Builder for {@link DefaultLookupCache}. */
    public static class Builder {
        private Duration expireAfterAccessDuration;
        private Duration expireAfterWriteDuration;
        private Long maximumSize;
        private Boolean cacheMissingKey;

        /**
         * Sets the expire-after-access duration.
         *
         * @param duration the duration to use
         * @return this builder
         */
        public Builder expireAfterAccess(Duration duration) {
            expireAfterAccessDuration = duration;
            return this;
        }

        /**
         * Sets the expire-after-write duration.
         *
         * @param duration the duration to use
         * @return this builder
         */
        public Builder expireAfterWrite(Duration duration) {
            expireAfterWriteDuration = duration;
            return this;
        }

        /**
         * Sets the maximum size of the cache.
         *
         * @param maximumSize the maximum size to use
         * @return this builder
         */
        public Builder maximumSize(long maximumSize) {
            this.maximumSize = maximumSize;
            return this;
        }

        /**
         * Sets the cache-missing-key flag.
         *
         * @param cacheMissingKey the flag value to use
         * @return this builder
         */
        public Builder cacheMissingKey(boolean cacheMissingKey) {
            this.cacheMissingKey = cacheMissingKey;
            return this;
        }

        /**
         * Builds the cache from the values set on this builder.
         *
         * @return a new {@link DefaultLookupCache}
         * @throws IllegalStateException if {@link #cacheMissingKey(boolean)} was never called
         */
        public DefaultLookupCache build() {
            // The constructor takes a primitive boolean; unboxing an unset (null) Boolean
            // would otherwise fail with an uninformative NullPointerException.
            if (cacheMissingKey == null) {
                throw new IllegalStateException(
                        "cacheMissingKey must be set before calling build()");
            }
            return new DefaultLookupCache(
                    expireAfterAccessDuration,
                    expireAfterWriteDuration,
                    maximumSize,
                    cacheMissingKey);
        }
    }
} |
...
Code Block |
---|
language | java |
---|
title | CacheMetricGroup |
---|
|
/**
 * Pre-defined metrics for cache.
 *
 * <p>Each method here should be invoked only once: registering a metric under the same name
 * multiple times leads to undefined behavior.
 */
@PublicEvolving
public interface CacheMetricGroup extends MetricGroup {

    /**
     * Registers the counter for the number of cache hits.
     *
     * @param hitCounter counter of cache hits
     */
    void hitCounter(Counter hitCounter);

    /**
     * Registers the counter for the number of cache misses.
     *
     * @param missCounter counter of cache misses
     */
    void missCounter(Counter missCounter);

    /**
     * Registers the counter for the number of times data is loaded into the cache from the
     * external system.
     *
     * @param loadCounter counter of load operations
     */
    void loadCounter(Counter loadCounter);

    /**
     * Registers the counter for the number of load failures.
     *
     * @param numLoadFailuresCounter counter of failed loads
     */
    void numLoadFailuresCounter(Counter numLoadFailuresCounter);

    /**
     * Registers the gauge for the time spent on the latest load operation.
     *
     * @param latestLoadTimeGauge gauge reporting the latest load time
     */
    void latestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);

    /**
     * Registers the gauge for the number of records in the cache.
     *
     * @param numCachedRecordsGauge gauge reporting the cached record count
     */
    void numCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

    /**
     * Registers the gauge for the number of bytes used by the cache.
     *
     * @param numCachedBytesGauge gauge reporting the cached byte count
     */
    void numCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
} |
...
Code Block |
---|
language | java |
---|
title | PartialCachingLookupProvider |
---|
|
/**
 * Provider that supplies both a {@link LookupFunction} and the {@link LookupCache} used for
 * storing lookup entries.
 */
@PublicEvolving
public interface PartialCachingLookupProvider extends LookupFunctionProvider {

    /** Returns the {@link LookupCache} supplied by this provider. */
    LookupCache getCache();

    /**
     * Builds a {@link PartialCachingLookupProvider} from the given {@link LookupFunction} and
     * {@link LookupCache}.
     *
     * @param lookupFunction the function returned by {@link #createLookupFunction()}
     * @param cache the cache returned by {@link #getCache()}
     * @return a provider handing out exactly the two given objects
     */
    static PartialCachingLookupProvider of(LookupFunction lookupFunction, LookupCache cache) {
        return new PartialCachingLookupProvider() {
            @Override
            public LookupFunction createLookupFunction() {
                return lookupFunction;
            }

            @Override
            public LookupCache getCache() {
                return cache;
            }
        };
    }
} |
...
Code Block |
---|
language | java |
---|
title | PartialCachingAsyncLookupProvider |
---|
|
/**
 * Provider for creating {@link AsyncLookupFunction} and {@link LookupCache} for storing lookup
 * entries.
 */
@PublicEvolving
public interface PartialCachingAsyncLookupProvider extends AsyncLookupFunctionProvider {

    /**
     * Build a {@link PartialCachingAsyncLookupProvider} from the specified {@link
     * AsyncLookupFunction} and {@link LookupCache}.
     *
     * @param asyncLookupFunction the function returned by {@link #createAsyncLookupFunction()}
     * @param cache the cache returned by {@link #getCache()}
     * @return a provider handing out exactly the two given objects
     */
    static PartialCachingAsyncLookupProvider of(
            AsyncLookupFunction asyncLookupFunction, LookupCache cache) {
        // The return type must be the async provider: the anonymous class below does not
        // implement PartialCachingLookupProvider.
        return new PartialCachingAsyncLookupProvider() {
            @Override
            public LookupCache getCache() {
                return cache;
            }

            @Override
            public AsyncLookupFunction createAsyncLookupFunction() {
                return asyncLookupFunction;
            }
        };
    }

    /** Returns the {@link LookupCache} supplied by this provider. */
    LookupCache getCache();
} |
...
Code Block |
---|
language | java |
---|
title | CacheReloadTrigger |
---|
|
/** Customized trigger for reloading all lookup table entries in full caching mode. */
@PublicEvolving
public interface CachingReloadTrigger extends AutoCloseable, Serializable {
/** Open the trigger. */
void open(Context context) throws Exception;
/**
* Context of {@link CacheReloadTrigger} for getting information about times and
* triggering reload.
*/
interface Context {
/** Get current processing time. */
long currentProcessingTime();
/** Get current watermark on the main stream. */
long currentWatermark();
/** Trigger a reload operation on the full cache. */
CompletableFuture<Void> triggerReload();
}
}
|
...
An implementation of CacheReloadTrigger that triggers a reload at a specified interval.
Code Block |
---|
language | java |
---|
title | PeriodicCacheReloadTrigger |
---|
|
/**
 * A {@link CacheReloadTrigger} that reloads all entries periodically, using the configured
 * interval or delay.
 */
public class PeriodicCacheReloadTrigger implements CacheReloadTrigger {

    /** How consecutive reloads are scheduled relative to each other. */
    public enum ScheduleMode {
        FIXED_DELAY,
        FIXED_RATE
    }

    private final Duration reloadInterval;
    private final ScheduleMode scheduleMode;

    /**
     * Creates the trigger.
     *
     * @param reloadInterval the interval (or delay) between reloads
     * @param scheduleMode the scheduling mode applied to {@code reloadInterval}
     */
    public PeriodicCacheReloadTrigger(Duration reloadInterval, ScheduleMode scheduleMode) {
        this.reloadInterval = reloadInterval;
        this.scheduleMode = scheduleMode;
    }

    /** Opens the trigger with the given context. */
    @Override
    public void open(CacheReloadTrigger.Context context) {
        // Register periodic reload task
    }

    /** Closes the trigger. */
    @Override
    public void close() throws Exception {
        // Dispose resources
    }
} |
TimedCacheReloadTrigger
Code Block |
---|
language | java |
---|
title | TimedCacheReloadTrigger |
---|
|
/**
 * A {@link CacheReloadTrigger} that reloads at a specific local time and repeats at the given
 * interval in days.
 */
public class TimedCacheReloadTrigger implements CacheReloadTrigger {

    private final LocalTime reloadTime;
    private final int reloadIntervalInDays;

    /**
     * Creates the trigger.
     *
     * @param reloadTime the local time at which a reload happens
     * @param reloadIntervalInDays the number of days between reloads
     */
    public TimedCacheReloadTrigger(LocalTime reloadTime, int reloadIntervalInDays) {
        this.reloadTime = reloadTime;
        this.reloadIntervalInDays = reloadIntervalInDays;
    }

    /** Opens the trigger with the given context. */
    @Override
    public void open(Context context) {
        // Register periodic reload task
    }

    /** Closes the trigger. */
    @Override
    public void close() throws Exception {
        // Dispose resources
    }
} |
...