Status

Current state: Under discussion

Discussion thread: https://lists.apache.org/thread/1vokqdnnt01yycl7y1p74g556cc8yvtq

JIRA: TBD

Released: Not released yet

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

This FLIP is joint work by Yuan Zhu (zstraw@163.com) and Qingsheng Ren.

Motivation

Lookup table sources are widely used in Flink SQL jobs, so their performance matters both to users and to source developers tuning their implementations. Most lookup table sources use a cache to achieve better performance, but the current cache design is missing several features:

  • Missing cache-related metrics, which are key to debugging and optimizing SQL tasks.
  • Duplicated implementations. Currently every lookup source needs to implement or use its own cache.
  • Inconsistent options. Table options related to caching are defined differently across sources.

In order to address the issues above, we propose here to define a unified abstraction for lookup source cache and its related metrics.

Proposed Changes

We'd like to split the proposal into two kinds of caching strategies: LRU cache and all cache.

LRU cache

LRU is the most common caching strategy, which dynamically evicts entries in the cache according to the given configuration. To support LRU caching in lookup tables, we propose several new interfaces that simplify implementing lookup table functions and enabling the cache:

  • LookupFunction / AsyncLookupFunction, an extended version of TableFunction to clarify the semantics of lookup.
  • LookupCache / LookupCacheFactory, defining the cache and its factory used in lookup table.
  • DefaultLookupCacheFactory, a default implementation of an LRU cache that is suitable for most use cases.
  • LookupCacheMetricGroup, defining the metrics that should be reported by the lookup cache.
  • LookupFunctionProvider / AsyncLookupFunctionProvider, as the API interacting with table source to get LookupFunction and LookupCacheFactory.

The LRU cache serves as a component in LookupJoinRunner, and is pluggable by specifying a LookupCacheFactory in the LookupFunctionProvider. The developer of a lookup table needs to define a LookupFunctionProvider / AsyncLookupFunctionProvider in their implementation of LookupTableSource to specify the LookupFunction and the factory of the cache. The planner then takes over the cache factory and passes it to the LookupJoinRunner, and the cache is instantiated during runtime execution.
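
As an illustration of the eviction behavior only, here is a minimal sketch of an LRU map built on the JDK's LinkedHashMap. The class name LruSketch and the String key/value types are assumptions made for this example; this is not the proposed DefaultLookupCache, and unlike a LookupCache implementation it is not thread-safe.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Minimal LRU map: drops the least-recently-accessed entry once maxSize is exceeded. */
class LruSketch<K, V> extends LinkedHashMap<K, V> {
    private static final long serialVersionUID = 1L;
    private final int maxSize;

    LruSketch(int maxSize) {
        // accessOrder = true keeps entries ordered from least to most recently accessed.
        super(16, 0.75f, true);
        this.maxSize = maxSize;
    }

    /** Called by LinkedHashMap after each insertion; returning true evicts the eldest entry. */
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxSize;
    }
}
```

With a maximum size of 2, putting k1 and k2, reading k1, and then putting k3 evicts k2, because k1 was accessed more recently.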

All Cache

If the lookup table is small enough to fit into memory and doesn't change frequently, it is more efficient to load all entries of the lookup table into the cache to reduce network I/O, and to refresh the table periodically. We'd like to name this use case "all cache". Logically the reload operation is a kind of scan, so we'd like to reuse the ScanRuntimeProvider so that developers can reuse the scanning logic implemented in Source / SourceFunction / InputFormat. Considering the complexity of the Source API, we'd like to support the SourceFunction and InputFormat APIs first. Supporting the Source API might require a new topology and will be discussed later in another FLIP.

We propose to introduce a new interface RescanRuntimeProvider in order to reuse the ability of scanning.
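
The reload-and-swap idea behind "all cache" can be sketched with plain JDK types. Everything in this example is hypothetical (the class AllCacheSketch, the Supplier standing in for the scan logic, the String rows); it only shows one possible way to keep lookups in memory while a periodic re-scan replaces the snapshot, and it is not the proposed runtime implementation.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Holds a full snapshot of a (hypothetical) lookup table and swaps it atomically on reload. */
class AllCacheSketch {
    // Stand-in for the scan logic that would come from a ScanRuntimeProvider.
    private final Supplier<Map<String, List<String>>> scanner;
    private final AtomicReference<Map<String, List<String>>> snapshot =
            new AtomicReference<>(Collections.emptyMap());

    AllCacheSketch(Supplier<Map<String, List<String>>> scanner) {
        this.scanner = scanner;
    }

    /** Re-scans the whole table; readers keep seeing the old snapshot until the swap. */
    void reload() {
        Map<String, List<String>> fresh = new HashMap<>(scanner.get());
        snapshot.set(Collections.unmodifiableMap(fresh));
    }

    /** Serves lookups from memory only; unknown keys yield an empty collection. */
    Collection<String> lookup(String key) {
        return snapshot.get().getOrDefault(key, Collections.emptyList());
    }
}
```

A ScheduledExecutorService could invoke reload() at a fixed delay matching the re-scan interval, for example via scheduleWithFixedDelay.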

Public Interfaces

Lookup Functions

As the usage of the TableFunction interface is not straightforward for lookup table developers, we'd like to introduce new interfaces for sync and async lookup tables. Caching will only be supported on LookupFunction / AsyncLookupFunction.

LookupFunction
/**
 * A wrapper class of {@link TableFunction} for synchronously looking up rows matching the lookup
 * keys from an external system.
 *
 * <p>The output type of this table function is fixed as {@link RowData}.
 */
@PublicEvolving
public abstract class LookupFunction extends TableFunction<RowData> {

    /**
     * Synchronously lookup rows matching the lookup keys.
     *
     * @param keys - Keys to lookup.
     * @return A collection of all matching rows in the lookup table.
     */
    public abstract Collection<RowData> lookup(Object... keys) throws IOException;

    /** Invoke {@link #lookup} and handle exceptions. */
    public final void eval(Object... keys) {
        try {
            lookup(keys).forEach(this::collect);
        } catch (IOException e) {
            throw new RuntimeException("Failed to lookup values with given key", e);
        }
    }
}
AsyncLookupFunction
/**
 * A wrapper class of {@link AsyncTableFunction} for asynchronously looking up rows matching the
 * lookup keys from an external system.
 *
 * <p>The output type of this table function is fixed as {@link RowData}.
 */
@PublicEvolving
public abstract class AsyncLookupFunction extends AsyncTableFunction<RowData> {

    /**
     * Asynchronously lookup rows matching the lookup keys.
     *
     * @param keys - Keys to lookup.
     * @return A collection of all matching rows in the lookup table.
     */
    public abstract CompletableFuture<Collection<RowData>> asyncLookup(Object... keys);

    /** Invokes {@link #asyncLookup} and chains futures. */
    public final void eval(CompletableFuture<Collection<RowData>> future, Object... keys) {
        asyncLookup(keys)
                .whenCompleteAsync(
                        (result, exception) -> {
                            if (exception != null) {
                                future.completeExceptionally(exception);
                                return;
                            }
                            future.complete(result);
                        });
    }
}
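
The future chaining done by the async eval wrapper can be tried out in isolation with JDK types only. In this sketch the class AsyncChainSketch, the String rows, and the failing key "bad" are assumptions made for illustration. It uses whenComplete rather than whenCompleteAsync so that the callback runs deterministically in a small test.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

class AsyncChainSketch {
    /** Stand-in for asyncLookup: fails for the hypothetical key "bad", else returns one row. */
    static CompletableFuture<Collection<String>> asyncLookup(String key) {
        if ("bad".equals(key)) {
            CompletableFuture<Collection<String>> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("lookup failed for " + key));
            return failed;
        }
        return CompletableFuture.completedFuture(Collections.singletonList("row-" + key));
    }

    /** Mirrors the eval wrapper: forwards either the rows or the failure to the caller's future. */
    static void eval(CompletableFuture<Collection<String>> resultFuture, String key) {
        asyncLookup(key)
                .whenComplete(
                        (rows, error) -> {
                            if (error != null) {
                                resultFuture.completeExceptionally(error);
                            } else {
                                resultFuture.complete(rows);
                            }
                        });
    }
}
```

The caller only ever observes resultFuture, so a failed lookup surfaces as an exceptionally completed future rather than as an exception thrown from eval.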

LookupCache

Considering there might be custom caching strategies and optimizations, we'd like to expose the cache interface as public API for developers to make the cache pluggable.

LookupCache
/**
 * A semi-persistent mapping from keys to values for storing entries of lookup table.
 *
 * <p>The type of the caching key is a {@link RowData} with lookup key fields packed inside. The
 * type of value is a {@link Collection} of {@link RowData}, which are rows matching lookup key
 * fields.
 *
 * <p>Cache entries are manually added using {@link #put}, and are stored in the cache until either
 * evicted or manually invalidated.
 *
 * <p>Implementations of this interface are expected to be thread-safe, and can be safely accessed
 * by multiple concurrent threads.
 */
@PublicEvolving
public interface LookupCache {

    /**
     * Returns the value associated with key in this cache, or null if there is no cached value for
     * key.
     */
    @Nullable
    Collection<RowData> getIfPresent(RowData key);

    /**
     * Associates the specified value rows with the specified key row in the cache. If the cache
     * previously contained a value associated with the key, the old value is replaced by the
     * specified value.
     *
     * @param key - key row with which the specified value is to be associated
     * @param value - value rows to be associated with the specified key
     * @return the previous value rows associated with key, or null if there was no mapping for key.
     */
    Collection<RowData> put(RowData key, Collection<RowData> value);

    /**
     * Copies all the mappings from the specified map to the cache. The effect of this call is
     * equivalent to that of calling {@code put(k, v)} on this cache once for each mapping from key
     * {@code k} to value {@code v} in the specified map. The behavior of this operation is
     * undefined if the specified map is modified while the operation is in progress.
     */
    void putAll(Map<? extends RowData, ? extends Collection<RowData>> m);

    /** Discards any cached value for the specified key. */
    void invalidate(RowData key);

    /** Discards all entries in the cache. */
    void invalidateAll();

    /** Returns the number of key-value mappings in the cache. */
    long size();
}

LookupCacheFactory

As the cache should be instantiated during runtime execution to avoid serialization / deserialization, a factory is required for creating the cache. 

LookupCacheFactory
/** Factory for creating an instance of {@link LookupCache}. */
@PublicEvolving
public interface LookupCacheFactory extends Serializable {

    /**
     * Create a {@link LookupCache}.
     *
     * @param metricGroup - The lookup cache metric group in which the cache register predefined and
     *     custom metrics.
     */
    LookupCache createCache(LookupCacheMetricGroup metricGroup);
}
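
Why a factory is shipped instead of a cache instance can be illustrated with a serialization round trip. This is a sketch under stated assumptions: CacheFactory below is a hypothetical stand-in and not the LookupCacheFactory interface above, and a ConcurrentHashMap plays the role of the cache.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class FactoryShippingSketch {
    /** Hypothetical factory: carries only configuration, so it is cheap to serialize. */
    static class CacheFactory implements Serializable {
        private static final long serialVersionUID = 1L;
        final int initialCapacity;

        CacheFactory(int initialCapacity) {
            this.initialCapacity = initialCapacity;
        }

        /** The cache itself is only built where it is used, after deserialization. */
        Map<String, String> createCache() {
            return new ConcurrentHashMap<>(initialCapacity);
        }
    }

    /** Simulates shipping the factory to a worker through Java serialization. */
    static CacheFactory roundTrip(CacheFactory factory) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(factory);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (CacheFactory) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }
}
```

Only the small configuration object crosses the wire; cached entries never need to be serialized.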

DefaultLookupCacheFactory

To simplify usage for developers, we provide a default factory for building a default cache.

DefaultLookupCacheFactory
/** Factory for creating an instance of {@link DefaultLookupCache}. */
@PublicEvolving
public class DefaultLookupCacheFactory implements LookupCacheFactory {
    private final Duration expireAfterAccessDuration;
    private final Duration expireAfterWriteDuration;
    private final Integer initialCapacity;
    private final Long maximumSize;

    public static DefaultLookupCacheFactory.Builder newBuilder() {
        return new Builder();
    }

    private DefaultLookupCacheFactory(
            Duration expireAfterAccessDuration,
            Duration expireAfterWriteDuration,
            Integer initialCapacity,
            Long maximumSize) {
        // Validation
        this.expireAfterAccessDuration = expireAfterAccessDuration;
        this.expireAfterWriteDuration = expireAfterWriteDuration;
        this.initialCapacity = initialCapacity;
        this.maximumSize = maximumSize;
    }

    @Override
    public LookupCache createCache(LookupCacheMetricGroup metricGroup) {
        // Create instance of DefaultLookupCache
    }

    /** Builder of {@link DefaultLookupCacheFactory}. */
    public static class Builder {
        private Duration expireAfterAccessDuration;
        private Duration expireAfterWriteDuration;
        private Integer initialCapacity;
        private Long maximumSize;

        public DefaultLookupCacheFactory.Builder expireAfterAccess(Duration duration) {
            expireAfterAccessDuration = duration;
            return this;
        }

        public DefaultLookupCacheFactory.Builder expireAfterWrite(Duration duration) {
            expireAfterWriteDuration = duration;
            return this;
        }

        public DefaultLookupCacheFactory.Builder initialCapacity(int initialCapacity) {
            this.initialCapacity = initialCapacity;
            return this;
        }

        public DefaultLookupCacheFactory.Builder maximumSize(long maximumSize) {
            this.maximumSize = maximumSize;
            return this;
        }

        public DefaultLookupCacheFactory build() {
            return new DefaultLookupCacheFactory(
                    expireAfterAccessDuration,
                    expireAfterWriteDuration,
                    initialCapacity,
                    maximumSize);
        }
    }
}

LookupCacheMetricGroup

An interface defining all cache-related metrics:

LookupCacheMetricGroup
/** Pre-defined metrics for {@code LookupCache}. */
@PublicEvolving
public interface LookupCacheMetricGroup extends MetricGroup {
    /** The number of cache hits. */
    void setHitCounter(Counter hitCounter);

    /** The number of cache misses. */
    void setMissCounter(Counter missCounter);

    /** The number of times to load data into cache from external system. */
    void setLoadCounter(Counter loadCounter);

    /** The number of load failures. */
    void setNumLoadFailuresCounter(Counter numLoadFailuresCounter);

    /** The time spent for the latest load operation. */
    void setLatestLoadTimeGauge(Gauge<Long> latestLoadTimeGauge);

    /** The number of records in cache. */
    void setNumCachedRecordsGauge(Gauge<Long> numCachedRecordsGauge);

    /** The number of bytes used by cache. */
    void setNumCachedBytesGauge(Gauge<Long> numCachedBytesGauge);
}

LookupFunctionProvider

This is the API between the table framework and the user's table source. Implementations should define how to create a lookup function and whether to use a cache.

LookupFunctionProvider
/**
 * Provider for creating {@link LookupFunction} and {@link LookupCacheFactory} if caching should be
 * enabled for the lookup table.
 */
@PublicEvolving
public interface LookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    /**
     * Creates {@link LookupFunctionProvider} with the given {@link LookupFunction} and disables
     * lookup table caching.
     */
    static LookupFunctionProvider of(LookupFunction lookupFunction) {
        return new LookupFunctionProvider() {
            @Override
            public LookupFunction createLookupFunction() {
                return lookupFunction;
            }

            @Override
            public Optional<LookupCacheFactory> getCacheFactory() {
                return Optional.empty();
            }

            @Override
            public Optional<Boolean> cacheMissingKey() {
                return Optional.empty();
            }
        };
    }

    /**
     * Creates {@link LookupFunctionProvider} with the given {@link LookupFunction} and enables
     * caching with the specified {@link LookupCacheFactory}.
     */
    static LookupFunctionProvider of(
            LookupFunction lookupFunction,
            LookupCacheFactory cacheFactory,
            boolean cacheMissingKey) {
        return new LookupFunctionProvider() {
            @Override
            public LookupFunction createLookupFunction() {
                return lookupFunction;
            }

            @Override
            public Optional<LookupCacheFactory> getCacheFactory() {
                return Optional.of(cacheFactory);
            }

            @Override
            public Optional<Boolean> cacheMissingKey() {
                return Optional.of(cacheMissingKey);
            }
        };
    }

    /** Creates a {@link LookupFunction} instance. */
    LookupFunction createLookupFunction();

    /**
     * Gets the {@link LookupCacheFactory} for creating lookup cache.
     *
     * <p>This factory will be used for creating an instance of cache during runtime execution for
     * optimizing the access to external lookup table.
     *
     * @return an {@link Optional} of {@link LookupCacheFactory}, or an empty {@link Optional} if
     *     caching shouldn't be applied to the lookup table.
     */
    Optional<LookupCacheFactory> getCacheFactory();

    /**
     * Whether the missing key (key fields without any matching value rows) should be stored in the
     * cache.
     *
     * <p>Please note that this option is required if {@link #getCacheFactory()} returns a non-empty
     * instance. If the cache factory is empty, the return value of this function will be ignored.
     *
     * @return true if a null or empty value should be stored in the cache.
     */
    Optional<Boolean> cacheMissingKey();
}
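
To make the effect of cacheMissingKey concrete, the following sketch shows one plausible lookup-through-cache flow. It is an assumption about how a runner could honor the flag, not the LookupJoinRunner implementation; the class CachingLookupSketch, the HashMap cache, the Function standing in for the external lookup, and the loadCount field are all hypothetical.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class CachingLookupSketch {
    private final Map<String, Collection<String>> cache = new HashMap<>();
    // Stand-in for the external lookup performed by a lookup function.
    private final Function<String, Collection<String>> loader;
    private final boolean cacheMissingKey;
    // Counts how often the external system was queried.
    int loadCount = 0;

    CachingLookupSketch(Function<String, Collection<String>> loader, boolean cacheMissingKey) {
        this.loader = loader;
        this.cacheMissingKey = cacheMissingKey;
    }

    Collection<String> lookup(String key) {
        Collection<String> cached = cache.get(key);
        if (cached != null) {
            // Cache hit; this may be an empty entry remembered for a missing key.
            return cached;
        }
        loadCount++;
        Collection<String> loaded = loader.apply(key);
        if (loaded == null) {
            loaded = Collections.emptyList();
        }
        // Empty results are stored only when missing keys should be cached.
        if (!loaded.isEmpty() || cacheMissingKey) {
            cache.put(key, loaded);
        }
        return loaded;
    }
}
```

With cacheMissingKey enabled, repeated lookups of an absent key hit the external system once; with it disabled, every such lookup triggers a new load.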

AsyncLookupFunctionProvider

AsyncLookupFunctionProvider
@PublicEvolving
public interface AsyncLookupFunctionProvider extends LookupTableSource.LookupRuntimeProvider {

    /**
     * Creates {@link AsyncLookupFunctionProvider} with the given {@link AsyncLookupFunction} and
     * disables lookup table caching.
     */
    static AsyncLookupFunctionProvider of(AsyncLookupFunction asyncLookupFunction) {
        return new AsyncLookupFunctionProvider() {
            @Override
            public AsyncLookupFunction createAsyncLookupFunction() {
                return asyncLookupFunction;
            }

            @Override
            public Optional<LookupCacheFactory> getCacheFactory() {
                return Optional.empty();
            }

            @Override
            public Optional<Boolean> cacheMissingKey() {
                return Optional.empty();
            }
        };
    }

    /**
     * Creates {@link AsyncLookupFunctionProvider} with the given {@link AsyncLookupFunction} and
     * enables caching with the specified {@link LookupCacheFactory}.
     */
    static AsyncLookupFunctionProvider of(
            AsyncLookupFunction asyncLookupFunction,
            LookupCacheFactory cacheBuilder,
            boolean cacheMissingKey) {
        return new AsyncLookupFunctionProvider() {
            @Override
            public AsyncLookupFunction createAsyncLookupFunction() {
                return asyncLookupFunction;
            }

            @Override
            public Optional<LookupCacheFactory> getCacheFactory() {
                return Optional.of(cacheBuilder);
            }

            @Override
            public Optional<Boolean> cacheMissingKey() {
                return Optional.of(cacheMissingKey);
            }
        };
    }

    /** Creates an {@link AsyncLookupFunction} instance. */
    AsyncLookupFunction createAsyncLookupFunction();

    /**
     * Gets the {@link LookupCacheFactory} for creating lookup cache.
     *
     * <p>This factory will be used for creating an instance of cache during runtime execution for
     * optimizing the access to external lookup table.
     *
     * @return an {@link Optional} of {@link LookupCacheFactory}, or an empty {@link Optional} if
     *     caching shouldn't be applied to the lookup table.
     */
    Optional<LookupCacheFactory> getCacheFactory();

    /**
     * Whether the missing key (key fields without any matching value rows) should be stored in the
     * cache.
     *
     * <p>Please note that this option is required if {@link #getCacheFactory()} returns a non-empty
     * instance. If the cache factory is empty, the return value of this function will be ignored.
     *
     * @return true if a null or empty value should be stored in the cache.
     */
    Optional<Boolean> cacheMissingKey();
}

RescanRuntimeProvider

This interface supports the all cache strategy. It reuses ScanRuntimeProvider and defines the interval between re-scans.

RescanRuntimeProvider
/**
 * Runtime provider for periodically re-scanning all entries of the lookup table and storing the
 * table locally for lookup.
 */
@PublicEvolving
public interface RescanRuntimeProvider extends LookupTableSource.LookupRuntimeProvider {

    /**
     * Gets the {@link org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider}
     * for executing the periodic re-scan.
     */
    ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();

    /** Gets the interval between two re-scans. */
    Duration getRescanInterval();
}


Cache Metrics

It is important to mention that a cache implementation does not have to report all the defined metrics. But if a cache reports a metric with the same semantics as one defined below, the implementation should follow the convention.

Name            | Type    | Unit    | Description
numCachedRecord | Gauge   | Records | The number of records in cache.
numCachedBytes  | Gauge   | Bytes   | The number of bytes used by cache.
hitCount        | Counter |         | The number of cache hits.
missCount       | Counter |         | The number of cache misses, which might lead to loading operations.
loadCount       | Counter |         | The number of times to load data into cache from external system. For LRU cache the load count should be equal to miss count, but for all cache this would be different.
numLoadFailure  | Counter |         | The number of load failures.
latestLoadTime  | Gauge   | ms      | The time spent for the latest load operation.

Here we just define fundamental metrics and let the external metric system make the aggregation to get more descriptive values such as hitRate = hitCount / (hitCount + missCount).
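
The hitRate formula above is plain arithmetic over two counters. As a sketch of what an external metric system might compute (the class and method names are hypothetical, and returning 0.0 when no lookups have happened yet is an assumption of this example):

```java
class HitRateSketch {
    /** hitRate = hitCount / (hitCount + missCount); 0.0 is returned before any lookup. */
    static double hitRate(long hitCount, long missCount) {
        long total = hitCount + missCount;
        return total == 0 ? 0.0 : (double) hitCount / total;
    }
}
```

For example, 75 hits and 25 misses give a hit rate of 0.75.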

Scope

The metric group for the cache would be a sub-group of the OperatorMetricGroup to which the table function belongs.

Compatibility, Deprecation, and Migration Plan

Currently the JDBC, Hive and HBase connectors implement lookup table sources. All existing implementations will be migrated to the design proposed here, and the migration will be transparent to end users.

Test Plan

We will use unit and integration tests to validate the functionality of cache implementations.

Rejected Alternatives

Add cache in TableFunction implementations

Compared with this design, adding a cache in TableFunction implementations might lead to inconsistency between sync and async table functions, and is not suitable for applying optimizations.

