You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 15 Next »

Status

Current state: Accepted

Discussion thread: http://mail-archives.apache.org/mod_mbox/kafka-dev/202005.mbox/%3CCADR0NwzJBJa6WihnpmGj0R%2BYPVrojq4Kg_hOArNEytHAG-tZAQ%40mail.gmail.com%3E

JIRA: Unable to render Jira issues macro, execution error.

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Monitoring RocksDB instances run in a Kafka Streams application allows to react to increased memory and disk demands of RocksDB as well as to other performance issues related to  RocksDB before the application crashes. Additionally, such monitoring are useful for analysing the cause of a crash. Currently, the metrics exposed by Kafka Streams include information about RocksDB instances run by an application (see KIP-471: Expose RocksDB Metrics in Kafka Streams for more details) but they do not provide any information about memory or disk usage of the RocksDB instances. Moreover, the metrics in KIP-471 expose statistics collected in RocksDB. Collecting statistics in RocksDB may have an impact on performance. That is also the reason why the metrics in KIP-471 are on recording level DEBUG. This KIP proposes to add metrics to Kafka Streams that report properties that RocksDB exposes by default and consequently can be exposed on recording level INFO. The metrics in this KIP and KIP-471 complement each other.


Public Interfaces

Each added metric will be on store-level and have the following tags:

  • type = stream-state-metrics
  • thread-id = [thread ID]
  • task-id = [task ID]
  • rocksdb-state-id = [store ID]    for key-value stores
  • rocksdb-session-state-id = [store ID]    for session stores
  • rocksdb-window-state-id = [store ID]    for window stores  

The following metrics will be exposed in the Kafka Streams' metrics

  • num-immutable-mem-table
  • cur-size-active-mem-table
  • cur-size-all-mem-tables
  • size-all-mem-tables
  • num-entries-active-mem-table
  • num-entries-imm-mem-tables
  • num-deletes-active-mem-table
  • num-deletes-imm-mem-tables
  • mem-table-flush-pending
  • num-running-flushes
  • compaction-pending
  • num-running-compactions
  • estimate-pending-compaction-bytes
  • total-sst-files-size
  • live-sst-files-size
  • num-live-versions
  • block-cache-capacity
  • block-cache-usage
  • block-cache-pinned-usage
  • estimate-num-keys
  • estimate-table-readers-mem
  • estimate-oldest-key-time
  • background-errors

The recording level for all metrics will be INFO.

The following change to the RocksDBConfigSetter interface is needed to get a cache that is possibly instantiated in an implementation of the config setter. Unfortunately, RocksDB's Java API does not offer a way to get the cache from the RocksDB instance. However, the metrics in this KIP need to know the cache that is used by the RocksDB instance to be able to aggregate the values correctly over the segments of a segmented state store.

RocksDBConfigSetter.java
public interface RocksDBConfigSetter {

    void setConfig(final String storeName, final Options options, final Map<String, Object> configs);

	/**
     * Returns the cache possibly instantiated in the instance 
     * that implements this interface.
     *
     * If no cache is instantiated, the default behavior does not 
     * need to be overridden.
     * 
     * This method is needed because RocksDB's Java API does not offer
     * any mean to get a cache that is passed to the RocksDB instance 
     * by the config setter.
     */	
    default Cache cache() {        // new method
        return null;
    }

    default void close(final String storeName, final Options options) {
        LOG.warn("The default close will be removed in 3.0.0 -- you should overwrite it if you have implemented RocksDBConfigSetter");
    }
}


Proposed Changes

