This FLIP is a draft that is still being edited. It will be posted to a mailing list discussion thread once it is ready.

Status

Current state: Under discussion

Discussion thread: TBD

JIRA: TBD

Released: Not released yet

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

This FLIP is joint work by Yuan Zhu (zstraw@163.com) and Qingsheng Ren.

Motivation

Lookup table sources are widely used in Flink SQL jobs, so their performance matters both to users and to source developers tuning their implementations. Most lookup table sources use a cache to achieve better performance, but the current cache design has several gaps:

  • Missing cache-related metrics, which are key to debugging and optimizing SQL tasks.
  • Duplicated implementations: currently every lookup source needs to implement or bring its own cache.
  • Inconsistent configuration: table options related to caching are defined differently across sources.

To address the issues above, we propose a unified abstraction for the lookup source cache and its related metrics.

Public Interfaces

We propose a new public interface for the cache and a set of conventional / standard metrics. Most of the design is inspired by the Cache in Google Guava.

Cache Metrics

Note that a cache implementation does not have to report all of the metrics defined below. However, if a cache reports a metric with the same semantics as one defined below, the implementation should follow the convention.

Name                  | Type    | Unit    | Description
----------------------|---------|---------|------------------------------------------------------------
numCachedRecord       | Gauge   | Records | The number of records in the cache.
numCachedBytes        | Gauge   | Bytes   | The number of bytes used by the cache.
hitCount              | Counter |         | The number of cache hits.
missCount             | Counter |         | The number of cache misses, each of which leads to a loading operation.
numBytesLoadedTotal   | Counter | Bytes   | The total number of bytes loaded into the cache.
numRecordsLoadedTotal | Counter | Records | The total number of records loaded into the cache.
latestLoadTime        | Gauge   | ms      | The time spent on the latest load operation.

Only fundamental metrics are defined here. More descriptive values, such as hitRate = hitCount / (hitCount + missCount), are left to the external metric system to derive by aggregation.
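As a rough illustration of the aggregation mentioned above, the hit rate could be derived from the two raw counters as sketched below. The class and method names (`CacheHitRate`, `hitRate`) are hypothetical and not part of this proposal, and returning 0.0 before any lookup has happened is an assumption about how the division-by-zero case might be handled.

```java
/** Hypothetical helper, not part of the proposed API: derives a hit rate from the raw counters. */
final class CacheHitRate {

    private CacheHitRate() {}

    /**
     * Computes hitCount / (hitCount + missCount).
     *
     * <p>Assumption: when no lookups have been recorded yet, 0.0 is returned instead of NaN.
     */
    static double hitRate(long hitCount, long missCount) {
        long total = hitCount + missCount;
        return total == 0 ? 0.0 : (double) hitCount / total;
    }
}
```

In practice this computation would typically live in the external metric system (for example as a dashboard expression) rather than inside the cache itself.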

Scope

The metric group for the cache would be a sub-group of the OperatorMetricGroup to which the table function belongs.

Interface of Cache

We propose to introduce a new public interface for all cache implementations, which looks like:

Cache
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Callable;

/**
 * A semi-persistent mapping from keys to values. Cache entries are manually added using {@link
 * #put(Object, Object)}, and are stored in the cache until either evicted or manually invalidated.
 *
 * <p>Implementations of this interface are expected to be thread-safe, and can be safely accessed
 * by multiple concurrent threads.
 *
 * @param <K> key
 * @param <V> value
 */
public interface Cache<K, V> {

    /**
     * Returns the value associated with the key in the cache, or obtains the value from the
     * loader if the key doesn't exist.
     *
     * @param key the key whose associated value is to be returned
     * @param loader the loader for loading associated values from the external system
     * @return The value associated with the key
     * @throws IOException if any exception was thrown while loading the value
     */
    V get(K key, Callable<? extends V> loader) throws IOException;

    /**
     * Associates the specified value with the specified key in the cache. If the cache previously
     * contained a value associated with the key, the old value is replaced by the specified value.
     *
     * @param key key with which the specified value is to be associated
     * @param value value to be associated with the specified key
     * @return the previous value associated with key, or null if there was no mapping for key
     */
    V put(K key, V value);

    /**
     * Copies all the mappings from the specified map to the cache. The effect of this call is
     * equivalent to that of calling {@code put(k, v)} on this cache once for each mapping from
     * key {@code k} to value {@code v} in the specified map. The behavior of this operation is
     * undefined if the specified map is modified while the operation is in progress.
     */
    void putAll(Map<? extends K, ? extends V> m);

    /** Delete all entries in the cache. */
    void clean();

    /** Returns the number of key-value mappings in the cache. */
    long size();
}
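To make the contract above more concrete, here is a minimal sketch of what an implementation might look like. It is not the implementation proposed by this FLIP: `SimpleMapCache`, `getHitCount` and `getMissCount` are hypothetical names, the cache is unbounded (no eviction), null values returned by the loader are not cached, and concurrent misses on the same key may each invoke the loader. The interface is repeated inside the block only so that the sketch compiles on its own.

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Copy of the proposed interface, repeated so that this sketch is self-contained.
interface Cache<K, V> {
    V get(K key, Callable<? extends V> loader) throws IOException;
    V put(K key, V value);
    void putAll(Map<? extends K, ? extends V> m);
    void clean();
    long size();
}

/** Hypothetical unbounded, thread-safe cache backed by a ConcurrentHashMap (illustration only). */
final class SimpleMapCache<K, V> implements Cache<K, V> {
    private final ConcurrentHashMap<K, V> store = new ConcurrentHashMap<>();
    // Plain counters standing in for the hitCount / missCount metrics.
    private final AtomicLong hitCount = new AtomicLong();
    private final AtomicLong missCount = new AtomicLong();

    @Override
    public V get(K key, Callable<? extends V> loader) throws IOException {
        V cached = store.get(key);
        if (cached != null) {
            hitCount.incrementAndGet();
            return cached;
        }
        missCount.incrementAndGet();
        final V loaded;
        try {
            loaded = loader.call();
        } catch (Exception e) {
            // Wrap any loader failure, as described by the @throws clause of get().
            throw new IOException("Failed to load value for key " + key, e);
        }
        if (loaded == null) {
            return null; // Assumption: absent values are not cached in this sketch.
        }
        // If another thread stored a value for this key in the meantime, keep the earlier one.
        V raced = store.putIfAbsent(key, loaded);
        return raced != null ? raced : loaded;
    }

    @Override
    public V put(K key, V value) {
        return store.put(key, value);
    }

    @Override
    public void putAll(Map<? extends K, ? extends V> m) {
        store.putAll(m);
    }

    @Override
    public void clean() {
        store.clear();
    }

    @Override
    public long size() {
        return store.mappingCount();
    }

    long getHitCount() {
        return hitCount.get();
    }

    long getMissCount() {
        return missCount.get();
    }
}
```

A lookup function could then call something like `cache.get(key, () -> queryExternalSystem(key))`, where `queryExternalSystem` is a placeholder for the source-specific lookup. A real implementation would additionally need eviction and would report its counters through the metric group rather than through getters.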



Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in a few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
