You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 7 Next »

Status

Current stateUnder discussion

Discussion thread: https://lists.apache.org/thread/1vokqdnnt01yycl7y1p74g556cc8yvtq

JIRA: TBD

Released: Not released yet

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

This FLIP is a joint work of Yuan Zhu (zstraw@163.com) and Qingsheng Ren .

Motivation

As a widely-used feature in Flink SQL jobs, the performance of lookup table source is essential not only for users but also source developers for tuning their implementations. Most lookup table sources use cache to achieve better performance, but there are some features missing in the current design of cache:

  • Missing cache related metrics, which is the key to debug and optimize SQL tasks
  • Duplicated implementations. Currently every lookup source needs to implement or use its own cache.
  • Inconsistent. Table options related to caching are defined differently in sources

In order to address the issues above, we propose here to define a unified abstraction for lookup source cache and its related metrics.

Proposed Changes

We propose a new public interface for cache and a set of conventional / standard metrics. Most of designs are inspired by Google GuavaCache. Also we'd like to introduce new table options for configuring lookup table caching via Table/SQL API.

Cache Metrics

It is important to mention that a cache implementation does not have to report all the defined metrics. But if a cache reports a metric of the same semantic defined below, the implementation should follow the convention.

NameTypeUnitDescription
numCachedRecordGaugeRecordsThe number of records in cache.
numCachedBytesGaugeBytesThe number of bytes used by cache.
hitCountCounter
The number of cache hits
missCountCounter
The number of cache misses, which might leads to loading operations
loadCountCounter

The number of times to load data into cache from external system.

For LRU cache the load count should be equal to miss count, but for all cache this would be different.

numLoadFailureCounter
The number of load failures
latestLoadTimeGaugemsThe time spent for the latest load operation

Here we just define fundamental metrics and let the external metric system make the aggregation to get more descriptive values such as hitRate = hitCount / (hitCount + missCount).

Scope

The metric group for the cache would be a sub-group of the OperatorMetricGroup where the table function belongs to.

Public Interfaces

We propose to add new interfaces that defines abstraction of cache and related helper classes to build TableFunction with cache easily:

  • Cache, a base interface for all caches
  • CacheMetricGroup, a interface defining cache related metrics
  • CacheConfig, a wrapper class containing configurations of the cache
  • CachingTableFunction, a helper TableFunction class with cache
  • AllCachingTableFunction, a helper TableFunction class for lookup sources that loads all entries into the cache on start and reload at fixed rate

Cache

A base interface for all caches:

Cache
/**
 * A semi-persistent mapping from keys to values. Cache entries are manually added using {@link
 * #put(Object, Object)}, and are stored in the cache until either evicted or manually invalidated.
 *
 * <p>Implementations of this interface are expected to be thread-safe, and can be safely accessed
 * by multiple concurrent threads.
 *
 * @param <K> key
 * @param <V> value
 */
public interface Cache<K, V> extends AutoCloseable {

    /**
     * Initialize the cache.
     *
     * @param cacheMetricGroup - metric group for reporting metrics of the cache
     */
    void open(CacheMetricGroup cacheMetricGroup);

    /**
     * Returns the value associated with key in this cache, or null if there is no cached value for
     * key.
     */
    V getIfPresent(K key);

    /**
     * Returns the value associated with the key in the cache, or obtain the value by loader if the
     * key doesn't exist.
     *
     * @param key key - the key whose associated value is to be returned
     * @param loader - the loader for loading associated values from external system
     * @return The value associated with the key
     * @throws IOException if any exception was thrown while loading the value
     */
    V get(K key, Callable<? extends V> loader) throws IOException;

    /**
     * Associates the specified value with the specified key in the cache. If the cache previously
     * contained a value associated with the key, the old value is replaced by the specified value.
     *
     * @return the previous value associated with key, or null if there was no mapping for key.
     * @param key - key with which the specified value is to be associated
     * @param value – value to be associated with the specified key
     */
    V put(K key, V value);

    /**
     * Copies all the mappings from the specified map to the cache. The effect of this call is
     * equivalent to that of calling {@code put(k, v)} on this map once for each mapping from key
     * {@code k} to value {@code v} in the specified map. The behavior of this operation is
     * undefined if the specified map is modified while the operation is in progress.
     */
    void putAll(Map<? extends K, ? extends V> m);

