Status

Current state: Accepted

Discussion thread: here

JIRA: here

...

With the Tiered Storage feature introduced in KIP-405, users can configure retention of the remote tier by time, by size, or both. RemoteLogManager (RLM) is responsible for computing which log segments are to be deleted under the retention config.

To compute the remote segments eligible for deletion under the retention-by-size config, RLM needs to compute total_remote_log_size, i.e. the total size of the logs available in the remote tier for that topic-partition. RLM uses RemoteLogMetadataManager.listRemoteLogSegments() to fetch the metadata of all remote segments and then aggregates the segment sizes using RemoteLogSegmentMetadata.segmentSizeInBytes() to find the total log size stored in the remote tier.
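The list-and-sum approach described above can be sketched roughly as follows. This is a minimal illustration, not the RLM code: SegmentMetadataStub and RemoteLogSizeScanSketch are hypothetical stand-ins, and only the segmentSizeInBytes() accessor mirrors a name from the text.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for RemoteLogSegmentMetadata; only the size accessor is modelled.
final class SegmentMetadataStub {
    private final int segmentSizeInBytes;

    SegmentMetadataStub(int segmentSizeInBytes) {
        this.segmentSizeInBytes = segmentSizeInBytes;
    }

    int segmentSizeInBytes() {
        return segmentSizeInBytes;
    }
}

final class RemoteLogSizeScanSketch {
    // Sums the size of every listed segment; the cost grows linearly
    // with the number of remote segments, i.e. O(num_remote_segments).
    static long totalRemoteLogSize(Iterator<SegmentMetadataStub> segments) {
        long total = 0L;
        while (segments.hasNext()) {
            total += segments.next().segmentSizeInBytes();
        }
        return total;
    }

    public static void main(String[] args) {
        // Assumed sample data standing in for the result of a metadata listing.
        List<SegmentMetadataStub> listed = new ArrayList<>();
        listed.add(new SegmentMetadataStub(1024));
        listed.add(new SegmentMetadataStub(2048));
        listed.add(new SegmentMetadataStub(512));
        System.out.println(totalRemoteLogSize(listed.iterator())); // prints 3584
    }
}
```

Accumulating into a long avoids overflow when many int-sized segment lengths are added together.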

The above method iterates through the metadata of every segment, i.e. it costs O(num_remote_segments) on each execution of the RLM thread. Since the main purpose of tiered storage is to store a large amount of data, we expect num_remote_segments to be large, and a frequent linear scan (i.e. listing all segment metadata) could be expensive or slow because of the underlying storage used by RemoteLogMetadataManager. This slowness in listing all segment metadata could result in a loss of availability, as demonstrated in the following scenario:

  1. Calculation of total_size is slow and the threads in the thread pool are busy with segment deletions
  2. Segment offloads are delayed (since they run together with deletions)
  3. The local disk fills up, since local deletion requires the segment to be offloaded first
  4. If the local disk is completely full, Kafka fails

This KIP addresses the problem by proposing a new API in RemoteLogMetadataManager (RLMM) to calculate the total size, delegating the responsibility for the calculation to the specific RLMM implementation. This API removes the requirement to list all segment metadata in order to calculate total_size.

(Note: for the local storage tier, all log segments are loaded in memory and the size is calculated by iterating through those in-memory segments. For the remote tier, we anticipate the number of segments to be significantly larger than in the local tier, so they might not fit into an in-memory cache.)

...

/**
 * Returns total size of the log for the given leader epoch in remote storage.
 *
 * @param topicPartition topic partition for which size needs to be calculated.
 * @param leaderEpoch Size will only include segments belonging to this epoch.
 * @return Total size of the log stored in remote storage in bytes.
 */
long remoteLogSize(TopicPartition topicPartition, int leaderEpoch);
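
One way an RLMM implementation might satisfy this API without listing segments is to maintain a running total per topic-partition and leader epoch, updated as segment metadata is added and deleted. The sketch below is only an illustration under assumptions: the class RunningRemoteLogSizeSketch, its onSegmentAdded/onSegmentDeleted hooks, and the string key are hypothetical, and it assumes each segment is attributed to exactly one leader epoch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch: keeps a running byte total per (topic, partition, leader epoch)
// so that the size lookup does not need to scan segment metadata.
final class RunningRemoteLogSizeSketch {
    // Key format "<topic>-<partition>@<leaderEpoch>" is an assumption made for this example.
    private final Map<String, LongAdder> sizeByPartitionEpoch = new ConcurrentHashMap<>();

    private static String key(String topic, int partition, int leaderEpoch) {
        return topic + "-" + partition + "@" + leaderEpoch;
    }

    // Assumed hook, called when metadata for a newly uploaded segment is recorded.
    void onSegmentAdded(String topic, int partition, int leaderEpoch, long sizeInBytes) {
        sizeByPartitionEpoch
            .computeIfAbsent(key(topic, partition, leaderEpoch), k -> new LongAdder())
            .add(sizeInBytes);
    }

    // Assumed hook, called when a segment's metadata is removed after deletion.
    void onSegmentDeleted(String topic, int partition, int leaderEpoch, long sizeInBytes) {
        sizeByPartitionEpoch
            .computeIfAbsent(key(topic, partition, leaderEpoch), k -> new LongAdder())
            .add(-sizeInBytes);
    }

    // Counterpart of remoteLogSize(topicPartition, leaderEpoch): a single map lookup.
    long remoteLogSize(String topic, int partition, int leaderEpoch) {
        LongAdder total = sizeByPartitionEpoch.get(key(topic, partition, leaderEpoch));
        return total == null ? 0L : total.sum();
    }

    public static void main(String[] args) {
        RunningRemoteLogSizeSketch sizes = new RunningRemoteLogSizeSketch();
        sizes.onSegmentAdded("orders", 0, 5, 100L);
        sizes.onSegmentAdded("orders", 0, 5, 250L);
        sizes.onSegmentDeleted("orders", 0, 5, 100L);
        System.out.println(sizes.remoteLogSize("orders", 0, 5)); // prints 250
    }
}
```

The trade-off in this design is that the lookup becomes constant-time at the cost of keeping the totals consistent with the metadata store, which a real implementation would have to handle across restarts.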

...