Status
Current state: Accepted
Discussion thread: http://mail-archives.apache.org/mod_mbox/kafka-dev/202005.mbox/%3CCADR0NwzJBJa6WihnpmGj0R%2BYPVrojq4Kg_hOArNEytHAG-tZAQ%40mail.gmail.com%3E
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Monitoring the memory used by the RocksDB instances that run in a Kafka Streams application would make it possible to react to an increased memory demand by RocksDB before the application runs out of memory. Although the memory used by RocksDB can be bounded in Kafka Streams, the bound is not a hard limit. Currently, the metrics exposed by Kafka Streams include information about the RocksDB instances run by an application (see KIP-471: Expose RocksDB Metrics in Kafka Streams for more details), but they do not provide any information about the memory usage of those instances. This KIP proposes to add metrics to Kafka Streams that record the memory used by RocksDB.
Public Interfaces
Each added metric will be at store level and have the following tags:
- type = stream-state-metrics
- thread-id = [thread ID]
- task-id = [task ID]
- rocksdb-state-id = [store ID] for key-value stores
- rocksdb-session-state-id = [store ID] for session stores
- rocksdb-window-state-id = [store ID] for window stores
The following metrics will be exposed in Kafka Streams' metrics:
- num-immutable-mem-table
- cur-size-active-mem-table
- cur-size-all-mem-tables
- size-all-mem-tables
- num-entries-active-mem-table
- num-entries-imm-mem-tables
- num-deletes-active-mem-table
- num-deletes-imm-mem-tables
- mem-table-flush-pending
- num-running-flushes
- compaction-pending
- num-running-compactions
- estimate-pending-compaction-bytes
- total-sst-files-size
- live-sst-files-size
- num-live-versions
- block-cache-capacity
- block-cache-usage
- block-cache-pinned-usage
- estimate-num-keys
- estimate-table-readers-mem
- estimate-oldest-key-time
- background-errors
The recording level for all metrics will be INFO.
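As a rough illustration of how the tags above map to something a monitoring tool can query, the following sketch looks up the store-level MBeans over JMX. It assumes that the application registers its metrics with a JMX reporter under the `kafka.streams` domain and that each metric is exposed as an MBean attribute named after the metric. The class name, the helper `isStoreLevelMBean`, and the sample store name are illustrative and not part of this KIP.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.AttributeNotFoundException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

public class RocksDBMetricsJmxQuery {

    // Assumption: metrics are registered under the "kafka.streams" JMX domain.
    // The trailing wildcard matches any thread-id, task-id, and store-id tags.
    static final String STORE_METRICS_PATTERN = "kafka.streams:type=stream-state-metrics,*";

    // Returns true if the given MBean name carries the store-level type tag.
    static boolean isStoreLevelMBean(final String mbeanName) {
        try {
            return new ObjectName(STORE_METRICS_PATTERN).apply(new ObjectName(mbeanName));
        } catch (final MalformedObjectNameException e) {
            return false;
        }
    }

    public static void main(final String[] args) throws Exception {
        final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        final Set<ObjectName> names = server.queryNames(new ObjectName(STORE_METRICS_PATTERN), null);
        for (final ObjectName name : names) {
            try {
                final Object usage = server.getAttribute(name, "block-cache-usage");
                System.out.println(name.getKeyProperty("task-id") + " block-cache-usage=" + usage);
            } catch (final AttributeNotFoundException e) {
                // The store behind this MBean does not expose the metric
                // (for example, it is not backed by RocksDB).
            }
        }
    }
}
```

Run inside the JVM of a Streams application, `main` prints one line per store that exposes the metric; in a JVM without such MBeans it prints nothing.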
The following change to the RocksDBConfigSetter interface is needed to get a cache that is possibly instantiated in an implementation of the config setter. RocksDB's Java API does not offer a way to get the cache from the RocksDB instance, but the metrics in this KIP need to know which cache a RocksDB instance uses in order to aggregate the values correctly over the segments of a segmented state store.
public interface RocksDBConfigSetter {

    void setConfig(final String storeName, final Options options, final Map<String, Object> configs);

    /**
     * Returns the cache possibly instantiated in the instance
     * that implements this interface.
     *
     * If no cache is instantiated, the default behavior does not
     * need to be overridden.
     *
     * This method is needed because RocksDB's Java API does not offer
     * any means to get a cache that is passed to the RocksDB instance
     * by the config setter.
     */
    default Cache cache() { // new method
        return null;
    }

    default void close(final String storeName, final Options options) {
        LOG.warn("The default close will be removed in 3.0.0 -- you should overwrite it if you have implemented RocksDBConfigSetter");
    }
}
Proposed Changes
In this section, we will explain the meaning of the metrics listed in the previous section. To better understand the metrics, some basic concepts of RocksDB need to be explained first.
- Memtable: Memtables are in-memory write buffers. Each new key-value pair is first written to a memtable and each read looks first into the memtable before it looks on disk. Once a memtable is full it becomes immutable and it is replaced by a new memtable. A background thread flushes a memtable asynchronously to disk. Additionally, memtables can also be flushed manually. RocksDB keeps in memory the currently active memtables, full but not yet flushed memtables, and flushed memtables that are kept around to maintain write history in memory.
- Compaction: From time to time RocksDB needs to clean up the data it stores on disk and bring its LSM tree into good shape (see https://github.com/facebook/rocksdb/wiki/Compaction). Compactions might block writes and flushes. Additionally, RocksDB offers different compaction algorithms with different properties. Thus, it is good practice to monitor compactions in RocksDB.
- SST files: SST files are the files in which RocksDB stores the data on disk. SST stands for Sorted Sequence Table.
- Version: A version consists of all the live SST files at one point in time. Once a flush or compaction finishes, a new version is created because the list of live SST files has changed. An old version can still be used by ongoing read requests or compaction jobs. Old versions will eventually be garbage collected.
- Cache: RocksDB caches data in memory for reads. By default, those caches contain only data blocks, i.e., uncompressed sequences of key-value pairs in sorted order. Therefore this cache is often referred to as block cache. However, users can configure RocksDB to also store index and filter blocks in the cache.
num-immutable-mem-table
Number of immutable memtables that have not yet been flushed. For segmented state stores, the sum of the number of immutable memtables over all segments is reported.
cur-size-active-mem-table
Approximate size of active memtable in bytes. For segmented state stores, the sum of the sizes over all segments is reported.
cur-size-all-mem-tables
Approximate size of active and unflushed immutable memtables in bytes. For segmented state stores, the sum of sizes over all segments is reported.
size-all-mem-tables
Approximate size of active, unflushed immutable, and pinned immutable memtables in bytes. For segmented state stores, the sum of sizes over all segments is reported.
num-entries-active-mem-table
Total number of entries in the active memtable. For segmented state stores, the sum of number of entries over all segments is reported.
num-entries-imm-mem-tables
Total number of entries in the unflushed immutable memtables. For segmented state stores, the sum of number of entries over all segments is reported.
num-deletes-active-mem-table
Total number of delete entries in the active memtable. For segmented state stores, the sum of number of deletes over all segments is reported.
num-deletes-imm-mem-tables
Total number of delete entries in the unflushed immutable memtables. For segmented state stores, the sum of number of deletes over all segments is reported.
mem-table-flush-pending
This metric reports 1 if a memtable flush is pending; otherwise it reports 0. For segmented state stores, the sum of pending flushes over all segments is reported.
num-running-flushes
Number of currently running flushes. For segmented state stores, the sum of running flushes over all segments is reported.
compaction-pending
This metric reports 1 if at least one compaction is pending; otherwise it reports 0. For segmented state stores, the sum of ones and zeros over all segments is reported.
num-running-compactions
Number of currently running compactions. For segmented state stores, the sum of the number of currently running compactions over all segments is reported.
estimate-pending-compaction-bytes
Estimated total number of bytes a compaction needs to rewrite to get all levels down to under target size. This metric is not valid for compactions other than level-based. For segmented state stores, the sum of the estimated total number of bytes over all segments is reported.
total-sst-files-size
Total size in bytes of all SST files. For segmented state stores, the sum of the sizes of SST files over all segments is reported.
live-sst-files-size
Total size in bytes of all SST files that belong to the latest LSM tree. For segmented state stores, the sum of the sizes of SST files over all segments is reported.
num-live-versions
Number of live versions. More live versions often mean more SST files are held from being deleted, by iterators or unfinished compactions. For segmented state stores, the sum of the number of versions over all segments is reported.
block-cache-capacity
Block cache capacity. For segmented state stores, the sum of the cache capacities over all segments is reported if separate caches are used; if only one cache is used, the cache capacity of any segment is reported.
block-cache-usage
Memory size of the entries residing in the block cache. For segmented state stores, the sum of the cache usage over all segments is reported if separate caches are used; if only one cache is used, the cache usage of any segment is reported.
block-cache-pinned-usage
Memory size of the entries being pinned in the block cache. For segmented state stores, the sum of the pinned usage over all segments is reported if separate caches are used; if only one cache is used, the pinned usage of any segment is reported.
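The aggregation rule for the three block cache metrics can be sketched as follows. This is only an illustration of the idea, not the implementation proposed by this KIP: `SegmentCache` is a hypothetical stand-in for "the cache a segment uses plus the value reported for it", and the sketch assumes that shared caches can be recognized by object identity. Counting each distinct cache once yields the sum when every segment has its own cache and the value of any one segment when all segments share a cache.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

public class BlockCacheAggregationSketch {

    // Hypothetical stand-in for one segment of a segmented state store.
    static final class SegmentCache {
        final Object cache;     // cache instance used by the segment, possibly shared
        final long valueBytes;  // capacity, usage, or pinned usage reported for that cache

        SegmentCache(final Object cache, final long valueBytes) {
            this.cache = cache;
            this.valueBytes = valueBytes;
        }
    }

    // Adds up the reported value once per distinct cache instance.
    static long aggregate(final List<SegmentCache> segments) {
        final Set<Object> seenCaches = Collections.newSetFromMap(new IdentityHashMap<>());
        long total = 0L;
        for (final SegmentCache segment : segments) {
            if (seenCaches.add(segment.cache)) {
                total += segment.valueBytes;
            }
        }
        return total;
    }

    public static void main(final String[] args) {
        final Object shared = new Object();
        final List<SegmentCache> sharedCache = Arrays.asList(
            new SegmentCache(shared, 100L), new SegmentCache(shared, 100L));
        final List<SegmentCache> separateCaches = Arrays.asList(
            new SegmentCache(new Object(), 100L), new SegmentCache(new Object(), 100L));
        System.out.println("shared cache: " + aggregate(sharedCache));
        System.out.println("separate caches: " + aggregate(separateCaches));
    }
}
```

Without knowing which cache each segment uses, a plain sum would count a shared cache once per segment, which is why the metrics need access to the cache created in the config setter.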
estimate-num-keys
Estimated total number of keys in the active and unflushed immutable memtables and in storage. For segmented state stores, the sum of the estimated number of keys over all segments is reported.
estimate-table-readers-mem
Estimated memory in bytes used for reading SST tables, excluding memory used in block cache (e.g., filter and index blocks). For segmented state stores, the sum of the estimated memory over all segments is reported.
estimate-oldest-key-time
Estimated oldest key timestamp in the RocksDB instance. This metric is currently only available for FIFO compaction with compaction_options_fifo.allow_compaction = false. For segmented state stores, the minimum of the estimated oldest key timestamp over all segments is reported.
background-errors
Accumulated number of background errors. For segmented state stores, the sum of the number of background errors over all segments is reported.
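Apart from the block cache metrics, the descriptions above use two aggregation functions for segmented state stores: a sum for most metrics and a minimum for estimate-oldest-key-time. A minimal sketch of both, with illustrative class and method names that are not part of this KIP:

```java
import java.util.OptionalLong;
import java.util.stream.LongStream;

public class SegmentAggregationSketch {

    // Used for counts, sizes, and the 0/1 metrics such as compaction-pending.
    static long sumOverSegments(final long... perSegmentValues) {
        return LongStream.of(perSegmentValues).sum();
    }

    // Used for estimate-oldest-key-time; empty if the store has no segments.
    static OptionalLong minOverSegments(final long... perSegmentValues) {
        return LongStream.of(perSegmentValues).min();
    }

    public static void main(final String[] args) {
        // Three segments, two of which have a pending compaction.
        System.out.println("compaction-pending: " + sumOverSegments(1L, 0L, 1L));
        // Oldest key timestamps reported by three segments.
        System.out.println("estimate-oldest-key-time: " + minOverSegments(1700L, 1500L, 1600L));
    }
}
```

Note that for the 0/1 metrics the aggregated value of a segmented store is therefore the number of segments with a pending flush or compaction, not a boolean flag.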
Compatibility, Deprecation, and Migration Plan
Since metrics are only added and no other metrics are modified, this KIP should not
- affect backward-compatibility
- deprecate public interfaces
- need a migration plan other than adding the new metrics to the user's own monitoring component
Rejected Alternatives
Introduce configuration in Kafka Streams to name RocksDB properties to expose
Since all of the above metrics can be exposed as gauges, there should not be too much performance overhead because recording is only triggered when the metric is actually queried. We thought that the maintenance costs of a configuration would be higher than just exposing this set of RocksDB properties.
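The argument that gauges add little overhead can be illustrated with a small sketch. It does not use the Kafka metrics API; `Gauge` and `CountingProperty` are hypothetical stand-ins that only show the principle that a gauge stores a callback and reads the underlying property when it is queried, never on the processing path.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

public class LazyGaugeSketch {

    // Hypothetical minimal gauge: holds a callback instead of a recorded value.
    static final class Gauge {
        private final LongSupplier valueProvider;

        Gauge(final LongSupplier valueProvider) {
            this.valueProvider = valueProvider;
        }

        // The underlying property is read only here, when the metric is queried.
        long value() {
            return valueProvider.getAsLong();
        }
    }

    // Stand-in for a RocksDB property lookup that counts how often it is read.
    static final class CountingProperty implements LongSupplier {
        final AtomicInteger reads = new AtomicInteger();
        private final long value;

        CountingProperty(final long value) {
            this.value = value;
        }

        @Override
        public long getAsLong() {
            reads.incrementAndGet();
            return value;
        }
    }

    public static void main(final String[] args) {
        final CountingProperty property = new CountingProperty(1024L);
        final Gauge gauge = new Gauge(property);
        System.out.println("reads before query: " + property.reads.get());
        System.out.println("gauge value: " + gauge.value());
        System.out.println("reads after query: " + property.reads.get());
    }
}
```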