Comment: update the API name to remoteLogSize

...

```java
/**
 * Returns total size of the log for the given leader epoch in remote storage.
 *
 * @param topicPartition topic partition for which size needs to be calculated.
 * @param leaderEpoch Size will only include segments belonging to this epoch.
 * @return Total size of the log stored in remote storage in bytes.
 */
Long remoteLogSize(TopicPartition topicPartition, int leaderEpoch);
```

...

The following new metrics will be added. RemoteLogSizeBytes will be updated using the value obtained from the remoteLogSize API call on every attempt by the RemoteLogManager to compute the remote segments eligible for deletion.

...

KIP-405 proposes a public interface, RemoteLogMetadataManager (RLMM). Users can plug in their own implementation if they intend to use another system to store remote log segment metadata. KIP-405 also provides a default RLMM implementation, TopicBasedRemoteLogMetadataManager, which stores this metadata in an internal Kafka topic.

This KIP proposes to delegate the responsibility for calculating the total size of the log in the remote tier to the specific RemoteLogMetadataManager implementation. To this end, this KIP proposes adding a new API, remoteLogSize, to the RLMM interface. RLMM implementations would implement this API and may choose to optimize it based on their internal data structures.
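Because the size calculation is delegated to the RLMM, an implementation can avoid scanning every segment on each call. The following is a minimal sketch of one such optimization. It assumes a hypothetical in-memory store (`SegmentInfo`, `InMemorySizeTrackingStore`) in which partitions are plain strings and each segment is attributed to exactly one leader epoch. The store keeps a running byte total per (partition, epoch), so `remoteLogSize` becomes a map lookup:

```scala
import scala.collection.mutable

// Hypothetical, simplified segment metadata: each segment is attributed to exactly
// one leader epoch (real remote segments can span several epochs).
final case class SegmentInfo(id: String, partition: String, leaderEpoch: Int, sizeInBytes: Long)

// Hypothetical store that maintains a running size total per (partition, epoch),
// so size queries do not need to iterate over all segments.
final class InMemorySizeTrackingStore {
  private val segments = mutable.Map.empty[String, SegmentInfo]
  private val sizeByEpoch =
    mutable.Map.empty[(String, Int), Long].withDefaultValue(0L)

  // Registers a segment and adds its size to the total for its epoch.
  def addSegment(s: SegmentInfo): Unit = {
    require(!segments.contains(s.id), s"duplicate segment ${s.id}")
    segments(s.id) = s
    sizeByEpoch((s.partition, s.leaderEpoch)) += s.sizeInBytes
  }

  // Removes a segment (if present) and subtracts its size from its epoch's total.
  def deleteSegment(id: String): Unit =
    segments.remove(id).foreach { s =>
      sizeByEpoch((s.partition, s.leaderEpoch)) -= s.sizeInBytes
    }

  // Map lookup: bytes stored for the given partition and leader epoch (0 if none).
  def remoteLogSize(partition: String, leaderEpoch: Int): Long =
    sizeByEpoch((partition, leaderEpoch))
}
```

This design moves the cost of size computation to segment additions and deletions. It is one possible trade-off; an implementation backed by an external database might instead push the aggregation into a query.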

This API would also be useful in other cases, such as exposing the amount of data in the remote tier for a particular topic partition.

With this method in place, the RemoteLogManager would compute the size of the remote log as follows:

```scala
// Assumes rlmm, log, topicPartition, shouldDeleteBySize and retentionSize are in scope.
def calculateRemoteTierSize(): Long = {
  // Find the leader epochs from the leader epoch cache.
  val validLeaderEpochs = fromLeaderEpochCacheToEpochs(log)
  // For each leader epoch in the current lineage, calculate the size of the log.
  // Mapping over an iterator avoids collapsing equal sizes if the epochs are a Set.
  validLeaderEpochs.iterator
    .map(epoch => rlmm.remoteLogSize(topicPartition, epoch).longValue)
    .sum
}

// The new API would be used for size-based retention as follows:
val remoteLogSizeBytes = calculateRemoteTierSize()
val totalLogSize = remoteLogSizeBytes + log.localOnlyLogSegmentsSize

var remainingSize = if (shouldDeleteBySize) totalLogSize - retentionSize else 0L

val segmentsIterator = rlmm.listRemoteLogSegments(topicPartition)

while (remainingSize > 0 && segmentsIterator.hasNext) {
  // Delete the next segment and subtract its size from remainingSize.
}
```
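The size-based retention steps above can be exercised outside the broker. The following is a self-contained sketch that uses hypothetical names (`RemoteSegment`, `SizeRetention`) and plain values in place of `rlmm` and `log`. It assumes segments are supplied oldest first and that a negative `retentionSize` disables size-based deletion, mirroring `retention.bytes = -1` in Kafka:

```scala
// Hypothetical, simplified remote segment; real segment metadata carries much more.
final case class RemoteSegment(id: String, sizeInBytes: Long)

object SizeRetention {
  // Mirrors calculateRemoteTierSize: sums the per-epoch sizes reported by remoteLogSize.
  def remoteTierSize(epochs: Seq[Int], remoteLogSize: Int => Long): Long =
    epochs.iterator.map(remoteLogSize).sum

  // Returns the ids of the oldest segments to delete so that the remote size plus
  // the local-only size no longer exceeds retentionSize.
  def segmentsToDeleteBySize(
      segmentsOldestFirst: Seq[RemoteSegment],
      remoteSizeBytes: Long,
      localOnlySizeBytes: Long,
      retentionSize: Long): Seq[String] = {
    val shouldDeleteBySize = retentionSize >= 0 // negative means "no size limit"
    val totalLogSize = remoteSizeBytes + localOnlySizeBytes
    var remainingSize = if (shouldDeleteBySize) totalLogSize - retentionSize else 0L
    val segmentsIterator = segmentsOldestFirst.iterator
    val toDelete = Seq.newBuilder[String]
    while (remainingSize > 0 && segmentsIterator.hasNext) {
      val segment = segmentsIterator.next()
      toDelete += segment.id // mark the oldest remaining segment for deletion
      remainingSize -= segment.sizeInBytes
    }
    toDelete.result()
  }
}
```

As a usage example, consider three 100-byte segments, with 300 remote bytes, 50 local-only bytes, and a 200-byte limit. In this case, `segmentsToDeleteBySize` selects the two oldest segments.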

...

This KIP proposes to add a new metric, RemoteLogSizeBytes, which tracks the size of the data stored in the remote tier for a topic partition.
This metric will help both administrators and users monitor the volume of tiered data in real time. In the future, it would also be used to include the size of the remote tier in the response to the DescribeLogDirs API call. RemoteLogSizeBytes will be updated using the value obtained from the remoteLogSize API call each time the log retention check runs (that is, every log.retention.check.interval.ms) and whenever a user explicitly calls remoteLogSize().
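Below is a minimal sketch of a per-partition holder for the metric. It assumes a hypothetical `RemoteLogSizeBytesGauge` class and omits both the registration with the broker's metrics registry and the scheduling of the retention check. An `AtomicLong` lets the thread running the retention check write the value while a metrics reporter reads it concurrently:

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical per-partition holder for the RemoteLogSizeBytes metric. A broker would
// expose `value` as a gauge in its metrics registry; that wiring is omitted here.
final class RemoteLogSizeBytesGauge {
  private val latest = new AtomicLong(0L)

  // Read by the metrics reporter; safe to call from any thread.
  def value: Long = latest.get()

  // Hypothetical hook called with a freshly computed size, for example after each
  // retention check or an explicit remoteLogSize() call. Returns the recorded size.
  def record(sizeInBytes: Long): Long = {
    latest.set(sizeInBytes)
    sizeInBytes
  }
}
```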

Compatibility, Deprecation, and Migration Plan

...

  1. The RLMM implementation has the additional responsibility of listing metadata in decreasing order of offsets. This requires the underlying RLMM implementation to perform this sort, which might not be efficient when dealing with a large number of segments.
  2. The metric tracking the total size of the remote tier would still need an implementation of the new API remoteLogSize().
  3. We would need to iterate through the list of segments that are not eligible for deletion. This could be an expensive operation if we do it on every deletion.
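The drawbacks above appear to concern an alternative approach. In that approach, the RLMM lists segments newest first, and the deletion logic accumulates sizes until it crosses the retention limit, so no total size is needed. The following is a minimal sketch of that scan, assuming this reading of the list. It uses hypothetical names (`RemoteSegment`, `NewestFirstScan`) and a simplified segment type, and treats a negative `retentionSize` as "no limit":

```scala
// Hypothetical, simplified remote segment; real segment metadata carries much more.
final case class RemoteSegment(id: String, sizeInBytes: Long)

object NewestFirstScan {
  // segmentsNewestFirst: remote segments in decreasing offset order (item 1).
  // Returns the ids of segments eligible for size-based deletion, newest first.
  def eligibleForDeletion(
      segmentsNewestFirst: Seq[RemoteSegment],
      localOnlySizeBytes: Long,
      retentionSize: Long): Seq[String] = {
    if (retentionSize < 0) Seq.empty // negative retentionSize: no size limit
    else {
      // Lazily accumulate retained bytes, starting from the local-only data.
      val cumulative = segmentsNewestFirst.iterator
        .scanLeft(localOnlySizeBytes)(_ + _.sizeInBytes)
        .drop(1) // skip the initial local-only value
      // Finding the cut-off walks past every retained segment (item 3).
      val cutOff = cumulative.indexWhere(_ > retentionSize)
      if (cutOff < 0) Seq.empty
      else segmentsNewestFirst.drop(cutOff).map(_.id)
    }
  }
}
```

As a usage example, take 100-byte segments, 50 local-only bytes, and a 200-byte limit. The scan keeps the newest segment and marks the two older ones eligible. This is the same set that deleting oldest first until the total fits would remove.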

...