In this section, we will explain the meaning of the metrics listed in the previous section. To better understand the metrics, some basic concepts of RocksDB need to be explained first.

  • Memtable: Memtables are in-memory write buffers. Each new key-value pair is first written to a memtable and each read looks first into the memtable before it looks on disk. Once a memtable is full it becomes immutable and it is replaced by a new memtable. A background thread flushes a memtable asynchronously to disk. Additionally, memtables can also be flushed manually. RocksDB keeps in memory the currently active memtables, full but not yet flushed memtables, and flushed memtables that are kept around to maintain write history in memory.
  • Compaction: From time to time RocksDB needs to clean up the data it stores on disk and bring is LSM tree into a good shape (see https://github.com/facebook/rocksdb/wiki/Compaction). Compactions might block writes and flushes. Additionally, RocksDB offers different compaction algorithms with different properties. Thus, it is a good practise to monitor compactions in RocksDB.
  • SST files: SST files are the files in which RocksDB stores the data on disk. SST stands for Sorted Sequence Table.
  • Version: A version consists of all the live SST files at one point of time. Once a flush or compaction finishes, a new version is created because the list of live SST files has changed. An old version can be used by on-going read requests or compaction jobs. Old versions will eventually be garbage collected.
  • Cache: RocksDB caches data in memory for reads. By default, those caches contain only data blocks, i.e., uncompressed sequences of key-value pairs in sorted order. Therefore this cache is often referred to as block cache. However, users can configure RocksDB to also store index and filter blocks in the cache.

num-immutable-mem-table

Number of immutable memtables that have not yet been flushed. For segmented state stores, the sum of the number of immutable memtables over all segments is reported.

cur-size-active-mem-table

Approximate size of active memtable in bytes. For segmented state stores, the sum of the sizes over all segments is reported.

cur-size-all-mem-tables

Approximate size of active and unflushed immutable memtable in bytes. For segmented state stores, the sum of sizes over all segments is reported.

size-all-mem-tables

Approximate size of active, unflushed immutable, and pinned immutable memtables in bytes. Pinned immutable memtables are flushed memtables that are kept in memory to maintain write history in memory. For segmented state stores, the sum of sizes over all segments is reported.

num-entries-active-mem-table

Total number of entries in the active memtable. For segmented state stores, the sum of number of entries over all segments is reported.

num-entries-imm-mem-tables

Total number of entries in the unflushed immutable memtables. For segmented state stores, the sum of number of entries over all segments is reported.

num-deletes-active-mem-table

Total number of delete entries in the active memtable. For segmented state stores, the sum of number of deletes over all segments is reported.

num-deletes-imm-mem-tables

Total number of delete entries in the unflushed immutable memtables. For segmented state stores, the sum of number of deletes over all segments is reported.

mem-table-flush-pending

This metric returns 1 if a memtable flush is pending; otherwhise it returns 0. For segmented state stores, the sum of pending flushes over all segments is reported.

num-running-flushes

Number of currently running flushes. For segmented state stores, the sum of running flushes over all segments is reported.

compaction-pending

This metric 1 if at least one compaction is pending; otherwise, the metric reports 0. For segmented state stores, the sum of ones and zeros over all segments is reported.

num-running-compactions

Number of currently running compactions. For segmented state stores, the sum of the number of currently running compactions over all segments is reported.

estimate-pending-compaction-bytes

Estimated total number of bytes a compaction needs to rewrite on disk to get all levels down to under target size. In other words, this metrics relates to the write amplification in level compaction. Thus, this metric is not valid for compactions other than level-based. For segmented state stores, the sum of the estimated total number of bytes over all segments is reported.

total-sst-files-size

Total size in bytes of all SST files. For segmented state stores, the sum of the sizes of SST files over all segments is reported.

live-sst-files-size

Total size in bytes of all SST files that belong to the latest LSM tree. For segmented state stores, the sum of the sizes of SST files over all segments is reported.

num-live-versions

Number of live versions. More live versions often mean more SST files are held from being deleted, by iterators or unfinished compactions. For segmented state stores, the sum of the number of versions over all segments is reported.

block-cache-capacity

Block cache capacity. For segmented state stores, the sum of the cache capacity over all segments is reported, if separate caches are used, otherwise, if only one cache is used, the cache capacity of any segment is reported.

block-cache-usage

Memory size for the entries residing in block cache. For segmented state stores, the sum of the cache capacity over all segments is reported, if separate caches are used, otherwise, if only one cache is used, the cache capacity of any segment is reported.

block-cache-pinned-usage

Memory size for the entries being pinned. For segmented state stores, the sum of the cache capacity over all segments is reported, if separate caches are used, otherwise, if only one cache is used, the cache capacity of any segment is reported.

estimate-num-keys

Estimated number of total keys in the active and unflushed immutable memtables and storage. For segmented state stores, the sum of the estimated number of keys over all segments is reported.

estimate-table-readers-mem

Estimated memory in bytes used for reading SST tables, excluding memory used in block cache (e.g., filter and index blocks). This metric records the memory used by iterators as well as filters and indices if the filters and indices are not maintained in the block cache. Basically this metric reports the memory used outside the block cache to read data. For segmented state stores, the sum of the estimated memory over all segments is reported.

estimate-oldest-key-time

Estimated oldest key timestamp in the RocksDB instance. This metric is currently only available for FIFO compaction with compaction_options_fifo.allow_compaction = false. For segmented state stores, the minimum of the estimated oldest key timestamp over all segments is reported.

background-errors

Accumulated number of background errors. For segmented state stores, the sum of the number of background errors over all segments is reported.

Performance Consideration

All the metrics will be implemented as gauges. That means, the metrics would not be recorded if the metrics reporting system used by the user does not query the metric. Hence, the number of metrics presented in this KIP should neither affect the performance of the RocksDB instances nor the performance of Kafka Streams if they are not queried.     

Compatibility, Deprecation, and Migration Plan

Since metrics are only added and no other metrics are modified, this KIP should not

  • affect backward-compatibility
  • deprecate public interfaces
  • need a migration plan other than adding the new metrics to its own monitoring component

Rejected Alternatives

Introduce configuration in Kafka Streams to name RocksDB properties to expose

Since all of the above metrics can be exposed as gauges, there should not be too much performance overhead because recording is only triggered when the metric is actually queried. We thought that the maintenance costs of a configuration would be higher than just exposing this set of RocksDB properties. 


  • No labels