Status

Current state: Accepted

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

As per the Tiered Storage feature introduced in KIP-405, users can configure retention of the remote tier by time, by size, or both. The work of computing the log segments to be deleted based on the retention config is owned by RemoteLogManager (RLM).

To compute the remote segments eligible for deletion under the retention-by-size config, RLM needs total_remote_log_size, i.e. the total size of the logs available in the remote tier for that topic-partition. RLM uses RemoteLogMetadataManager.listRemoteLogSegments() to fetch metadata for all the remote segments and then aggregates the segment sizes using RemoteLogSegmentMetadata.segmentSizeInBytes() to find the total log size stored in the remote tier.
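The linear scan described above can be illustrated with a minimal Java sketch. SegmentMetadata is a hypothetical stand-in for RemoteLogSegmentMetadata that models only the segment size, and the iterator stands in for the result of listRemoteLogSegments(); neither is the real Kafka type.

```java
import java.util.Iterator;
import java.util.List;

final class LinearScanSizeSketch {
    /** Hypothetical stand-in for RemoteLogSegmentMetadata; only the size is modelled. */
    static final class SegmentMetadata {
        private final long segmentSizeInBytes;

        SegmentMetadata(long segmentSizeInBytes) {
            this.segmentSizeInBytes = segmentSizeInBytes;
        }

        long segmentSizeInBytes() {
            return segmentSizeInBytes;
        }
    }

    /** Sums segment sizes by visiting every metadata entry: O(num_remote_segments). */
    static long totalRemoteLogSize(Iterator<SegmentMetadata> segments) {
        long total = 0L;
        while (segments.hasNext()) {
            total += segments.next().segmentSizeInBytes();
        }
        return total;
    }

    public static void main(String[] args) {
        // Assumed example data: three remote segments of different sizes.
        List<SegmentMetadata> listed = List.of(
            new SegmentMetadata(1024L),
            new SegmentMetadata(2048L),
            new SegmentMetadata(512L));
        System.out.println(totalRemoteLogSize(listed.iterator())); // prints 3584
    }
}
```

Every call touches every segment's metadata, so the cost grows with the number of remote segments regardless of how many of them end up being deleted.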

The above method iterates through the metadata of all segments, i.e. it is O(num_remote_segments) on each execution of the RLM thread. Since the main feature of tiered storage is storing a large amount of data, we expect num_remote_segments to be large, and a frequent linear scan (i.e. listing all segment metadata) could be expensive or slow because of the underlying storage used by RemoteLogMetadataManager.
Segment offloads and segment deletions run together in the same task, and a fixed-size thread pool is shared among all topic-partitions. A slow calculation of total_log_size could therefore result in a loss of availability, as demonstrated in the following scenario:

  1. Calculation of total_size is slow and the threads in the thread pool are busy with segment deletions
  2. Segment offloads are delayed (since they run together with deletions)
  3. Local disk fills up, since local deletion requires the segment to be offloaded
  4. If local disk is completely full, Kafka fails

This slowness could lead to a slower rate of uploading to the remote tier.

This KIP addresses the problem by proposing a new API in RemoteLogMetadataManager (RLMM) to calculate the total size, delegating the responsibility for the calculation to the specific RLMM implementation. This API removes the need to list all segment metadata to calculate total_size.

(Note: for the local storage tier, all log segments are held in memory and the size is calculated by iterating through the in-memory segments. For the remote tier, we anticipate the number of segments to be significantly larger than in the local tier, so they might not fit into an in-memory cache.)

...

/**
* Returns total size of the log for the given leader epoch in remote storage.
*
* @param topicPartition topic partition for which size needs to be calculated.
* @param leaderEpoch Size will only include segments belonging to this epoch.
* @return Total size of the log stored in remote storage in bytes.
*/
long remoteLogSize(TopicPartition topicPartition, int leaderEpoch);

...

The following new metric will be added. RemoteLogSizeBytes will be updated using the values obtained from the remoteLogSize API call on every attempt by the RemoteLogManager to compute the remote segments eligible for deletion.

Name: kafka.log.remote:type=BrokerTopicMetrics, name=RemoteLogSizeBytes, topic=([-.\w]+)
Description: Provides the total size of log in bytes stored on the remote tier.

Proposed Changes

KIP-405 proposes a public interface, RemoteLogMetadataManager. Users can plug in their own implementation if they intend to use another system to store remote log segment metadata. KIP-405 also provides a default implementation of RLMM called TopicBasedRemoteLogMetadataManager, which stores the metadata in an internal Kafka topic.

This KIP proposes to delegate the responsibility for calculating the total size of the log in the remote tier to the specific implementation of RemoteLogMetadataManager. To this end, this KIP proposes the addition of a new API, remoteLogSize, to the RLMM interface. RLMM implementations would implement this API and may choose to optimize it based on their internal data structures.
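As one illustration of how an RLMM implementation might optimize this API, the Java sketch below keeps a running total of remote bytes per leader epoch, so that a size lookup does not require listing segments. It is only a sketch under stated assumptions: the class, the onSegmentCopied and onSegmentDeleted hooks, and the simplification that each segment is attributed to exactly one leader epoch are all hypothetical and not taken from any actual RLMM implementation.

```java
import java.util.HashMap;
import java.util.Map;

final class PerEpochSizeSketch {
    // Running total of remote bytes per leader epoch for ONE topic partition.
    private final Map<Integer, Long> sizeByEpoch = new HashMap<>();

    /** Hypothetical hook: called once a segment copy to remote storage completes. */
    void onSegmentCopied(int leaderEpoch, long segmentSizeInBytes) {
        sizeByEpoch.merge(leaderEpoch, segmentSizeInBytes, Long::sum);
    }

    /** Hypothetical hook: called once a remote segment delete completes. */
    void onSegmentDeleted(int leaderEpoch, long segmentSizeInBytes) {
        sizeByEpoch.computeIfPresent(leaderEpoch, (epoch, size) -> {
            long remaining = size - segmentSizeInBytes;
            // Returning null removes the entry once the epoch holds no more data.
            return remaining > 0 ? Long.valueOf(remaining) : null;
        });
    }

    /** Constant-time lookup instead of a scan over all segment metadata. */
    long remoteLogSize(int leaderEpoch) {
        return sizeByEpoch.getOrDefault(leaderEpoch, 0L);
    }

    public static void main(String[] args) {
        PerEpochSizeSketch sizes = new PerEpochSizeSketch();
        sizes.onSegmentCopied(0, 100L);
        sizes.onSegmentCopied(0, 50L);
        sizes.onSegmentCopied(1, 70L);
        sizes.onSegmentDeleted(0, 100L);
        System.out.println(sizes.remoteLogSize(0)); // prints 50
        System.out.println(sizes.remoteLogSize(1)); // prints 70
        System.out.println(sizes.remoteLogSize(2)); // prints 0
    }
}
```

A real implementation would also need to handle concurrency, persistence across restarts, and segments that span several leader epochs, none of which this sketch attempts.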

This API would also be useful for other cases such as exposing the amount of data in remote tier for a particular topic partition.

After the implementation of this method, RemoteLogManager would compute the size of log as follows:

def calculateRemoteTierSize(): Long = {
  // Find the leader epochs from leader epoch cache.
  val validLeaderEpochs = fromLeaderEpochCacheToEpochs(log)
  // For each leader epoch in current lineage, calculate size of log
  validLeaderEpochs.map(epoch => rlmm.remoteLogSize(tp, epoch)).sum
}

// The new API would be used for size-based retention as:
val remoteLogSizeBytes = calculateRemoteTierSize()
val totalLogSize = remoteLogSizeBytes + log.localOnlyLogSegmentsSize
// Number of bytes by which the log exceeds the configured retention size.
var remainingSize = if (shouldDeleteBySize) totalLogSize - retentionSize else 0
val segmentsIterator = rlmm.listRemoteLogSegments(tp)
while (remainingSize > 0 && segmentsIterator.hasNext) {
  // delete segments
}
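The size-based retention step above leaves the loop body open. The following self-contained Java sketch shows one way it could behave, with segments reduced to their sizes in bytes. The class and method names are hypothetical, and the rule that a segment is deleted only if doing so keeps the total at or above the retention size is an assumption made for illustration, not something this KIP specifies.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class SizeRetentionSketch {
    /**
     * Returns the sizes of the remote segments to delete, oldest first.
     * A negative retentionSize is treated as "no size limit".
     */
    static List<Long> segmentsToDelete(long remoteLogSizeBytes,
                                       long localOnlyLogSegmentsSize,
                                       long retentionSize,
                                       Iterator<Long> remoteSegmentSizesOldestFirst) {
        List<Long> toDelete = new ArrayList<>();
        if (retentionSize < 0) {
            return toDelete; // size-based retention is disabled
        }
        // Bytes by which the whole log (remote + local-only) exceeds the limit.
        long remainingSize = remoteLogSizeBytes + localOnlyLogSegmentsSize - retentionSize;
        while (remainingSize > 0 && remoteSegmentSizesOldestFirst.hasNext()) {
            long segmentSize = remoteSegmentSizesOldestFirst.next();
            if (segmentSize > remainingSize) {
                break; // assumption: never shrink the log below the retention size
            }
            toDelete.add(segmentSize);
            remainingSize -= segmentSize;
        }
        return toDelete;
    }

    public static void main(String[] args) {
        // 300 remote bytes + 50 local-only bytes against a 200-byte limit: 150 bytes over.
        List<Long> remote = List.of(100L, 100L, 100L);
        System.out.println(segmentsToDelete(300L, 50L, 200L, remote.iterator())); // prints [100]
    }
}
```

Note that only the segments actually deleted are visited by the iterator; the total size itself comes from the aggregated value rather than from a full listing.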

Code changes

  1. Add the new API to RemoteLogMetadataManager
  2. Implement the new API at TopicBasedRemoteLogMetadataManager (with unit tests)
  3. Add the new metric when code for RemoteLogManager has been merged.

...

This KIP proposes to add a new metric RemoteLogSizeBytes which tracks the size of data stored in remote tier for a topic partition.
This metric will be useful for both the admin and the user to monitor the volume of tiered data in real time. It could be used in future to add the size of the remote tier to the response of the DescribeLogDirs API call. RemoteLogSizeBytes will be updated using the values obtained from the remoteLogSize API call each time the log retention check runs (that is, every log.retention.check.interval.ms) and whenever a user explicitly calls remoteLogSize().

Compatibility, Deprecation, and Migration Plan

...

  1. RLMM implementation has an additional responsibility to list metadata in decreasing order of offsets. This adds an additional requirement for the underlying implementation of RLMM to perform this sort which might not be optimised when dealing with a large number of segments.
  2. The metric to track the total size of the remote tier will still need an implementation of the new API remoteLogSize()
  3. We would need to iterate through the list of segments which are not eligible to be deleted. This could be an expensive operation if we do it on every deletion.

...

This approach advocates for maintaining the size of log in remote tier in-memory and updating it every time there is a copySegmentToRemote or a deleteSegment event. The in-memory value needs to be initialised once by performing a full scan of all log segments, typically at broker startup.

Pros: Constant time calculation of size since it is stored in-memory.
Cons: Every time a broker starts up, it will scan through all the segments in the remote tier to initialise the in-memory value. This would increase the bootstrap time for the remote storage thread pool before the first eligible segment is archived.

Alternative 4: Store the cumulative size of remote tier log at RemoteLogManager

...