Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
KIP-405: Kafka Tiered Storage introduced the RemoteLogSegmentMetadata class, which describes a log segment uploaded to remote storage. It contains useful bits of metadata such as the segment ID, the start and end offsets, etc. It is (or will be) created on the broker side, and implementations of RemoteStorageManager (i.e. remote storage plugins) have no way to add custom metadata to be stored and propagated along with the standard fields. However, this might be useful in certain circumstances, for example:
- A RemoteStorageManager makes the decision about how and where to store a segment. Imagine a situation where it needs to apply a certain load-balancing strategy across buckets on AWS S3 or Google Cloud Storage, or across storage accounts on Azure Blob Storage. If the strategy is deterministic, its inputs (e.g. the number of buckets to balance across) may change over time. It can also be non-deterministic, i.e. randomized. In either case, storing the name of the bucket or another identifier along with the rest of the remote log segment metadata is essential for the segment to be retrievable later.
- In some cases, it's necessary to know how much remote storage is consumed by a topic or a partition. RemoteLogSegmentMetadata has the segmentSizeInBytes field. However, this field covers only the segment itself, without indices or any other files that may be uploaded together with the segment. Besides, segmentSizeInBytes represents the size of the raw file on the local disk, which doesn't take into account transformations (e.g. compression, encryption) a RemoteStorageManager may apply to the data.
Proposed Changes
This KIP proposes to allow the RemoteStorageManager.copyLogSegmentData method to return optional custom metadata after a segment has been uploaded. This piece of metadata will be stored and propagated along with the standard metadata. The format of the custom metadata will be a map from String to byte[] (as in record headers, for example).
See Public Interfaces for the details.
Public Interfaces
RemoteStorageManager.copyLogSegmentData
This method will return Optional<Map<String, byte[]>> instead of void. The documentation will be adjusted accordingly.
/**
 * ...
 * @return Custom metadata to be added to the segment metadata after copying.
 * ...
 */
Optional<Map<String, byte[]>> copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                                                 LogSegmentData logSegmentData)
        throws RemoteStorageException;
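For illustration, a minimal plugin-side sketch is given below, assuming the proposed signature. The class name, the bucket pool, the metadata keys, and the uploadTo helper are hypothetical and not part of this proposal; the sketch merely ties the new return value to the load-balancing scenario from the Motivation section.

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.kafka.server.log.remote.storage.LogSegmentData;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;

// Hypothetical S3-backed plugin; abstract so the methods not relevant to this
// KIP (fetch, delete, configure, close) can stay unimplemented in the sketch.
public abstract class S3RemoteStorageManager implements RemoteStorageManager {

    private final List<String> buckets; // bucket pool to balance across (assumption)

    protected S3RemoteStorageManager(List<String> buckets) {
        this.buckets = buckets;
    }

    @Override
    public Optional<Map<String, byte[]>> copyLogSegmentData(
            RemoteLogSegmentMetadata remoteLogSegmentMetadata,
            LogSegmentData logSegmentData) throws RemoteStorageException {
        // Non-deterministic load balancing: pick a random bucket for this segment.
        String bucket = buckets.get(ThreadLocalRandom.current().nextInt(buckets.size()));
        long uploadedBytes = uploadTo(bucket, remoteLogSegmentMetadata, logSegmentData);

        // Return the chosen bucket (so the segment stays retrievable later) and the
        // actual uploaded size (indices and transformations included) as custom metadata.
        Map<String, byte[]> custom = new HashMap<>();
        custom.put("bucket", bucket.getBytes(StandardCharsets.UTF_8));
        custom.put("uploaded.size.bytes",
                Long.toString(uploadedBytes).getBytes(StandardCharsets.UTF_8));
        return Optional.of(custom);
    }

    // Hypothetical helper: uploads the segment and index files to the given
    // bucket and returns the total number of bytes written.
    protected abstract long uploadTo(String bucket,
                                     RemoteLogSegmentMetadata metadata,
                                     LogSegmentData data) throws RemoteStorageException;
}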
RemoteLogSegmentMetadata class
This class will have a new method:
/**
 * @return Optional map of custom metadata for this segment.
 */
public Optional<Map<String, byte[]>> customMetadata() {
    return customMetadata;
}
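As an illustration of how a caller could use this accessor, the following sketch aggregates the actual remote footprint of a set of segments. The "uploaded.size.bytes" key is a hypothetical convention of the plugin sketch above, not something this KIP defines.

import java.nio.charset.StandardCharsets;
import java.util.Collection;

import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;

final class RemoteFootprint {
    // Sums the actual uploaded sizes of the given segments, reading the
    // hypothetical "uploaded.size.bytes" key and falling back to
    // segmentSizeInBytes() when a segment carries no custom metadata.
    static long totalBytes(Collection<RemoteLogSegmentMetadata> segments) {
        long total = 0;
        for (RemoteLogSegmentMetadata m : segments) {
            byte[] size = m.customMetadata()
                    .map(cm -> cm.get("uploaded.size.bytes"))
                    .orElse(null);
            total += size != null
                    ? Long.parseLong(new String(size, StandardCharsets.UTF_8))
                    : m.segmentSizeInBytes();
        }
        return total;
    }
}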
The corresponding private field will be added, and the constructors and the documentation will be adjusted accordingly.
It should remain possible to create an instance with this field set to Optional.empty() (primarily for the initial call to RemoteStorageManager.copyLogSegmentData).
The method createWithCustomMetadata will be added for creating a new instance from an existing one with the provided custom metadata:
public RemoteLogSegmentMetadata createWithCustomMetadata(Map<String, byte[]> customMetadata) {
    return new RemoteLogSegmentMetadata(remoteLogSegmentId, ..., customMetadata);
}
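For illustration, the broker-side flow could then look roughly as follows. This is a simplified sketch: remoteStorageManager and logSegmentData are assumed to be in scope, and the constructor arguments are elided as above.

// Initial metadata is created with customMetadata = Optional.empty().
RemoteLogSegmentMetadata metadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, ...);

Optional<Map<String, byte[]>> custom =
        remoteStorageManager.copyLogSegmentData(metadata, logSegmentData);

// Attach the plugin-provided custom metadata, if any, before the metadata
// is stored and propagated along the standard path.
if (custom.isPresent()) {
    metadata = metadata.createWithCustomMetadata(custom.get());
}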
RemoteLogSegmentMetadataSnapshot class
The same field and constructor changes as in the RemoteLogSegmentMetadata class.
RemoteLogSegmentMetadataRecord
This record definition will have a new field:
{ "name": "CustomMetadata", "type": "[]CustomMetadataEntry", "versions": "0+", "about": "Custom metadata.", "fields": [ { "name": "Key", "type": "string", "versions": "0+", "mapKey": true }, { "name": "Data", "type": "bytes", "versions": "0+" } ] }
RemoteLogSegmentMetadataSnapshotRecord
Same as for RemoteLogSegmentMetadataRecord.
Compatibility, Deprecation, and Migration Plan
Since the tiered storage functionality is not implemented yet and the related interfaces are still evolving, it is proposed not to preserve backward compatibility.
No special migration process or tool is needed.
Test Plan
The changes will be tested at the unit level; the existing unit tests will be adjusted accordingly.
Rejected Alternatives
None.