...
package org.apache.kafka.common.log.remote.storage;
...
/**
* RemoteStorageManager provides the lifecycle of remote log segments, which includes copying, fetching, and deleting
* them from remote storage.
* <p>
* Each upload or copy of a segment is initiated with {@link RemoteLogSegmentMetadata} containing {@link RemoteLogSegmentId}
* which is universally unique even for the same topic partition and offsets.
* <p>
* RemoteLogSegmentMetadata is stored in {@link RemoteLogMetadataManager} before and after copy/delete operations on
* RemoteStorageManager with the respective {@link RemoteLogState}. {@link RemoteLogMetadataManager} is
* responsible for storing and fetching metadata about the remote log segments in a strongly consistent manner.
* This allows RemoteStorageManager to store segments in an eventually consistent manner, as the metadata is
* already stored in a consistent store.
* <p>
* All these APIs are still evolving.
*/
@InterfaceStability.Unstable
public interface RemoteStorageManager extends Configurable, Closeable {
/**
* Copies LogSegmentData provided for the given {@code remoteLogSegmentMetadata}.
* <p>
* The invoker of this API should always send a unique id as part of {@link RemoteLogSegmentMetadata#remoteLogSegmentId()}
* (that is, {@link RemoteLogSegmentId#id()}), even when it retries to invoke this method for the same log segment data.
*
* @param remoteLogSegmentMetadata metadata about the remote log segment.
* @param logSegmentData data to be copied to tiered storage.
* @throws RemoteStorageException if there are any errors in storing the data of the segment.
*/
void copyLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata, LogSegmentData logSegmentData)
throws RemoteStorageException;
/**
* Returns the remote log segment data file/object as InputStream for the given RemoteLogSegmentMetadata starting
* from the given startPosition. The stream will end at the smaller of endPosition and the end of the remote log
* segment data file/object.
*
* @param remoteLogSegmentMetadata metadata about the remote log segment.
* @param startPosition start position of log segment to be read, inclusive.
* @param endPosition end position of log segment to be read, inclusive.
* @return input stream of the requested log segment data.
* @throws RemoteStorageException if there are any errors while fetching the desired segment.
*/
InputStream fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
Long startPosition, Long endPosition) throws RemoteStorageException;
/**
* Returns the offset index for the respective log segment of {@link RemoteLogSegmentMetadata}.
*
* @param remoteLogSegmentMetadata metadata about the remote log segment.
* @return input stream of the requested offset index.
* @throws RemoteStorageException if there are any errors while fetching the index.
*/
InputStream fetchOffsetIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;
/**
* Returns the timestamp index for the respective log segment of {@link RemoteLogSegmentMetadata}.
*
* @param remoteLogSegmentMetadata metadata about the remote log segment.
* @return input stream of the requested timestamp index.
* @throws RemoteStorageException if there are any errors while fetching the index.
*/
InputStream fetchTimestampIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;
/**
* Returns the transaction index for the respective log segment of {@link RemoteLogSegmentMetadata}.
*
* @param remoteLogSegmentMetadata metadata about the remote log segment.
* @return input stream of the requested transaction index.
* @throws RemoteStorageException if there are any errors while fetching the index.
*/
InputStream fetchTransactionIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;
/**
* Returns the producer snapshot index for the respective log segment of {@link RemoteLogSegmentMetadata}.
*
* @param remoteLogSegmentMetadata metadata about the remote log segment.
* @return input stream of the producer snapshot.
* @throws RemoteStorageException if there are any errors while fetching the index.
*/
InputStream fetchProducerSnapshotIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;
/**
* Returns the leader epoch index for the respective log segment of {@link RemoteLogSegmentMetadata}.
*
* @param remoteLogSegmentMetadata metadata about the remote log segment.
* @return input stream of the leader epoch index.
* @throws RemoteStorageException if there are any errors while fetching the index.
*/
InputStream fetchLeaderEpochIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;
/**
* Deletes the resources associated with the given {@code remoteLogSegmentMetadata}. Deletion is considered
* successful if this call returns without any errors. It throws {@link RemoteStorageException} if
* there are any errors in deleting the resources.
* <p>
* {@link RemoteResourceNotFoundException} is thrown when there are no resources associated with the given
* {@code remoteLogSegmentMetadata}.
*
* @param remoteLogSegmentMetadata metadata about the remote log segment to be deleted.
* @throws RemoteResourceNotFoundException if the requested resource is not found.
* @throws RemoteStorageException if there are any storage-related errors.
*/
void deleteLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;
}
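To make the contract above concrete, here is a minimal sketch of a RemoteStorageManager backed by the local filesystem. This is illustrative only and not part of the KIP: the class name, the remote.storage.local.dir config key, the file-name suffixes, and the exception constructor signatures are all assumptions.

package org.apache.kafka.common.log.remote.storage;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Map;

// Illustrative sketch only: a RemoteStorageManager that "tiers" segments to a local directory.
public class LocalDiskRemoteStorageManager implements RemoteStorageManager {
    private Path baseDir;

    @Override
    public void configure(Map<String, ?> configs) {
        // Hypothetical config key for the target directory.
        baseDir = Paths.get(String.valueOf(configs.get("remote.storage.local.dir")));
    }

    // Paths are derived from the universally unique RemoteLogSegmentId, so a retried
    // copy of the same segment never collides with a partially written earlier attempt.
    private Path path(RemoteLogSegmentMetadata metadata, String suffix) {
        return baseDir.resolve(metadata.remoteLogSegmentId().id().toString() + suffix);
    }

    @Override
    public void copyLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata, LogSegmentData logSegmentData)
            throws RemoteStorageException {
        try {
            Files.createDirectories(baseDir);
            // A complete implementation would also copy each index file carried in LogSegmentData.
            Files.copy(logSegmentData.logSegment().toPath(), path(remoteLogSegmentMetadata, ".log"),
                    StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new RemoteStorageException("Failed to copy segment", e);
        }
    }

    @Override
    public InputStream fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                                           Long startPosition, Long endPosition) throws RemoteStorageException {
        try {
            InputStream in = Files.newInputStream(path(remoteLogSegmentMetadata, ".log"));
            long skipped = in.skip(startPosition); // positions are byte offsets within the segment file
            // A real implementation would verify 'skipped' and bound the stream at endPosition.
            return in;
        } catch (IOException e) {
            throw new RemoteStorageException("Failed to fetch segment", e);
        }
    }

    private InputStream fetch(RemoteLogSegmentMetadata metadata, String suffix) throws RemoteStorageException {
        try {
            return Files.newInputStream(path(metadata, suffix));
        } catch (IOException e) {
            throw new RemoteStorageException("Failed to fetch " + suffix, e);
        }
    }

    @Override public InputStream fetchOffsetIndex(RemoteLogSegmentMetadata m) throws RemoteStorageException { return fetch(m, ".index"); }
    @Override public InputStream fetchTimestampIndex(RemoteLogSegmentMetadata m) throws RemoteStorageException { return fetch(m, ".timeindex"); }
    @Override public InputStream fetchTransactionIndex(RemoteLogSegmentMetadata m) throws RemoteStorageException { return fetch(m, ".txnindex"); }
    @Override public InputStream fetchProducerSnapshotIndex(RemoteLogSegmentMetadata m) throws RemoteStorageException { return fetch(m, ".snapshot"); }
    @Override public InputStream fetchLeaderEpochIndex(RemoteLogSegmentMetadata m) throws RemoteStorageException { return fetch(m, ".leaderepoch"); }

    @Override
    public void deleteLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
        try {
            // Assumes RemoteResourceNotFoundException extends RemoteStorageException, as the javadoc implies.
            if (!Files.deleteIfExists(path(remoteLogSegmentMetadata, ".log"))) {
                throw new RemoteResourceNotFoundException("No resources found for " + remoteLogSegmentMetadata.remoteLogSegmentId());
            }
        } catch (IOException e) {
            throw new RemoteStorageException("Failed to delete segment", e);
        }
    }

    @Override
    public void close() {
        // Nothing to release for the local filesystem.
    }
}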
package org.apache.kafka.common;
...
public class TopicIdPartition {
private final UUID topicId;
private final TopicPartition topicPartition;
public TopicIdPartition(UUID topicId, TopicPartition topicPartition) {
Objects.requireNonNull(topicId, "topicId can not be null");
Objects.requireNonNull(topicPartition, "topicPartition can not be null");
this.topicId = topicId;
this.topicPartition = topicPartition;
}
public UUID topicId() {
return topicId;
}
public TopicPartition topicPartition() {
return topicPartition;
}
...
}
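For illustration, here is how a TopicIdPartition might be constructed. The UUID is a stand-in for the id assigned when the topic is created, which is what allows a deleted-and-recreated topic with the same name to be told apart from its predecessor:

// Hypothetical values: in practice the topic id is assigned by the cluster, not generated here.
UUID topicId = UUID.randomUUID();
TopicIdPartition tidp = new TopicIdPartition(topicId, new TopicPartition("payments", 0));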
package org.apache.kafka.common.log.remote.storage;
...
/**
* This represents a universally unique identifier associated with a topic partition's log segment. It is
* regenerated for every attempt to copy a specific log segment in {@link RemoteStorageManager#copyLogSegment(RemoteLogSegmentMetadata, LogSegmentData)}.
*/
public class RemoteLogSegmentId implements Comparable<RemoteLogSegmentId>, Serializable {
private static final long serialVersionUID = 1L;
private final TopicIdPartition topicIdPartition;
private final UUID id;
public RemoteLogSegmentId(TopicIdPartition topicIdPartition, UUID id) {
this.topicIdPartition = requireNonNull(topicIdPartition);
this.id = requireNonNull(id);
}
/**
* Returns TopicIdPartition of this remote log segment.
*
* @return TopicIdPartition of this remote log segment.
*/
public TopicIdPartition topicIdPartition() {
return topicIdPartition;
}
/**
* Returns Universally Unique Id of this remote log segment.
*
* @return universally unique id of this remote log segment.
*/
public UUID id() {
return id;
}
...
}
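As the class comment notes, the id is regenerated for every copy attempt. A caller-side sketch of what that implies (the variable names are assumptions, and tidp is the TopicIdPartition from the earlier example):

// Each attempt gets a fresh id, so a partially written object from a failed
// attempt can never be confused with the one that eventually succeeds.
RemoteLogSegmentId firstAttempt = new RemoteLogSegmentId(tidp, UUID.randomUUID());
RemoteLogSegmentId retryAttempt = new RemoteLogSegmentId(tidp, UUID.randomUUID()); // after a failed copy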
package org.apache.kafka.common.log.remote.storage;
...
/**
* Metadata about a log segment stored in remote storage.
*/
public class RemoteLogSegmentMetadata implements Serializable {
private static final long serialVersionUID = 1L;
/**
* Universally unique remote log segment id.
*/
private final RemoteLogSegmentId remoteLogSegmentId;
/**
* Start offset of this segment.
*/
private final long startOffset;
/**
* End offset of this segment.
*/
private final long endOffset;
/**
* Leader epoch of the broker.
*/
private final int leaderEpoch;
/**
* Maximum timestamp in the segment
*/
private final long maxTimestamp;
/**
* Epoch time at which the respective {@link #state} is set.
*/
private final long eventTimestamp;
/**
* LeaderEpoch vs offset for messages within this segment.
*/
private final Map<Integer, Long> segmentLeaderEpochs;
/**
* Size of the segment in bytes.
*/
private final int segmentSizeInBytes;
/**
* It indicates the state of the action being executed on this segment.
*/
private final RemoteLogState state;
/**
* @param remoteLogSegmentId Universally unique remote log segment id.
* @param startOffset Start offset of this segment.
* @param endOffset End offset of this segment.
* @param maxTimestamp Maximum timestamp in this segment.
* @param leaderEpoch Leader epoch of the broker.
* @param eventTimestamp Epoch time at which the respective {@link #state} is set.
* @param segmentSizeInBytes Size of this segment in bytes.
* @param state State of the remote log segment.
* @param segmentLeaderEpochs Leader epochs occurred within this segment.
*/
public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId, long startOffset, long endOffset,
long maxTimestamp, int leaderEpoch, long eventTimestamp,
int segmentSizeInBytes, RemoteLogState state, Map<Integer, Long> segmentLeaderEpochs) {
this.remoteLogSegmentId = remoteLogSegmentId;
this.startOffset = startOffset;
this.endOffset = endOffset;
this.leaderEpoch = leaderEpoch;
this.maxTimestamp = maxTimestamp;
this.eventTimestamp = eventTimestamp;
this.segmentLeaderEpochs = segmentLeaderEpochs;
this.state = state;
this.segmentSizeInBytes = segmentSizeInBytes;
}
/**
* @return unique id of this segment.
*/
public RemoteLogSegmentId remoteLogSegmentId() {
return remoteLogSegmentId;
}
/**
* @return Start offset of this segment(inclusive).
*/
public long startOffset() {
return startOffset;
}
/**
* @return End offset of this segment(inclusive).
*/
public long endOffset() {
return endOffset;
}
/**
* @return Leader epoch of the broker from where this event occurred.
*/
public int leaderEpoch() {
return leaderEpoch;
}
/**
* @return Epoch time at which this event occurred.
*/
public long eventTimestamp() {
return eventTimestamp;
}
/**
* @return Size of this segment in bytes.
*/
public int segmentSizeInBytes() {
return segmentSizeInBytes;
}
public RemoteLogState state() {
return state;
}
public long maxTimestamp() {
return maxTimestamp;
}
public Map<Integer, Long> segmentLeaderEpochs() {
return segmentLeaderEpochs;
}
...
}
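A hedged example of assembling the metadata for a freshly copied segment. All literal values are invented for illustration, and the RemoteLogState constant name is an assumption:

Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
segmentLeaderEpochs.put(3, 1500L); // leader epoch 3 starts at offset 1500 within this segment
segmentLeaderEpochs.put(4, 2100L); // leader epoch 4 starts at offset 2100

RemoteLogSegmentMetadata metadata = new RemoteLogSegmentMetadata(
        new RemoteLogSegmentId(tidp, UUID.randomUUID()),
        1500L,                      // startOffset, inclusive
        2999L,                      // endOffset, inclusive
        1621234567890L,             // maxTimestamp in the segment
        4,                          // leaderEpoch of the copying broker
        System.currentTimeMillis(), // eventTimestamp: when the state below was set
        64 * 1024 * 1024,           // segmentSizeInBytes
        RemoteLogState.COPY_SEGMENT_STARTED, // assumed constant name
        segmentLeaderEpochs);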
package org.apache.kafka.common.log.remote.storage;
...
public class LogSegmentData {
private final File logSegment;
private final File offsetIndex;
private final File timeIndex;
private final File txnIndex;
private final File producerIdSnapshotIndex;
private final ByteBuffer leaderEpochIndex;
public LogSegmentData(File logSegment, File offsetIndex, File timeIndex, File txnIndex, File producerIdSnapshotIndex,
ByteBuffer leaderEpochIndex) {
this.logSegment = logSegment;
this.offsetIndex = offsetIndex;
this.timeIndex = timeIndex;
this.txnIndex = txnIndex;
this.producerIdSnapshotIndex = producerIdSnapshotIndex;
this.leaderEpochIndex = leaderEpochIndex;
}
public File logSegment() {
return logSegment;
}
public File offsetIndex() {
return offsetIndex;
}
public File timeIndex() {
return timeIndex;
}
public File txnIndex() {
return txnIndex;
}
public File producerIdSnapshotIndex() {
return producerIdSnapshotIndex;
}
public ByteBuffer leaderEpochIndex() {
return leaderEpochIndex;
}
...
}
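For illustration, a LogSegmentData can be assembled from the files Kafka keeps for a segment on local disk. The directory and the zero-padded base-offset naming below follow Kafka's usual on-disk layout, but the exact paths are assumptions, and the IOException from reading the leader epoch checkpoint is left to the caller:

File dir = new File("/var/kafka-logs/payments-0"); // hypothetical log directory
String base = "00000000000000001500";              // segment base offset, zero-padded

ByteBuffer leaderEpochIndex = ByteBuffer.wrap(
        Files.readAllBytes(new File(dir, "leader-epoch-checkpoint").toPath()));

LogSegmentData data = new LogSegmentData(
        new File(dir, base + ".log"),       // the segment itself
        new File(dir, base + ".index"),     // offset index
        new File(dir, base + ".timeindex"), // time index
        new File(dir, base + ".txnindex"),  // transaction index
        new File(dir, base + ".snapshot"),  // producer id snapshot
        leaderEpochIndex);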
...