    /** Delete all entries in the cache. */
    void clean();

    /** Returns the number of key-value mappings in the cache. */
    long size();
}

CacheMetricGroup

An interface defining all cache related metric:

CacheMetricGroup
/** Pre-defined metrics for {@code Cache}. */
public interface CacheMetricGroup {

    /** The number of cache hits. */
    Counter getHitCounter();

    /** The number of cache misses. */
    Counter getMissCounter();

    /** The number of times to load data into cache from external system. */
    Counter getLoadCounter();

    /** The number of load failures. */
    Counter getNumLoadFailureCounter();
 
    /** The time spent for the latest load operation. */
    void setLatestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge); 

    /** The number of records in cache. */
    void setNumCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

    /** The number of bytes used by cache. */
    void setNumCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}

CacheConfig

A wrapper class containing parameters of the cache:

CacheConfig
public class CacheConfig implements ReadableConfig, WritableConfig {

    private final Configuration configuration = new Configuration();

    public CacheConfig setCacheStrategy(CacheConfigOptions.CacheStrategy strategy) {
        configuration.set(CacheConfigOptions.CACHE_STRATEGY, strategy);
        return this;
    }

    public CacheConfig setTTL(Duration ttl) {
        configuration.set(CacheConfigOptions.CACHE_TTL, ttl);
        return this;
    }

    public CacheConfig setMaxEntries(long maxEntries) {
        configuration.set(CacheConfigOptions.CACHE_MAX_ENTRIES, maxEntries);
        return this;
    }

    public CacheConfig setAllowEmptyValues(boolean allowEmptyValues) {
        configuration.set(CacheConfigOptions.CACHE_ALLOW_EMPTY_VALUES, allowEmptyValues);
        return this;
    }

    @Override
    public <T> T get(ConfigOption<T> option) {
        return configuration.get(option);
    }

    @Override
    public <T> Optional<T> getOptional(ConfigOption<T> option) {
        return configuration.getOptional(option);
    }

    @Override
    public <T> WritableConfig set(ConfigOption<T> option, T value) {
        return configuration.set(option, value);
    }
}  

Cache Options

CacheConfigOptions
public class CacheConfigOptions {

    public static final ConfigOption<CacheStrategy> CACHE_STRATEGY =
            ConfigOptions.key("lookup.cache.strategy")
                    .enumType(CacheStrategy.class)
                    .noDefaultValue()
                    .withDescription(
                            "Strategy of the cache to load and evict entries, including ALL, LRU and NONE");

    public static final ConfigOption<Duration> CACHE_TTL =
            ConfigOptions.key("lookup.cache.ttl")
                    .durationType()
                    .noDefaultValue()
                    .withDescription("Time-to-live (TTL) of entries in cache");

    public static final ConfigOption<Long> CACHE_MAX_ENTRIES =
            ConfigOptions.key("lookup.cache.max-entries")
                    .longType()
                    .noDefaultValue()
                    .withDescription("The maximum number of entries that the cache could contain");

    public static final ConfigOption<Boolean> CACHE_ALLOW_EMPTY_VALUES =
            ConfigOptions.key("lookup.cache.allow-empty-values")
                    .booleanType()
                    .noDefaultValue()
                    .withDescription(
                            "Whether to allow empty values associated with a key in the cache");

    public enum CacheStrategy {
        ALL,
        LRU,
        NONE
    }
}

CachingTableFunction

A helper TableFunction with a default or a custom cache implementation.

CachingTableFunction
/**
 * Base class for lookup function that supports caching.
 */
@PublicEvolving
public abstract class CachingTableFunction extends TableFunction<RowData> {
    private final Cache<RowData, List<RowData>> cache;

    /**
     * Use default cache implementation with specified cache configuration.
     *
     * @param cacheConfig - configuration of the default cache.
     */
    public CachingTableFunction(CacheConfig cacheConfig) {
        cache = new DefaultCache<>(cacheConfig);
    }

