THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.server.log.remote.storage; ... /** * RemoteStorageManager provides the lifecycle of remote log segments that includes copy, fetch, and delete from remote * storage. * <p> * Each upload or copy of a segment is initiated with {@link RemoteLogSegmentMetadata} containing {@link RemoteLogSegmentId} * which is universally unique even for the same topic partition and offsets. * <p> * RemoteLogSegmentMetadata is stored in {@link RemoteLogMetadataManager} before and after copy/delete operations on * RemoteStorageManager with the respective {@link RemoteLogSegmentMetadata.StateRemoteLogSegmentState}. {@link RemoteLogMetadataManager} is * responsible for storing and fetching metadata about the remote log segments in a strongly consistent manner. * This allows RemoteStorageManager to store segments even in eventually consistent manner as the metadata is already * stored in a consistent store. * <p> * All these APIs are still evolving. */ @InterfaceStability.Unstable public interface RemoteStorageManager extends Configurable, Closeable { /** * Type of the index file. */ enum IndexType { /** * Represents offset index. */ Offset, /** * Represents timestamp index. */ Timestamp, /** * Represents producer snapshot index. */ ProducerSnapshot, /** * Represents transaction index. */ Transaction, /** * Represents leader epoch index. */ LeaderEpoch, } /** * Copies LogSegmentData provided for the given {@param remoteLogSegmentMetadata}. * <p> * Invoker of this API should always send a unique id as part of {@link RemoteLogSegmentMetadata#remoteLogSegmentId()#id()} * even when it retries to invoke this method for the same log segment data. * * @param remoteLogSegmentMetadata metadata about the remote log segment. * @param logSegmentData data to be copied to tiered storage. * @throws RemoteStorageException if there are any errors in storing the data of the segment. */ void copyLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata, LogSegmentData logSegmentData) throws RemoteStorageException; /** * Returns the remote log segment data file/object as InputStream for the given RemoteLogSegmentMetadata starting * from the given startPosition. The stream will end at the end of the remote log segment data file/object. * * @param remoteLogSegmentMetadata metadata about the remote log segment. * @param startPosition start position of log segment to be read, inclusive. * @return input stream of the requested log segment data. * @throws RemoteStorageException if there are any errors while fetching the desired segment. */ InputStream fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata, int startPosition) throws RemoteStorageException; /** * Returns the remote log segment data file/object as InputStream for the given RemoteLogSegmentMetadata starting * from the given startPosition. The stream will end at the smaller of endPosition and the end of the remote log * segment data file/object. * * @param remoteLogSegmentMetadata metadata about the remote log segment. * @param startPosition start position of log segment to be read, inclusive. * @param endPosition end position of log segment to be read, inclusive. * @return input stream of the requested log segment data. * @throws RemoteStorageException if there are any errors while fetching the desired segment. */ InputStream fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata, int startPosition, int endPosition) throws RemoteStorageException; /** * Returns the index for the respective log segment of {@link RemoteLogSegmentMetadata}. * * @param remoteLogSegmentMetadata metadata about the remote log segment. * @param indexType type of the index to be fetched for the segment. * @return input stream of the requested index. * @throws RemoteStorageException if there are any errors while fetching the index. */ default InputStream fetchIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata, IndexType indexType) throws RemoteStorageException; /** * Deletes the resources associated with the given {@param remoteLogSegmentMetadata}. Deletion is considered as * successful if this call returns successfully without any errors. It will throw {@link RemoteStorageException} if * there are any errors in deleting the file. * <p> * {@link RemoteResourceNotFoundException} is thrown when there are no resources associated with the given * {@param remoteLogSegmentMetadata}. * * @param remoteLogSegmentMetadata metadata about the remote log segment to be deleted. * @throws RemoteResourceNotFoundException if the requested resource is not found * @throws RemoteStorageException if there are any storage related errors occurred. */ void deleteLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException; } package org.apache.kafka.common; ... public class TopicIdPartition { private final UUID topicId; private final TopicPartition topicPartition; public TopicIdPartition(UUID topicId, TopicPartition topicPartition) { Objects.requireNonNull(topicId, "topicId can not be null"); Objects.requireNonNull(topicPartition, "topicPartition can not be null"); this.topicId = topicId; this.topicPartition = topicPartition; } public UUID topicId() { return topicId; } public TopicPartition topicPartition() { return topicPartition; } ... } package org.apache.kafka.server.log.remote.storage; ... /** * This represents a universally unique identifier associated to a topic partition's log segment. This will be * regenerated for every attempt of copying a specific log segment in {@link RemoteStorageManager#copyLogSegment(RemoteLogSegmentMetadata, LogSegmentData)}. */ public class RemoteLogSegmentId implements Comparable<RemoteLogSegmentId>, Serializable { private static final long serialVersionUID = 1L; private final TopicIdPartition topicIdPartition; private final UUID id; public RemoteLogSegmentId(TopicIdPartition topicIdPartition, UUID id) { this.topicIdPartition = requireNonNull(topicIdPartition); this.id = requireNonNull(id); } /** * Returns TopicIdPartition of this remote log segment. * * @return */ public TopicIdPartition topicIdPartition() { return topicIdPartition; } /** * Returns Universally Unique Id of this remote log segment. * * @return */ public UUID id() { return id; } ... } package org.apache.kafka.server.log.remote.storage; ... /** * It describes the metadata about the log segment in the remote storage. */ public class RemoteLogSegmentMetadata implements Serializable { private static final long serialVersionUID = 1L; /** * Universally unique remote log segment id. */ private final RemoteLogSegmentId remoteLogSegmentId; /** * Start offset of this segment. */ private final long startOffset; /** * End offset of this segment. */ private final long endOffset; /** * Leader epoch of the broker. */ private final int leaderEpoch; /** * Maximum timestamp in the segment */ private final long maxTimestamp; /** * Epoch time at which the respective {@link #state} is set. */ private final long eventTimestamp; /** * LeaderEpoch vs offset for messages with in this segment. */ private final Map<Int, Long> segmentLeaderEpochs; /** * Size of the segment in bytes. */ private final int segmentSizeInBytes; /** * It indicates the state in which the action is executed on this segment. */ private final RemoteLogSegmentState state; /** * @param remoteLogSegmentId Universally unique remote log segment id. * @param startOffset Start offset of this segment. * @param endOffset End offset of this segment. * @param maxTimestamp Maximum timestamp in this segment * @param leaderEpoch Leader epoch of the broker. * @param eventTimestamp Epoch time at which the remote log segment is copied to the remote tier storage. * @param segmentSizeInBytes Size of this segment in bytes. * @param state State of the respective segment of remoteLogSegmentId. * @param segmentLeaderEpochs leader epochs occurred with in this segment */ public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId, long startOffset, long endOffset, long maxTimestamp, int leaderEpoch, long eventTimestamp, int segmentSizeInBytes, RemoteLogSegmentState state, Map<Int, Long> segmentLeaderEpochs) { this.remoteLogSegmentId = remoteLogSegmentId; this.startOffset = startOffset; this.endOffset = endOffset; this.leaderEpoch = leaderEpoch; this.maxTimestamp = maxTimestamp; this.eventTimestamp = eventTimestamp; this.segmentLeaderEpochs = segmentLeaderEpochs; this.state = state; this.segmentSizeInBytes = segmentSizeInBytes; } /** * @return unique id of this segment. */ public RemoteLogSegmentId remoteLogSegmentId() { return remoteLogSegmentId; } /** * @return Start offset of this segment(inclusive). */ public long startOffset() { return startOffset; } /** * @return End offset of this segment(inclusive). */ public long endOffset() { return endOffset; } /** * @return Leader or controller epoch of the broker from where this event occurred. */ public int brokerEpoch() { return brokerEpoch; } /** * @return Epoch time at which this evcent is occurred. */ public long eventTimestamp() { return eventTimestamp; } /** * @return */ public int segmentSizeInBytes() { return segmentSizeInBytes; } public RemoteLogSegmentState state() { return state; } public long maxTimestamp() { return maxTimestamp; } public Map<Int, Long> segmentLeaderEpochs() { return segmentLeaderEpochs; } ... } package org.apache.kafka.server.log.remote.storage; ... public class LogSegmentData { private final File logSegment; private final File offsetIndex; private final File timeIndex; private final File txnIndex; private final File producerIdSnapshotIndex; private final ByteBuffer leaderEpochIndex; public LogSegmentData(File logSegment, File offsetIndex, File timeIndex, File txnIndex, File producerIdSnapshotIndex, ByteBuffer leaderEpochIndex) { this.logSegment = logSegment; this.offsetIndex = offsetIndex; this.timeIndex = timeIndex; this.txnIndex = txnIndex; this.producerIdSnapshotIndex = producerIdSnapshotIndex; this.leaderEpochIndex = leaderEpochIndex; } public File logSegment() { return logSegment; } public File offsetIndex() { return offsetIndex; } public File timeIndex() { return timeIndex; } public File txnIndex() { return txnIndex; } public File producerIdSnapshotIndex() { return producerIdSnapshotIndex; } public ByteBuffer leaderEpochIndex() { return leaderEpochIndex; } ... } |
...