

Status

Current state: Accepted

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  1. A RemoteStorageManager makes the decision how and where to store the segment. Imagine a situation where it needs to apply a certain load balancing strategy across buckets on AWS S3 or Google Cloud Storage or storage accounts on Azure Blob Storage. If the strategy is deterministic, its inputs (e.g. the number of buckets to balance across) may change with time. It can also be non-deterministic, i.e. randomized. In this case, storing the name of the bucket or another identifier along with other remote log segment metadata is essential for the segment to be retrievable later.
  2. In some cases, it's necessary to know how much remote storage is consumed by a topic or a partition. RemoteLogSegmentMetadata has the segmentSizeInBytes field. However, this field covers only the segment itself, without indices or any other files that may be uploaded together with the segment. Besides, segmentSizeInBytes represents the raw file size on the local disk, which doesn't account for potential transformations (e.g. compression, encryption) a RemoteStorageManager may apply to the data.

Considering that remote segments are already pretty coupled with the remote storage plugin that wrote them, adding the custom metadata won't introduce new coupling.

Proposed Changes

This KIP proposes to add the possibility for the RemoteStorageManager.copyLogSegmentData method to return optional custom metadata after a segment has been uploaded. This piece will be stored and propagated along with the standard metadata. The custom metadata will be an opaque byte array to the rest of the system. It should be serialized, deserialized, and interpreted by the remote storage plugin only. The kafka-clients library, which is distributed separately from Kafka and can be used as a plugin dependency, has convenient tools for binary serde.
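For illustration, a plugin that needs to remember which bucket a segment was written to could encode that with plain JDK tools. This is a hypothetical plugin-side serde, not part of the KIP; the class and bucket names are invented:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical plugin-side serde: encodes the bucket name a segment was
// written to. Only this plugin ever interprets the resulting bytes; to the
// broker they are an opaque byte array.
public class BucketMetadataSerde {
    public static byte[] serialize(String bucket) {
        byte[] name = bucket.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + name.length);
        buf.putInt(name.length);
        buf.put(name);
        return buf.array();
    }

    public static String deserialize(byte[] value) {
        ByteBuffer buf = ByteBuffer.wrap(value);
        byte[] name = new byte[buf.getInt()];
        buf.get(name);
        return new String(name, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] bytes = serialize("tiered-segments-eu-1");
        System.out.println(deserialize(bytes)); // prints "tiered-segments-eu-1"
    }
}
```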

Allowing remote storage plugins to write custom metadata of arbitrary size would put the stability of the cluster at risk. The broker should have a configurable limit on its size. If a piece of custom metadata exceeds the limit, the execution of RLMTask.copyLogSegmentsToRemote should be interrupted with an error message. The default value for the limit should be 128 B, which is small enough not to be disruptive.

It's proposed to return the custom metadata from RemoteStorageManager.copyLogSegmentData. Hence, the custom metadata size will in fact be checked after the segment data is copied to the remote storage. If the size limit is exceeded, one best-effort attempt will be made to delete the data from the remote storage. Since exceeding the limit should be very rare and abnormal and will require operator action, it's proposed not to introduce a complex cleanup mechanism or to change the interface of RemoteStorageManager to e.g. precompute custom metadata and return it to the broker side for inspection (see Rejected Alternatives).
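The enforcement described above can be sketched as follows. This is an illustrative stand-in, not the actual RLMTask code; the class name and the Optional<byte[]> signature are assumptions made so the sketch stays self-contained:

```java
import java.util.Optional;

// Sketch of the broker-side size check performed after copyLogSegmentData
// returns. A false result means: do not store the updated segment metadata,
// best-effort delete the uploaded data, and stop the copy task with an error.
public class CustomMetadataSizeCheck {
    // Corresponds to remote.log.metadata.custom.metadata.max.bytes (default 128).
    static final int CUSTOM_METADATA_MAX_BYTES = 128;

    public static boolean withinLimit(Optional<byte[]> customMetadata) {
        // Absent custom metadata always passes; present metadata must fit the limit.
        return customMetadata.map(v -> v.length <= CUSTOM_METADATA_MAX_BYTES).orElse(true);
    }

    public static void main(String[] args) {
        System.out.println(withinLimit(Optional.of(new byte[64])));  // prints "true"
        System.out.println(withinLimit(Optional.of(new byte[256]))); // prints "false"
    }
}
```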

See Public Interfaces for the details.

Public Interfaces

RemoteLogSegmentMetadata.CustomMetadata

Introduce a class for storing custom metadata.

Code Block (java): RemoteLogSegmentMetadata.CustomMetadata
public class RemoteLogSegmentMetadata extends RemoteLogMetadata {

    public static class CustomMetadata {
        private final byte[] value;

        public CustomMetadata(byte[] value) {
            this.value = value;
        }

        public byte[] value() {
            return value;
        }
    }

...

RemoteStorageManager.copyLogSegmentData

This method will return Optional<CustomMetadata> instead of void. The documentation will be adjusted accordingly.

Code Block (java): RemoteStorageManager.copyLogSegmentData
/**
 * ...
 * @return Custom metadata to be added to the segment metadata after copying.
 * ...
 */
Optional<CustomMetadata> copyLogSegmentData(
  RemoteLogSegmentMetadata remoteLogSegmentMetadata,
  LogSegmentData logSegmentData
) throws RemoteStorageException;
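To show how a plugin might use the new return value, here is a simplified sketch with Kafka's interfaces stubbed out so it compiles on its own. The S3-style plugin, the bucket names, and the reduced copyLogSegmentData signature are all invented for illustration:

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;

// Stand-in for the KIP's RemoteLogSegmentMetadata.CustomMetadata, so the
// sketch compiles without Kafka on the classpath.
class CustomMetadata {
    private final byte[] value;
    CustomMetadata(byte[] value) { this.value = value; }
    byte[] value() { return value; }
}

// Hypothetical S3-style plugin: picks a bucket with a randomized (i.e.
// non-deterministic) strategy, uploads the segment there, and reports the
// chosen bucket back as custom metadata so the segment stays retrievable.
public class S3StyleRemoteStorageManager {
    private static final String[] BUCKETS = {"tiered-a", "tiered-b"};

    public Optional<CustomMetadata> copyLogSegmentData(byte[] segmentData) {
        String bucket = BUCKETS[ThreadLocalRandom.current().nextInt(BUCKETS.length)];
        // ... upload segmentData to `bucket` here ...
        return Optional.of(new CustomMetadata(bucket.getBytes(StandardCharsets.UTF_8)));
    }

    public static void main(String[] args) {
        Optional<CustomMetadata> md = new S3StyleRemoteStorageManager().copyLogSegmentData(new byte[0]);
        System.out.println(new String(md.get().value(), StandardCharsets.UTF_8)); // "tiered-a" or "tiered-b"
    }
}
```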

...

Code Block (java): customMetadata
/**
 * @return Optional custom metadata for this segment.
 */
public Optional<CustomMetadata> customMetadata() {
    return customMetadata;
}

...

Code Block (java): createWithCustomMetadata
public RemoteLogSegmentMetadata createWithCustomMetadata(CustomMetadata customMetadata) {
    return new RemoteLogSegmentMetadata(remoteLogSegmentId, ..., customMetadata);
}

...

Code Block (text): RemoteLogSegmentMetadataRecord
{
  "name": "CustomMetadata",
  "type": "bytes",
  "versions": "0+",
  "nullableVersions": "0+",
  "default": "null",
  "about": "Custom metadata."
}

RemoteLogSegmentMetadataUpdateRecord

Same as for RemoteLogSegmentMetadataRecord.

RemoteLogSegmentMetadataSnapshotRecord

Same as for RemoteLogSegmentMetadataRecord.

Configuration Keys

Key Name: remote.log.metadata.custom.metadata.max.bytes
Description: The maximum size of custom metadata in bytes that the broker should accept from a remote storage plugin. If custom metadata exceeds this limit, the updated segment metadata will not be stored, an attempt will be made to delete the copied data, and the remote copying task for this topic-partition will stop with an error.
Valid Values: 0..Integer.MAX_VALUE
Default Value: 128

Compatibility, Deprecation, and Migration Plan

...

The changes will be tested at the unit level; the existing unit tests will be adjusted accordingly.

Rejected Alternatives

...

  1. Introduce a separate storage for custom metadata. This would not reduce the coupling or bring any other visible benefit, but would make the solution more complex (e.g. it would require a separate cache and a correlation mechanism to match custom and regular metadata) and more difficult to operate.
  2. Change the interface of RemoteStorageManager to introduce a separate method for calculating the custom segment metadata before the attempt to copy it to the remote storage. This would allow checking the custom metadata size before attempting to copy the segment. However, since exceeding the custom metadata size limit is abnormal, should be very rare, and requires operator intervention anyway, it was decided not to make the solution unnecessarily complex.