    /**
     * Use custom cache implementation.
     *
     * @param cache - custom cache implementation
     */
    public CachingTableFunction(Cache<RowData, List<RowData>> cache) {
        this.cache = cache;
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        cache.open(new InternalCacheMetricGroup(context.getMetricGroup()));
    }

	public void eval(Object... keys){
		// Check the cache first, or load via lookup() if the cache doesn't contains the key
    }
   
    /**
     * Lookup rows associated with the key row from external system.
     *
     * @param key the key row containing lookup fields
     * @throws IOException if any exception was thrown while looking up the value
     */
    public abstract List<RowData> lookup(RowData key) throws IOException; 
}

AllCachingTableFunction

A helper TableFunction that load all entries into the cache and schedules reloading by the specified TTL. 

AllCachingTableFunction
/**
 * Base class for lookup function that loads all entries from external system and schedules
 * reloading by the specified TTL.
 */
public abstract class AllCachingTableFunction extends TableFunction<RowData> {
 	private Cache<RowData, List<RowData>> cache;

    // Interval to reload all entries in cache
    private final Duration reloadInterval;

	public AllCachingTableFunction(CacheConfig cacheConfig) {
		cache = new DefaultCache(cacheConfig);
		reloadInterval = cacheConfig.get(TTL);
	}

    public AllCachingTableFunction(Cache<RowData, List<RowData>> cache, Duration reloadInterval) {
        this.cache = cache;
        this.reloadInterval = reloadInterval;
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
        scheduleCacheReload(this::reloadAllEntries, reloadInterval);
    }

    public void eval(Object... keys) {
		// Only check the cache here
	}

    /**
     * Reload all entries in the cache.
     *
     * <p>This function will be invoked periodically in a scheduled executor, which invalidates all
     * entries in the cache and reloads all entries from external system.
     *
     * @param cache - the cache in which all entries to be reloaded
     * @throws IOException - if any exception was thrown while reloading the value
     */
    public abstract void reloadAllEntries(Cache<RowData, List<RowData>> cache) throws IOException;
}

CachingAsyncTableFunction

CachingTableFunction
/**
 * Base class for asynchronous lookup function that supports caching.
 */
@PublicEvolving
public abstract class CachingAsyncTableFunction extends AsyncTableFunction<RowData> {
    private final Cache<RowData, List<RowData>> cache;

    /**
     * Use default cache implementation with specified cache configuration.
     *
     * @param cacheConfig - configuration of the default cache.
     */
    public CachingTableFunction(CacheConfig cacheConfig) {
        cache = new DefaultCache<>(cacheConfig);
    }

    /**
     * Use custom cache implementation.
     *
     * @param cache - custom cache implementation
     */
    public CachingTableFunction(Cache<RowData, List<RowData>> cache) {
        this.cache = cache;
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        cache.open(new InternalCacheMetricGroup(context.getMetricGroup()));
    }

	public void eval(CompletableFuture<Collection<RowData>> result, Object... keys){
		// Check the cache first and complete the result directly if cache hit, 
		// or load asynchronously via lookupAsync() if the cache doesn't contains the key
    }
   
    /**
     * Lookup rows associated with the key row asynchronously from external system and return the
     * {@link CompletableFuture} of the lookup result.
     *
     * @param key - the key row whose associated value rows is to be returned
     */
    public abstract CompletableFuture<List<RowData>> lookupAsync(RowData key);
}

Table Options

We propose to add new table options below for configuring lookup table and its underlying cache:

  • lookup.cache.type, enumerator type, the type of cache (ALL, LRU, NONE);
  • lookup.cache.max-rows, long type, maximum number of rows in the cache;
  • lookup.cache.ttl, duration type, the time-to-live(TTL) of entries in the cache;
  • lookup.cache.allow-empty-values, boolean type, whether to allow empty values associated with a key;
  • lookup.max-retries, long type, the maximum number of retries to load a value.

Compatibility, Deprecation, and Migration Plan

Currently we have JDBC, Hive and HBase connector implemented lookup table source. All existing implementations will be migrated to the current design and the migration will be transparent to end users. 

Test Plan

We will use unit and integration test for validating the functionality of cache implementations.

Rejected Alternatives

None

  • No labels