Discussion thread: -
Vote thread: -
JIRA: -
Release: -

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


[This FLIP proposal is joint work between Yunfeng Zhou and Dong Lin]

Motivation

Currently, Flink supports two types of state backends out of the box, namely HashMapStateBackend and EmbeddedRocksDBStateBackend. If an operator's state is small enough to be held in memory, it is typically better to use HashMapStateBackend, as it can be more than 10X faster than RocksDB. If an operator's state can potentially be very large, it is better to use EmbeddedRocksDBStateBackend to avoid OOM and long GC pauses.

However, to make the best choice of state backend, users need a good understanding of the operator's state usage pattern (e.g. through profiling) and must compare it with the machine's available memory. This process is tedious and hard to do well for most users. Even worse, the state size of an operator might increase over time due to changes in the incoming traffic pattern/throughput, which introduces a risk of OOM and performance regression for operators that currently use HashMapStateBackend. As a result, many production jobs use EmbeddedRocksDBStateBackend by default for its reliability, potentially at the cost of reduced throughput.

In this FLIP, we introduce an LRU cache of Java objects that can be used together with RocksDB to ensure that 1) there is no risk of OOM; and 2) operator throughput asymptotically approaches that of HashMapStateBackend as the configured cache size increases. Users only need to configure the maximum number of entries of the LRU cache, without worrying about significant (e.g. 10X) performance regression or OOM due to misconfiguration.

Public API

1. Add a job-level config state.backend.cache.max-entries-num

  • Name: state.backend.cache.max-entries-num

  • Type: Integer

  • Default value: 0

  • Description: The maximum number of key/value pairs that can be stored in the LRU cache. It can be used with the 'rocksdb' state backend to significantly increase its throughput. The cached state will be persisted to the location specified by state.checkpoint-storage when a checkpoint is triggered.

2. Add a task-level metric to show LRU cache hit rate.

  • scope: Task

  • name: stateCacheHitRatio

  • description: The cache hit ratio of state access operations, averaged per second.

  • type: Gauge

Proposed Changes

We propose to introduce StateBackendWrapperWithLRUCache as a subclass of StateBackend. At a high level, this class uses an object-level in-memory cache to accelerate access to another state backend (e.g. EmbeddedRocksDBStateBackend). When the configured cache size is large enough to hold all the key/value pairs, this class behaves much like HashMapStateBackend. When the configured cache size is not large enough, this class uses the underlying state backend as the fallback to persist the key/value pairs.

When state.backend.cache.max-entries-num is configured to be larger than 0, a StateBackendWrapperWithLRUCache will be constructed with the configured size limit and an instance of the state backend specified by state.backend.type. The Flink runtime will then use it to serve all state backend read/write requests.


StateBackendWrapperWithLRUCache enforces the following logic on top of the underlying state backend:

  • The number of key/value pairs in the cache should not exceed the configured value of state.backend.cache.max-entries-num.
  • The cache uses the write-back cache policy and the least-recently-used eviction strategy.


In the following, we describe the behavior of StateBackendWrapperWithLRUCache when read, write and checkpoint operations are triggered. The description assumes state.backend.type = rocksdb.

1) Here are the steps taken by StateBackendWrapperWithLRUCache to get a value for a given key:

  • If the state for the key does not exist in the cache:
    • Set the key context of the underlying RocksDBStateBackend using the given key.
    • Get the value from the underlying RocksDBStateBackend.
    • Put the key/value pair into StateBackendWrapperWithLRUCache.
      • If the number of entries in StateBackendWrapperWithLRUCache has exceeded its limit, StateBackendWrapperWithLRUCache pops the least-recently-used entry, sets key context using this entry's key, and writes this entry into RocksDBStateBackend.
  • Update the access order of the key in the LRU cache as appropriate.
  • Return the value for the given key.

2) Here are the steps taken by StateBackendWrapperWithLRUCache to put a key/value pair:

  • Put the key/value pair into StateBackendWrapperWithLRUCache.
    • If the number of entries in StateBackendWrapperWithLRUCache has exceeded its limit, StateBackendWrapperWithLRUCache pops the least-recently-used entry, sets key context using this entry's key, and writes this entry into RocksDBStateBackend.
  • Update the access order of the key in the LRU cache as appropriate.
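
The read and write paths above can be sketched in plain Java as follows. This is a minimal illustration under stated assumptions, not the proposed implementation: the class name WriteBackLruCache is hypothetical, a java.util.Map stands in for the underlying RocksDB state backend, setting the key context is omitted, values are assumed to be non-null, and keys that are absent from the backend are not cached.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of a write-back LRU cache in front of a slower key/value store. */
class WriteBackLruCache<K, V> {
    private final int maxEntries;
    // Stand-in for the underlying state backend (assumption: a plain Map instead of RocksDB).
    private final Map<K, V> backend;
    // accessOrder = true: iteration order goes from least to most recently used.
    private final LinkedHashMap<K, V> cache = new LinkedHashMap<>(16, 0.75f, true);

    WriteBackLruCache(int maxEntries, Map<K, V> backend) {
        this.maxEntries = maxEntries;
        this.backend = backend;
    }

    /** Read path: serve from the cache, falling back to the backend on a miss. */
    V get(K key) {
        V cached = cache.get(key); // a hit also refreshes the access order
        if (cached != null) {
            return cached;
        }
        V loaded = backend.get(key);
        if (loaded != null) {
            cache.put(key, loaded);
            evictIfNeeded();
        }
        return loaded;
    }

    /** Write path: update only the cache; the backend is written on eviction. */
    void put(K key, V value) {
        cache.put(key, value); // also marks the key as most recently used
        evictIfNeeded();
    }

    /** Pops least-recently-used entries and writes them back to the backend. */
    private void evictIfNeeded() {
        while (cache.size() > maxEntries) {
            Iterator<Map.Entry<K, V>> it = cache.entrySet().iterator();
            Map.Entry<K, V> eldest = it.next();
            backend.put(eldest.getKey(), eldest.getValue());
            it.remove();
        }
    }
}
```

With a limit of 2 entries, putting "a" and "b" leaves the backend untouched; reading "a" and then putting "c" evicts "b" (the least recently used entry) and writes it to the backend.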

3) Here are the steps taken by StateBackendWrapperWithLRUCache to perform a checkpoint:

  • In the synchronous checkpoint stage, StateBackendWrapperWithLRUCache will shallow-copy an array of buckets (which is used to implement a hashmap-like structure) into a new instance of StateBackendWrapperWithLRUCache.
  • In the asynchronous checkpoint stage, Flink runtime will serialize all key/value pairs referenced in the copied instance of StateBackendWrapperWithLRUCache and persist the result to the location specified by state.checkpoint-storage.
  • When a key/value pair put() operation is triggered, if the reference of this key/value pair is still waiting to be serialized by an ongoing checkpoint, then Flink runtime will deep-copy this key/value pair (as well as all other key/value pairs stored in the same bucket of the internal hashmap-like structure as this key/value pair) and update/mutate the resulting key/value pair as appropriate.
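
The copy-on-write behavior described in these steps can be illustrated with a small, simplified Java sketch. It is not Flink's CopyOnWriteStateMap and not the proposed implementation: the class CowBucketMap and its methods are hypothetical, keys and values are primitive longs (so copying an entry is already a deep copy), and the LRU bookkeeping is left out. A per-bucket flag records whether a bucket's chain may still be referenced by a snapshot; the first put() into such a bucket copies the whole chain before mutating it.

```java
import java.util.Arrays;

/** Hypothetical, simplified copy-on-write hash map used only to illustrate the idea. */
class CowBucketMap {
    /** One node of a bucket's singly linked chain. */
    static final class Entry {
        final long key;
        long value;
        Entry next;

        Entry(long key, long value, Entry next) {
            this.key = key;
            this.value = value;
            this.next = next;
        }
    }

    private final Entry[] buckets;
    // true if the bucket's chain may still be referenced by a snapshot
    private final boolean[] sharedWithSnapshot;

    CowBucketMap(int numBuckets) {
        buckets = new Entry[numBuckets];
        sharedWithSnapshot = new boolean[numBuckets];
    }

    /** Synchronous stage: an O(numBuckets) shallow copy of the bucket array. */
    Entry[] snapshot() {
        Arrays.fill(sharedWithSnapshot, true);
        return buckets.clone();
    }

    void put(long key, long value) {
        int i = indexOf(key, buckets.length);
        if (sharedWithSnapshot[i]) {
            // Copy the whole chain once, so the snapshot keeps the old entries.
            buckets[i] = copyChain(buckets[i]);
            sharedWithSnapshot[i] = false;
        }
        for (Entry e = buckets[i]; e != null; e = e.next) {
            if (e.key == key) {
                e.value = value;
                return;
            }
        }
        buckets[i] = new Entry(key, value, buckets[i]);
    }

    Long get(long key) {
        return lookup(buckets, key);
    }

    /** Reads a key from either the live bucket array or a snapshot of it. */
    static Long lookup(Entry[] table, long key) {
        for (Entry e = table[indexOf(key, table.length)]; e != null; e = e.next) {
            if (e.key == key) {
                return e.value;
            }
        }
        return null;
    }

    private static int indexOf(long key, int numBuckets) {
        return (int) Math.floorMod(key, (long) numBuckets);
    }

    private static Entry copyChain(Entry head) {
        return head == null ? null : new Entry(head.key, head.value, copyChain(head.next));
    }
}
```

In this sketch a bucket is copied at most once between two snapshot() calls, which mirrors the "at most once per checkpoint" property; the asynchronous stage would then serialize the entries reachable from the array returned by snapshot().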

We make the following notes for the behavior of StateBackendWrapperWithLRUCache:

  • Its internal hashmap-like structure and its behavior during checkpoints are similar to those of the existing CopyOnWriteStateMap used by HashMapStateBackend.
  • It supports concurrent checkpoints.
  • Each key/value pair will be deep-copied at most once per checkpoint triggering.
  • The time complexity of the operations done in the synchronous checkpoint stage is O(number_of_hashmap_buckets), and they involve only a shallow copy. So the extra overhead should be lightweight (similar to HashMapStateBackend's synchronous stage).


Analysis

Performance overhead for read/write operations

Suppose we have an aggregation operator that needs to read from and write to the state backend once for each incoming record.

Prior to this FLIP, when using rocksdb as the state backend, each incoming record incurs the following processing overhead:

  • 1 invocation of setCurrentKey()

  • 1 rocksdb get()

  • 1 rocksdb put()

After this FLIP, assuming a 100% cache hit rate without any cache eviction, each incoming record incurs the following processing overhead:

  • 1 invocation of setCurrentKey()

  • 1 HashMapStateBackend get() *

  • 1 HashMapStateBackend put() *

After this FLIP, assuming a 0% cache hit rate with a cache eviction for every record, each incoming record incurs the following processing overhead:

  • 2 invocations of setCurrentKey()

  • 1 rocksdb get()

  • 1 rocksdb put()

  • 1 HashMapStateBackend get() *

  • 2 HashMapStateBackend put() *

*: The proposed changes do not actually create a HashMapStateBackend and invoke its get()/put() methods, but the corresponding workload would be similar, apart from additionally updating the access order of an entry, whose cost is expected to be relatively small.

Therefore, in comparison to using rocksdb alone, using the LRU cache with a 0% cache hit rate additionally incurs 1 setCurrentKey(), 1 HashMapStateBackend get() and 2 HashMapStateBackend put() per record, as the difference between the two lists above shows. This overhead is expected to be considerably lower than the cost of 1 rocksdb get() and 1 rocksdb put().


Benchmark Results

We have implemented a simple POC for the proposed cache mechanism and verified the improvement it could bring to the throughput of a job with state access operations. Below is the configuration of the benchmark.

  • The benchmark uses RocksDB state backend.
  • The benchmark uses a local filesystem's directory as checkpoint storage. The checkpointing interval is 1000 ms and incremental checkpoint is enabled.
  • The benchmark enables object reuse.
  • The parallelism of the benchmark job is 1.
  • The input of the benchmark job is a sequence of Long values ranging from 0 to 2e7. For each input record x, its key is computed as (x % 500) + 500 * ((x / 1000) % 2). This key selector allows us to determine the cache hit rate by specifying the cache size (state.backend.cache.max-entries-num) as follows.
    • If cache size is 250, cache hit rate is 0%.
    • If cache size is 500, cache hit rate is 50%.
    • If cache size is 1000, cache hit rate is 100% (ignoring the warm-up while processing the first 1500 records).
  • The benchmark performs a count aggregation for each key, which involves one state read and write operation for each input record. The core logic is as follows.
Long value = state.value();
if (value == null) {
    value = 0L;
}
value += 1;
state.update(value);
output.collect(new StreamRecord<>(new Tuple2<>((Long) getCurrentKey(), value)));
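
The relationship between cache size and hit rate stated in the configuration above can be checked with a small standalone simulation. The sketch below is not part of the POC: the class LruHitRateSimulation is hypothetical, it models only the state read of each record against an access-ordered java.util.LinkedHashMap, and it replays 200,000 records instead of the full input to keep the run short.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical simulation of the benchmark's key pattern against an LRU cache. */
class LruHitRateSimulation {
    /** Key selector as given in the benchmark description. */
    static long keyOf(long x) {
        return (x % 500) + 500 * ((x / 1000) % 2);
    }

    /** Replays records 0..numRecords-1 and returns the fraction of reads served by the cache. */
    static double hitRate(final int cacheSize, long numRecords) {
        // accessOrder = true plus removeEldestEntry gives least-recently-used eviction.
        Map<Long, Boolean> lru = new LinkedHashMap<Long, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, Boolean> eldest) {
                return size() > cacheSize;
            }
        };
        long hits = 0;
        for (long x = 0; x < numRecords; x++) {
            long key = keyOf(x);
            if (lru.get(key) != null) {
                hits++; // get() also refreshes the access order
            } else {
                lru.put(key, Boolean.TRUE); // miss: load the key, possibly evicting the eldest
            }
        }
        return (double) hits / numRecords;
    }

    public static void main(String[] args) {
        for (int size : new int[] {250, 500, 1000}) {
            System.out.printf("cache size %d -> hit rate %.3f%n", size, hitRate(size, 200_000L));
        }
    }
}
```

The key sequence visits keys 0..499 twice, then 500..999 twice, and repeats. A cache of 250 entries therefore never holds a key long enough to see it again, a cache of 500 entries hits on every second pass, and a cache of 1000 entries misses only on the first occurrence of each key.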

Here are the results:

  • When cache hit rate = 0%, the throughput with LRU cache is 95.6% of the throughput without LRU cache. 
  • When cache hit rate = 50%, the throughput with LRU cache is 1.87 times as much as the throughput without LRU cache.
  • When cache hit rate = 100%, the throughput with LRU cache is 21.6 times as much as the throughput without LRU cache.

The throughput decrease at a 0% cache hit rate is mainly due to the overhead of the in-memory cache and the additional change of the current key in the key context when evicting keys from the cache.
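
As a rough sanity check of the three measurements, one can assume that the per-record processing time is a weighted average of the time at a 0% hit rate and the time at a 100% hit rate. This model is our own simplification and not part of the benchmark; the class and method names below are hypothetical.

```java
/** Hypothetical back-of-the-envelope model; not part of the benchmark code. */
class MixedHitRateEstimate {
    /**
     * Estimates the speedup over plain RocksDB at a given hit rate, assuming
     * per-record time is a weighted average of the all-miss and all-hit times.
     * Speedups are relative throughputs, so 1 / speedup is relative time per record.
     */
    static double estimatedSpeedup(double hitRate, double speedupAllMiss, double speedupAllHit) {
        double relativeTimePerRecord =
                hitRate / speedupAllHit + (1.0 - hitRate) / speedupAllMiss;
        return 1.0 / relativeTimePerRecord;
    }

    public static void main(String[] args) {
        // 0.956 and 21.6 are the measured relative throughputs at 0% and 100% hit rate.
        System.out.println(estimatedSpeedup(0.5, 0.956, 21.6));
    }
}
```

Under this assumption the estimate for a 50% hit rate is about 1.83, which is within a few percent of the measured 1.87.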


Related Work

Currently, Flink users can set table.exec.mini-batch.* to improve throughput for SQL/table programs. In the following, we explain how the LRU cache in this FLIP, when combined with another FLIP we plan to work on later, can be strictly better than using table.exec.mini-batch.


Here is an overview of how table.exec.mini-batch works and its pros/cons.

  • The Flink job groups individual records into mini-batches (based on size or time) and processes each mini-batch together. This comes at the cost of increased heap memory usage and increased processing latency.
  • When there are multiple records with the same key, they are first aggregated in memory into one value before this value is merged into rocksdb. This can reduce the number of rocksdb accesses when there are multiple records with the same key.
  • When there are multiple records with the same key, only one record is output for that key, using the final aggregated value. This can reduce the number of output records emitted from this operator.
  • The operator needs to flush the records buffered in the mini-batch when a checkpoint is triggered. This can increase the time of the synchronous checkpoint stage.


As we can see, table.exec.mini-batch brings two optimizations: reduced state backend access and reduced number of output records.

For scenarios where users do not want to increase processing latency, table.exec.mini-batch cannot be enabled. In comparison, FLIP-325 can be enabled to reduce state backend access with the following extra benefits:

  • Less heap memory usage. The operator only needs to store the key + aggregated value for all unique keys, rather than the full list of the original elements.
  • Better cache hit rate, since hot keys do not have to be evicted from the cache periodically (as they are with mini-batch processing).
  • No need to increase processing latency.
  • No need to increase time of the synchronous checkpoint stage.


For scenarios where users prefer to reduce the number of output records at the cost of increased processing latency, we plan to introduce another FLIP to achieve this goal. That FLIP can be combined with FLIP-325 to achieve strictly better performance than using table.exec.mini-batch, due to e.g. less heap memory usage.


Compatibility, Deprecation, and Migration Plan

The design proposed in this FLIP is backward compatible.