...
/**
 * RemoteStorageManager provides the lifecycle of remote log segments, which includes copy, fetch, and delete
 * operations.
 *
 * {@link RemoteLogMetadataManager} is responsible for storing and fetching metadata about the remote log segments in a
 * strongly consistent manner.
 *
 * Each upload or copy of a segment is given a {@link RemoteLogSegmentId} which is universally unique, even for the
 * same topic partition and offsets. Once the copy or upload is successful, a {@link RemoteLogSegmentMetadata} is
 * created with the RemoteLogSegmentId and other log segment information, and it is stored in
 * {@link RemoteLogMetadataManager}. This allows RemoteStorageManager to store segments even in an eventually
 * consistent manner, as the metadata is already stored in a consistent store.
 *
 * All these APIs are still experimental.
 */
@InterfaceStability.Unstable
public interface RemoteStorageManager extends Configurable, Closeable {

    /**
     * Copies the LogSegmentData provided for the given RemoteLogSegmentId and returns any contextual
     * information about this copy operation. This can include the path to the object in the store, etc.
     *
     * Invokers of this API should always send a unique id as part of {@link RemoteLogSegmentId#id()}, even when
     * retrying this method for the same log segment data.
     *
     * @param remoteLogSegmentId
     * @param logSegmentData
     * @return
     * @throws RemoteStorageException
     */
    RemoteLogSegmentContext copyLogSegment(RemoteLogSegmentId remoteLogSegmentId, LogSegmentData logSegmentData)
            throws RemoteStorageException;

    /**
     * Returns the remote log segment data file/object as an InputStream for the given RemoteLogSegmentMetadata,
     * starting from the given startPosition. The stream will end at the smaller of endPosition and the end of the
     * remote log segment data file/object.
     *
     * @param remoteLogSegmentMetadata
     * @param startPosition
     * @param endPosition
     * @return
     * @throws RemoteStorageException
     */
    InputStream fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata, Long startPosition,
                                    Long endPosition) throws RemoteStorageException;

    /**
     * Returns the offset index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata
     * @return
     * @throws RemoteStorageException
     */
    InputStream fetchOffsetIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     * Returns the timestamp index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata
     * @return
     * @throws RemoteStorageException
     */
    InputStream fetchTimestampIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     * Returns the transaction index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata
     * @return
     * @throws RemoteStorageException
     */
    default InputStream fetchTransactionIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata)
            throws RemoteStorageException {
        throw new UnsupportedOperationException();
    }

    /**
     * Returns the producer snapshot index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata
     * @return
     * @throws RemoteStorageException
     */
    default InputStream fetchProducerSnapshotIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata)
            throws RemoteStorageException {
        throw new UnsupportedOperationException();
    }

    /**
     * Deletes the remote log segment for the given remoteLogSegmentMetadata. Deletion is considered successful if
     * this call returns without any exceptions. It will throw a {@link RemoteStorageException} if there are
     * any errors in deleting the file.
     *
     * The broker pushes an event to the __delete_failed_remote_log_segments topic for failed segment deletions so
     * that users can do the cleanup later.
     *
     * @param remoteLogSegmentMetadata
     * @throws RemoteStorageException
     */
    void deleteLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;
}

/**
 * This represents a universally unique id associated with a topic partition's log segment. This will be regenerated
 * for every attempt at copying a specific log segment in
 * {@link RemoteStorageManager#copyLogSegment(RemoteLogSegmentId, LogSegmentData)}.
 */
public class RemoteLogSegmentId {
    private TopicPartition topicPartition;
    private UUID id;

    public RemoteLogSegmentId(TopicPartition topicPartition, UUID id) {
        this.topicPartition = requireNonNull(topicPartition);
        this.id = requireNonNull(id);
    }

    public TopicPartition topicPartition() {
        return topicPartition;
    }

    public UUID id() {
        return id;
    }
    ...
}

public class LogSegmentData {

    private final File logSegment;
    private final File offsetIndex;
    private final File timeIndex;
    private final File txnIndex;
    private final File producerIdSnapshotIndex;
    private final File leaderEpochIndex;

    public LogSegmentData(File logSegment, File offsetIndex, File timeIndex, File txnIndex,
                          File producerIdSnapshotIndex, File leaderEpochIndex) {
        this.logSegment = logSegment;
        this.offsetIndex = offsetIndex;
        this.timeIndex = timeIndex;
        this.txnIndex = txnIndex;
        this.producerIdSnapshotIndex = producerIdSnapshotIndex;
        this.leaderEpochIndex = leaderEpochIndex;
    }
    ...
}
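To make the retry contract of copyLogSegment concrete, the following is a minimal sketch of a caller honoring it. The CopyLogSegmentRetryExample class, its method, and the bounded retry loop are illustrative assumptions and not part of the proposed API; only RemoteStorageManager, RemoteLogSegmentId, LogSegmentData, RemoteLogSegmentContext, and RemoteStorageException come from the interfaces above.

import java.util.UUID;

import org.apache.kafka.common.TopicPartition;

// A minimal sketch, assuming a hypothetical caller class and retry policy, of how the broker-side
// invoker of copyLogSegment might honor the contract above: every attempt, including retries for
// the same log segment data, must use a freshly generated, universally unique RemoteLogSegmentId.
public class CopyLogSegmentRetryExample {

    public RemoteLogSegmentContext copyWithRetries(RemoteStorageManager remoteStorageManager,
                                                   TopicPartition topicPartition,
                                                   LogSegmentData logSegmentData,
                                                   int maxAttempts) throws RemoteStorageException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RemoteStorageException lastError = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            // A new id per attempt keeps partially copied objects from a failed attempt
            // distinguishable from the successful copy in the remote store.
            RemoteLogSegmentId remoteLogSegmentId =
                    new RemoteLogSegmentId(topicPartition, UUID.randomUUID());
            try {
                return remoteStorageManager.copyLogSegment(remoteLogSegmentId, logSegmentData);
            } catch (RemoteStorageException e) {
                lastError = e;
            }
        }
        throw lastError;
    }
}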
RemoteStorageManager_Old
/**
* RemoteStorageManager is an interface that allows plugging in different remote storage implementations to copy and
* retrieve the log segments.
*
* All these APIs are still experimental.
*/
@InterfaceStability.Unstable
trait RemoteStorageManager extends Configurable with AutoCloseable {
/**
* Returns the earliest log offset, if one exists, for the given topic partition in the remote storage. Return -1 if
* there are no segments in the remote storage.
*
* @param tp
* @return
*/
@throws(classOf[IOException])
def earliestLogOffset(tp: TopicPartition): Long
/**
* Copies LogSegment provided by [[RemoteLogManager]] for the given topic partition with the given leader epoch.
* Returns the RDIs of the remote data. This method is invoked by the leader of topic partition.
*
* //todo LogSegment is not public, this will be changed with an interface which provides base and end offset of the
* segment, log and offset/time indexes.
*
* @param topicPartition
* @param logSegment
* @return
*/
@throws(classOf[IOException])
def copyLogSegment(topicPartition: TopicPartition, logSegment: LogSegment,
leaderEpoch: Int): util.List[RemoteLogIndexEntry]
/**
* List the remote log segment files of the given topicPartition.
* The RemoteLogManager of a follower uses this method to find out the remote data for the given topic partition.
*
* @return List of remote segments, sorted by baseOffset in ascending order.
*/
@throws(classOf[IOException])
def listRemoteSegments(topicPartition: TopicPartition): util.List[RemoteLogSegmentInfo] = {
listRemoteSegments(topicPartition, 0)
}
/**
* List the remote log segment files of the specified topicPartition with offset >= minOffset.
* The RemoteLogManager of a follower uses this method to find out the remote data for the given topic partition.
*
* @param minOffset The minimum offset for a segment to be returned.
* @return List of remote segments that contains offsets >= minOffset, sorted by baseOffset in ascending order.
*/
@throws(classOf[IOException])
def listRemoteSegments(topicPartition: TopicPartition, minOffset: Long): util.List[RemoteLogSegmentInfo]
/**
* Returns a List of RemoteLogIndexEntry for the given RemoteLogSegmentInfo. This is used by a follower to store remote
* log indexes locally.
*
* @param remoteLogSegment
* @return
*/
@throws(classOf[IOException])
def getRemoteLogIndexEntries(remoteLogSegment: RemoteLogSegmentInfo): util.List[RemoteLogIndexEntry]
/**
* Deletes remote LogSegment file and indexes for the given remoteLogSegmentInfo
*
* @param remoteLogSegmentInfo
* @return
*/
@throws(classOf[IOException])
def deleteLogSegment(remoteLogSegmentInfo: RemoteLogSegmentInfo): Boolean
/**
* Delete all the log segments for the given topic partition. This can be done by renaming the existing locations
* and deleting them later in an asynchronous manner.
*
* @param topicPartition
* @return
*/
@throws(classOf[IOException])
def deleteTopicPartition(topicPartition: TopicPartition): Boolean
/**
* Remove the log segments which are older than the given cleanUpTillMs. Return the log start offset of the
* earliest remote log segment, if one exists, or -1 if there are no log segments in the remote storage.
*
* @param topicPartition
* @param cleanUpTillMs
* @return
*/
@throws(classOf[IOException])
def cleanupLogUntil(topicPartition: TopicPartition, cleanUpTillMs: Long): Long
/**
* Read up to maxBytes of data from remote storage, starting from the first batch whose offset is greater than or
* equal to the startOffset. It will read at least one batch, even if the first batch's size is larger than maxBytes.
*
* @param remoteLogIndexEntry The first remoteLogIndexEntry that remoteLogIndexEntry.lastOffset >= startOffset
* @param maxBytes maximum bytes to fetch for the given entry
* @param startOffset initial offset to be read from the given rdi in remoteLogIndexEntry
* @param minOneMessage if true read at least one record even if the size is more than maxBytes
* @return
*/
@throws(classOf[IOException])
def read(remoteLogIndexEntry: RemoteLogIndexEntry, maxBytes: Int, startOffset: Long, minOneMessage: Boolean): Records
/**
* Search forward for the first message that meets the following requirements:
* - Message's timestamp is greater than or equal to the targetTimestamp.
* - Message's offset is greater than or equal to the startingOffset.
*
* @param targetTimestamp The timestamp to search for.
* @param startingOffset The starting offset to search.
* @return The timestamp and offset of the message found. Null if no message is found.
*/
@throws(classOf[IOException])
def findOffsetByTimestamp(remoteLogIndexEntry: RemoteLogIndexEntry,
targetTimestamp: Long,
startingOffset: Long): TimestampAndOffset
/**
* Release any system resources used by this instance.
*/
def close(): Unit
}
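For comparison with the new interface, the following is a minimal sketch of the read path a follower's RemoteLogManager might drive through this old interface; its use of java.util collections suggests Java callers were intended. The OldInterfaceReadPathExample class, the variable names, and the lastOffset() accessor on RemoteLogIndexEntry are assumptions inferred from the javadoc above, not a definitive implementation.

import java.io.IOException;
import java.util.List;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.Records;

// RemoteLogSegmentInfo and RemoteLogIndexEntry are the types defined by this proposal
// (imports omitted). A minimal sketch of the follower read path against the old interface:
// list the remote segments containing offsets >= startOffset, fetch their index entries, and
// read records through the first entry whose lastOffset covers startOffset.
public class OldInterfaceReadPathExample {

    public Records readFromRemote(RemoteStorageManager remoteStorageManager,
                                  TopicPartition topicPartition,
                                  long startOffset,
                                  int maxBytes) throws IOException {
        List<RemoteLogSegmentInfo> segments =
                remoteStorageManager.listRemoteSegments(topicPartition, startOffset);
        for (RemoteLogSegmentInfo segment : segments) {
            for (RemoteLogIndexEntry entry : remoteStorageManager.getRemoteLogIndexEntries(segment)) {
                // Per the read() javadoc, pass the first entry with entry.lastOffset() >= startOffset.
                if (entry.lastOffset() >= startOffset) {
                    // minOneMessage = true: return at least one batch even if it exceeds maxBytes.
                    return remoteStorageManager.read(entry, maxBytes, startOffset, true);
                }
            }
        }
        return null; // No remote data covers startOffset.
    }
}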
RemoteLogMetadataManager
`RemoteLogMetadataManager` is an interface that provides the lifecycle of metadata about remote log segments with strongly consistent semantics. There is a default implementation that uses an internal topic. Users can plug in their own implementation if they intend to use another system to store remote log segment metadata.
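As an illustration, wiring a custom implementation into a broker could look roughly like the sketch below. The property names are assumptions based on the configuration proposed in this KIP, and com.example.MyRemoteLogMetadataManager is a hypothetical class.

import java.util.Properties;

// A minimal sketch of broker configuration overrides selecting a custom RemoteLogMetadataManager.
// The property names are assumed from this proposal's configuration section; the implementation
// class com.example.MyRemoteLogMetadataManager is hypothetical.
public class RemoteLogMetadataManagerConfigExample {

    public static Properties brokerOverrides() {
        Properties props = new Properties();
        props.put("remote.log.storage.system.enable", "true");
        props.put("remote.log.metadata.manager.class.name",
                  "com.example.MyRemoteLogMetadataManager");
        return props;
    }
}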
...