...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Monitoring the memory and disk usage of the RocksDB instances run in a Kafka Streams application allows users to react to increased memory and disk demands of RocksDB, as well as to other performance issues related to RocksDB, before the application crashes. Although the memory used by RocksDB can be bounded in Kafka Streams, the bound is not a hard limit. Additionally, such monitoring is useful for analysing the cause of a crash. Currently, the metrics exposed by Kafka Streams include information about the RocksDB instances run by an application (see KIP-471: Expose RocksDB Metrics in Kafka Streams for more details), but they do not provide any information about the memory or disk usage of those instances. Moreover, the metrics in KIP-471 expose statistics collected in RocksDB, and collecting statistics in RocksDB may have an impact on performance; that is also the reason why the metrics in KIP-471 are on recording level DEBUG. This KIP proposes to add metrics to Kafka Streams that report properties that RocksDB exposes by default and that can consequently be exposed on recording level INFO. The metrics in this KIP and KIP-471 complement each other.


Public Interfaces

Each added metric will be on store-level and have the following tags:

...

  • num-immutable-mem-table
  • cur-size-active-mem-table
  • cur-size-all-mem-tables
  • size-all-mem-tables
  • num-entries-active-mem-table
  • num-entries-imm-mem-tables
  • num-deletes-active-mem-table
  • num-deletes-imm-mem-tables
  • mem-table-flush-pending
  • num-running-flushes
  • compaction-pending
  • num-running-compactions
  • estimate-pending-compaction-bytes
  • total-sst-files-size
  • live-sst-files-size
  • num-live-versions
  • block-cache-capacity
  • block-cache-usage
  • block-cache-pinned-usage
  • estimate-num-keys
  • estimate-table-readers-mem
  • estimate-oldest-key-time
  • background-errors

The recording level for all metrics will be INFO.
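Because INFO is already the default recording level in Kafka Streams, no configuration change is needed to collect these metrics. As a small illustrative sketch (using the existing public config constant, nothing introduced by this KIP), the recording level could be set explicitly like this:

StreamsMetricsConfigExample.java

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsMetricsConfigExample {

    public static Properties streamsConfigWithInfoMetrics() {
        final Properties props = new Properties();
        // INFO is the default recording level; DEBUG would additionally enable
        // the statistics-based RocksDB metrics from KIP-471.
        props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO");
        return props;
    }
}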

The following change to the RocksDBConfigSetter interface is needed to get a cache that is possibly instantiated in an implementation of the config setter. Unfortunately, RocksDB's Java API does not offer a way to get the cache from the RocksDB instance. However, the metrics in this KIP need to know the cache that is used by the RocksDB instance to be able to aggregate the values correctly over the segments of a segmented state store.

...

RocksDBConfigSetter.java

...
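To make it concrete where such a cache comes from, the following is a minimal user-side sketch, modelled on the bounded-memory example in the Kafka Streams memory management documentation (linked in the examples section below). It shows a config setter that instantiates its own cache and plugs it into RocksDB; the class name and the cache size are illustrative, and the interface change proposed above (elided here) is what would allow Kafka Streams to retrieve this cache for the new metrics.

SharedCacheRocksDBConfigSetter.java

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;

public class SharedCacheRocksDBConfigSetter implements RocksDBConfigSetter {

    // One cache shared by all RocksDB instances (and all segments of segmented stores)
    // of the application; the 50 MB capacity is purely illustrative.
    private static final Cache CACHE = new LRUCache(50 * 1024 * 1024L);

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCache(CACHE);
        options.setTableFormatConfig(tableConfig);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // The cache is shared among all stores, so it must not be closed here.
    }
}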

Proposed Changes

In this section, we will explain the meaning of the metrics listed in the previous section. To better understand the metrics, some basic concepts of RocksDB need to be explained first.

  • Memtable: Memtables are in-memory write buffers. Each new key-value pair is first written to a memtable and each read looks first into the memtable before it looks on disk. Once a memtable is full it becomes immutable and it is replaced by a new memtable. A background thread flushes a memtable asynchronously to disk. Additionally, memtables can also be flushed manually. RocksDB keeps in memory the currently active memtables, full but not yet flushed memtables, and flushed memtables that are kept around to maintain write history in memory.
  • Compaction: From time to time RocksDB needs to clean up the data it stores on disk and bring its LSM tree into a good shape (see https://github.com/facebook/rocksdb/wiki/Compaction). Compactions might block writes and flushes. Additionally, RocksDB offers different compaction algorithms with different properties. Thus, it is good practice to monitor compactions in RocksDB.
  • SST files: SST files are the files in which RocksDB stores the data on disk. SST stands for Sorted Sequence Table.
  • Version: A version consists of all the live SST files at one point in time. Once a flush or compaction finishes, a new version is created because the list of live SST files has changed. An old version can still be used by ongoing read requests or compaction jobs. Old versions will eventually be garbage collected.
  • Cache: RocksDB caches data in memory for reads. By default, those caches contain only data blocks, i.e., uncompressed sequences of key-value pairs in sorted order. Therefore this cache is often referred to as block cache. However, users can configure RocksDB to also store index and filter blocks in the cache.

The names of the metrics are taken from the following list in the RocksDB repo (with "rocksdb." prefix ripped off):
https://github.com/facebook/rocksdb/blob/b9a4a10659969c71e6f6eab4e4bae8c36ede919f/include/rocksdb/db.h#L654-L686.

Those are public RocksDB properties. We decided to keep the RocksDB names to avoid a mapping that users need to look up or to memorize.

num-immutable-mem-table

Number of immutable memtables that have not yet been flushed. For segmented state stores, the sum of the number of immutable memtables over all segments is reported.

...

Approximate size of active, unflushed immutable, and pinned immutable memtables in bytes. Pinned immutable memtables are flushed memtables that are kept in memory to maintain write history in memory. For segmented state stores, the sum of sizes over all segments is reported.

...

Estimated total number of bytes a compaction needs to rewrite on disk to get all levels down to under target size. In other words, this metric relates to the write amplification in level compaction. Thus, this metric is not valid for compaction styles other than level compaction. For segmented state stores, the sum of the estimated total number of bytes over all segments is reported.

...

Estimated memory in bytes used for reading SST tables, excluding memory used in block cache (e.g., filter and index blocks). For segmented state stores, the sum of the estimated memory over all segments is reported.

estimate-oldest-key-time

Estimated oldest key timestamp in the RocksDB instance. This metric is currently only available for FIFO compaction with compaction_options_fifo.allow_compaction = false. For segmented state stores, the minimum of the estimated oldest key timestamps over all segments is reported.

background-errors

Accumulated number of background errors. For segmented state stores, the sum of the number of background errors over all segments is reported.

Examples

  1. If users want to monitor the total memory usage of RocksDB, they should compute size-all-mem-tables + block-cache-usage + estimate-table-readers-mem (see the sketch after this list). All of this memory is off-heap memory, i.e., it is not managed by the JVM. Note that the monitored total memory usage is an estimate. Users can bound the total memory usage by configuring RocksDB as described in https://kafka.apache.org/25/documentation/streams/developer-guide/memory-mgmt.html#rocksdb
  2. With mem-table-flush-pending and num-running-flushes, users can monitor the flushing behavior of their state stores. Similarly, users can monitor the compaction behavior of their state stores with compaction-pending and num-running-compactions.
  3. To monitor the sizes of the LSM trees used in their state stores, users can monitor total-sst-files-size.
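As a sketch for example 1, assuming the metrics are simply looked up by name from KafkaStreams#metrics() (the lookup by name is an assumption for illustration, not part of this KIP), the total off-heap memory estimate could be computed as follows. Note that a block cache shared among stores would be counted once per store by this naive sum.

RocksDBMemoryEstimate.java

import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public class RocksDBMemoryEstimate {

    private static final Set<String> MEMORY_METRICS = new HashSet<>(Arrays.asList(
        "size-all-mem-tables", "block-cache-usage", "estimate-table-readers-mem"));

    // Sums the three memory-related RocksDB metrics over all state stores of the instance.
    public static long estimatedOffHeapBytes(final KafkaStreams streams) {
        long totalBytes = 0L;
        for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
            if (MEMORY_METRICS.contains(entry.getKey().name())) {
                final Object value = entry.getValue().metricValue();
                if (value instanceof Number) {
                    totalBytes += ((Number) value).longValue();
                }
            }
        }
        return totalBytes;
    }
}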

Performance Consideration

All the metrics will be implemented as gauges. That means a metric's value is only computed when the metrics reporting system used by the user actually queries the metric. Hence, the number of metrics presented in this KIP should affect neither the performance of the RocksDB instances nor the performance of Kafka Streams as long as the metrics are not queried.
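To make the gauge behaviour concrete, a metric for one of these properties could conceptually be backed by a gauge like the following sketch. The class is illustrative and not necessarily how the KIP will be implemented, but Kafka's Gauge interface and RocksDB's getAggregatedLongProperty() are existing APIs: the property is read only when a reporter queries the metric.

NumImmutableMemTableGauge.java

import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.MetricConfig;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class NumImmutableMemTableGauge implements Gauge<Long> {

    private final RocksDB db;

    public NumImmutableMemTableGauge(final RocksDB db) {
        this.db = db;
    }

    // Called only when a metrics reporter reads the metric; nothing is computed otherwise.
    @Override
    public Long value(final MetricConfig config, final long now) {
        try {
            return db.getAggregatedLongProperty("rocksdb.num-immutable-mem-table");
        } catch (final RocksDBException e) {
            // Report -1 if the property cannot be read, e.g., because the store was closed.
            return -1L;
        }
    }
}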

Compatibility, Deprecation, and Migration Plan

...