| Discussion thread | - |
| --- | --- |
| Vote thread | - |
| JIRA | - |
| Release | - |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
[This FLIP proposal is joint work between Yunfeng Zhou and Dong Lin.]
Motivation
Currently, Flink supports two types of state backends out of the box, namely HashMapStateBackend and EmbeddedRocksDBStateBackend. If an operator's state is small enough to be held in memory, it is typically better to use HashMapStateBackend, as it can be more than 10X faster than RocksDB. If an operator's state can potentially be very large, it is better to use EmbeddedRocksDBStateBackend to avoid OOM and long GC pauses.
However, in order to make the best choice of state backend, users need a good understanding of the operator's state usage pattern (e.g. via profiling) and must compare it with the machine's available memory. This process is tedious and hard to do well for most users. Even worse, the state size of an operator might increase over time due to changes in the incoming traffic pattern or throughput, which introduces the risk of OOM and performance regression for operators that currently use HashMapStateBackend. As a result, many production jobs use EmbeddedRocksDBStateBackend by default due to its reliability, potentially at the cost of reduced throughput.
In this FLIP, we introduce an LRU cache of Java objects that can be used together with RocksDB to ensure that 1) there is no risk of OOM; and 2) operator throughput approaches that of HashMapStateBackend as the configured cache size increases. Users only need to configure the maximum number of entries of the LRU cache, without worrying about significant (e.g. 10X) performance regression or OOM due to misconfiguration.
Public API
1. Add a job-level config state.backend.cache.max-entries-num
Name: state.backend.cache.max-entries-num
Type: Integer
Default value: 0
Description: The maximum number of key/value pairs that can be stored in the LRU cache. It can be used with the 'rocksdb' state backend to significantly accelerate its throughput. The cached state will be persisted to the location specified by state.checkpoint-storage when a checkpoint is triggered.
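For illustration, here is how a user could enable the proposed cache programmatically once this FLIP is implemented. This is a sketch only: the cache size of 10000 is an arbitrary example value, and the same options could equally be set in the Flink configuration file.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableStateCacheExample {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // Use RocksDB as the state backend and put the proposed LRU cache on top of it.
        config.setString("state.backend.type", "rocksdb");
        // Proposed option: keep at most 10000 key/value pairs in the in-memory cache.
        config.setString("state.backend.cache.max-entries-num", "10000");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);
        // ... build and execute the job as usual ...
    }
}
```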
2. Add a task-level metric to show LRU cache hit rate.
scope: Task
name: stateCacheHitRatio
description: The cache hit ratio of state access operations, averaged per second.
type: Gauge
Proposed Changes
- The number of key/value pairs in the cache should not exceed the configured value of state.backend.cache.max-entries-num.
- The cache uses the write-back cache policy and works on top of the state backend configured with state.backend.type = rocksdb.
1) Here are the steps taken by the cache when reading the value for a given key (e.g. when ValueState#value() is called):
- If the cache does not contain an entry for the given key:
- Set the key context of the underlying RocksDBStateBackend using the given key.
- Get the value from the underlying RocksDBStateBackend.
- Put the key/value pair into the cache.
- If the number of entries in the cache has exceeded its limit, pop the least-recently-used entry, set the key context using this entry's key, and write this entry into the RocksDBStateBackend.
- Update the access order of the key in the LRU cache as appropriate.
- Return the value for the given key.
2) Here are the steps taken by the cache when updating the value for a given key (e.g. when ValueState#update() is called). A sketch that combines this write path with the read path from step 1) follows the list:
- Put the key/value pair into the cache.
- If the number of entries in the cache has exceeded its limit, pop the least-recently-used entry, set the key context using this entry's key, and write this entry into the RocksDBStateBackend.
- Update the access order of the key in the LRU cache as appropriate.
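The read and write paths above can be summarized with the following minimal sketch. It is illustrative only: the class name and the BackendAccessor interface are stand-ins invented for this example rather than the classes this FLIP would add, and details such as serialization and checkpointing are omitted.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Minimal sketch of the write-back LRU cache described above (illustrative only). */
class LruStateCacheSketch<K, V> {

    /** Stand-in for setting the key context and accessing the RocksDB-backed state. */
    interface BackendAccessor<K, V> {
        void setCurrentKey(K key);
        V get();
        void put(V value);
    }

    private final int maxEntries;
    private final BackendAccessor<K, V> backend;
    // accessOrder = true makes the LinkedHashMap track least-recently-used order.
    private final LinkedHashMap<K, V> cache = new LinkedHashMap<>(16, 0.75f, true);

    LruStateCacheSketch(int maxEntries, BackendAccessor<K, V> backend) {
        this.maxEntries = maxEntries;
        this.backend = backend;
    }

    /** Step 1): read path. */
    V get(K key) {
        V value = cache.get(key);            // a hit also refreshes the access order
        if (value == null && !cache.containsKey(key)) {
            backend.setCurrentKey(key);      // set key context of the RocksDB backend
            value = backend.get();           // load from RocksDB on a cache miss
            cache.put(key, value);
            evictIfNeeded();
        }
        return value;
    }

    /** Step 2): write path (write-back: RocksDB is only updated on eviction). */
    void put(K key, V value) {
        cache.put(key, value);               // also refreshes the access order
        evictIfNeeded();
    }

    private void evictIfNeeded() {
        while (cache.size() > maxEntries) {
            // The eldest entry of an access-ordered LinkedHashMap is the LRU entry.
            Map.Entry<K, V> lru = cache.entrySet().iterator().next();
            backend.setCurrentKey(lru.getKey());
            backend.put(lru.getValue());     // write back the evicted entry
            cache.remove(lru.getKey());
        }
    }
}
```

With the write-back policy, a modified value only reaches RocksDB when its entry is evicted from the cache (or when a checkpoint persists the cached state), which removes the per-record RocksDB access on the hot path.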
3) When a checkpoint is triggered, the cache persists its content together with the underlying RocksDBStateBackend, as described in the Public API section. We make the following notes for the cache's behavior during checkpointing (a simplified sketch of the snapshot flow follows these notes):
- Its internal hashmap-like structure and its behavior during checkpointing are quite similar to those of the existing CopyOnWriteStateMap used by HashMapStateBackend.
- It supports concurrent checkpoints.
- Each key/value pair will be deep-copied at most once per checkpoint triggering.
- The time complexity of the operations done in the synchronous checkpoint stage is O(number_of_hashmap_buckets) and only shallow copies are involved. So the extra overhead should be lightweight (similar to HashMapStateBackend's synchronous checkpoint stage).
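To make the notes above more concrete, here is a deliberately simplified sketch of the two-stage snapshot flow. It is illustrative only: it is not Flink's CopyOnWriteStateMap, the names are invented, and the lazy per-entry deep copy is only described in comments.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

/** Simplified sketch of the snapshot flow; entries live in hash buckets as linked chains. */
class SnapshotSketch<K, V> {

    static final class Entry<K, V> {
        final K key;
        final V value;
        final Entry<K, V> next;
        Entry(K key, V value, Entry<K, V> next) {
            this.key = key; this.value = value; this.next = next;
        }
    }

    @SuppressWarnings("unchecked")
    private Entry<K, V>[] buckets = new Entry[128];

    /** Synchronous stage: O(number_of_buckets), shallow copy of the bucket array only. */
    CompletableFuture<Void> snapshot() {
        Entry<K, V>[] snapshotBuckets = Arrays.copyOf(buckets, buckets.length);
        // Asynchronous stage: walk the shared entry chains and serialize them.
        // In the proposed design, an entry that is modified while a snapshot is still
        // in flight is deep-copied first, at most once per checkpoint, which is what
        // allows concurrent checkpoints to stay consistent.
        return CompletableFuture.runAsync(() -> serialize(snapshotBuckets));
    }

    private void serialize(Entry<K, V>[] snapshotBuckets) {
        for (Entry<K, V> chain : snapshotBuckets) {
            for (Entry<K, V> e = chain; e != null; e = e.next) {
                // Placeholder: here the key/value pair would be serialized and written
                // to the checkpoint storage configured via state.checkpoint-storage.
            }
        }
    }
}
```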
Analysis
Performance overhead for read/write operations
Prior to this FLIP, when using RocksDB as the state backend, each incoming record incurs the following processing overhead:
1 invocation of setCurrentKey()
1 rocksdb get()
1 rocksdb put()
After this FLIP, assuming a 100% cache hit rate and no cache eviction, each incoming record incurs the following processing overhead:
1 invocation of setCurrentKey()
1 get() *
1 put() *
After this FLIP, assuming a 0% cache hit rate and a cache eviction for every record, each incoming record incurs the following processing overhead:
2 invocations of setCurrentKey()
1 rocksdb get()
1 rocksdb put()
1 get() *
2 put() *
*: The get() and put() operations marked with an asterisk are performed on the in-memory LRU cache, which stores Java objects directly, so they do not involve RocksDB access or serialization.
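Interpolating between the two cases above (and assuming, as in the 0% case, that every cache miss also triggers one eviction), the expected per-record overhead at cache hit rate h is roughly:

    cost(h) ≈ h × (1 setCurrentKey() + 1 cache get() + 1 cache put())
            + (1 − h) × (2 setCurrentKey() + 1 rocksdb get() + 1 rocksdb put() + 1 cache get() + 2 cache put())

Since the cache operations are cheap hashmap accesses on Java objects, the overall cost is dominated by the (1 − h) term, which is why throughput improves quickly as the hit rate grows.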
To measure the end-to-end impact, we benchmarked the proposed cache with the following setup:
- The benchmark uses the RocksDB state backend.
- The benchmark uses a local filesystem's directory as checkpoint storage. The checkpointing interval is 1000 ms and incremental checkpoint is enabled.
- The benchmark enables object reuse.
- The parallelism of the benchmark job is 1.
- The input of the benchmark job is a sequence of Long values ranging from 0 to 2e7. For each input record x, its key is computed as (x % 500) + 500 * ((x / 1000) % 2). This key selector allows us to determine the cache hit rate by specifying the cache size (state.backend.cache.max-entries-num) as follows; a quick way to verify these numbers is sketched after this list.
- If cache size is 250, cache hit rate is 0%.
- If cache size is 500, cache hit rate is 50%.
- If cache size is 1000, cache hit rate is 100% (ignoring the warm-up while processing the first 1500 records).
- The benchmark performs a count aggregation for each key, which involves one state read and write operation for each input record. The core logic is as follows.

```java
Long value = state.value();
if (value == null) {
    value = 0L;
}
value += 1;
state.update(value);
output.collect(new StreamRecord<>(new Tuple2<>((Long) getCurrentKey(), value)));
```
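As referenced above, the claimed hit rates can be sanity-checked by replaying the key selector against a plain LRU map, used here only as a stand-in for the proposed cache:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CacheHitRateCheck {
    public static void main(String[] args) {
        for (int cacheSize : new int[] {250, 500, 1000}) {
            final int limit = cacheSize;
            // An access-ordered LinkedHashMap bounded via removeEldestEntry acts as an LRU cache.
            Map<Long, Long> lru = new LinkedHashMap<Long, Long>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<Long, Long> eldest) {
                    return size() > limit;
                }
            };
            long hits = 0;
            final long numRecords = 20_000_000L; // input values 0 .. 2e7
            for (long x = 0; x < numRecords; x++) {
                // The key selector used by the benchmark.
                long key = (x % 500) + 500 * ((x / 1000) % 2);
                if (lru.containsKey(key)) {
                    hits++;
                }
                lru.put(key, x);
            }
            // Prints roughly 0%, 50% and 100% (minus a negligible warm-up share).
            System.out.printf("cache size %d -> hit rate %.2f%%%n",
                    cacheSize, 100.0 * hits / numRecords);
        }
    }
}
```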
Here are the results:
- When cache hit rate = 0%, the throughput with LRU cache is 95.6% of the throughput without LRU cache.
- When cache hit rate = 50%, the throughput with LRU cache is 1.87 times as much as the throughput without LRU cache.
- When cache hit rate = 100%, the throughput with LRU cache is 21.6 times as much as the throughput without LRU cache.
The slight decrease in the 0% cache hit case is mainly due to the overhead of maintaining the in-memory cache and the additional switching of the current key in the key context when entries are evicted from the cache.
Currently, Flink users can set the table.exec.mini-batch.* options to improve throughput for SQL/Table programs. In the following, we explain how the LRU cache in this FLIP, when combined with another FLIP we plan to work on later, can be strictly better than using table.exec.mini-batch.
Here is an overview of how table.exec.mini-batch works and its pros/cons; a configuration example follows the list.
- The Flink job groups individual records into mini-batches (based on size or time) and processes these records in a batch. This comes at the cost of increased heap memory usage and increased processing latency.
- When there are multiple records with the same key, they are first aggregated in memory into one value before this value is merged into RocksDB. This can reduce the number of RocksDB accesses when there are multiple records with the same key.
- When there are multiple records with the same key, only one record is output for that key, using the final aggregated value. This can reduce the number of output records emitted from this operator.
- The operator needs to flush records buffered in the mini-batch when checkpoint is triggered. This can increase the time of the synchronous checkpoint stage.
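For reference, mini-batch is typically enabled with the following existing Table API options (the specific values are illustrative):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MiniBatchConfigExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Buffer input records and process them in mini-batches.
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.enabled", "true");
        // Maximum time to buffer records (adds up to this much extra latency).
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.allow-latency", "2 s");
        // Maximum number of records buffered per mini-batch.
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.size", "5000");
    }
}
```

The allow-latency value bounds the extra buffering latency, which is exactly the trade-off the LRU cache in this FLIP avoids.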
As we can see, table.exec.mini-batch brings two optimizations: reduced state backend access and reduced number of output records.
For scenarios where users do not want to increase processing latency, table.exec.mini-batch cannot be enabled. In comparison, the LRU cache proposed in this FLIP can be enabled to reduce state backend access, with the following extra benefits:
- Less heap memory usage. The operator only needs to store the key + aggregated value for all unique keys, rather than the full list of the original elements.
- Better cache hit rate, since hot keys do not have to be evicted from the cache periodically (as happens with mini-batch processing, where the buffer is flushed for every mini-batch).
- No need to increase processing latency.
- No need to increase time of the synchronous checkpoint stage.
For scenarios where users prefer to reduce the number of output records at the cost of increased processing latency, we plan to introduce another FLIP to achieve this goal. That FLIP can be combined with the LRU cache proposed in this FLIP to achieve strictly better performance than using table.exec.mini-batch, due to e.g. lower heap memory usage.
Compatibility, Deprecation, and Migration Plan
The design proposed in this FLIP is backward compatible.