THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* RemoteStorageManager provides the lifecycle of remote log segments which includes copy, fetch, and delete operations.
*
* <p>{@link RemoteLogMetadataManager} is responsible for storing and fetching metadata about the remote log segments in a
* strongly consistent manner.
*
* <p>Each upload or copy of a segment is given with a {@link RemoteLogSegmentId} which is universally unique even for the
* same topic partition and offsets. Once the copy or upload is successful, {@link RemoteLogSegmentMetadata} is
* created with RemoteLogSegmentId and other log segment information and it is stored in {@link RemoteLogMetadataManager}.
* This allows RemoteStorageManager to store segments even in an eventually consistent manner as the metadata is already
* stored in a consistent store.
*
* <p>All these APIs are still experimental.
*/
@InterfaceStability.Unstable
public interface RemoteStorageManager extends Configurable, Closeable {
/**
* Copies LogSegmentData provided for the given RemoteLogSegmentId and returns any contextual
* information about this copy operation. This can include path to the object in the store etc.
*
* <p>Invoker of this API should always send a unique id as part of {@link RemoteLogSegmentId#id()} even when it
* retries to invoke this method for the same log segment data.
*
* @param remoteLogSegmentId id under which this copy attempt is made; must be unique per attempt
* @param logSegmentData the log segment data to copy
* @return contextual information about this copy operation
* @throws RemoteStorageException if the segment cannot be copied
*/
RemoteLogSegmentContext copyLogSegment(RemoteLogSegmentId remoteLogSegmentId, LogSegmentData logSegmentData)
throws RemoteStorageException;
/**
* Returns the remote log segment data file/object as InputStream for the given RemoteLogSegmentMetadata starting
* from the given startPosition. The stream will end at the smaller of endPosition and the end of the remote log
* segment data file/object.
*
* <p>NOTE(review): both positions are boxed {@code Long}; whether {@code null} is accepted and whether
* endPosition is inclusive is not specified here - confirm.
*
* @param remoteLogSegmentMetadata metadata of the remote log segment to read
* @param startPosition position in the remote log segment data file/object at which the stream starts
* @param endPosition position bounding the end of the stream (see above)
* @return the requested range of the remote log segment data as an InputStream
* @throws RemoteStorageException if the segment data cannot be fetched
*/
InputStream fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
Long startPosition, Long endPosition) throws RemoteStorageException;
/**
* Returns the offset index for the respective log segment of {@link RemoteLogSegmentMetadata}.
*
* @param remoteLogSegmentMetadata metadata of the remote log segment whose offset index is requested
* @return the offset index as an InputStream
* @throws RemoteStorageException if the offset index cannot be fetched
*/
InputStream fetchOffsetIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;
/**
* Returns the timestamp index for the respective log segment of {@link RemoteLogSegmentMetadata}.
*
* @param remoteLogSegmentMetadata metadata of the remote log segment whose timestamp index is requested
* @return the timestamp index as an InputStream
* @throws RemoteStorageException if the timestamp index cannot be fetched
*/
InputStream fetchTimestampIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;
/**
* NOTE(review): undocumented in the original; by analogy with {@link #fetchOffsetIndex} this presumably returns
* the transaction index for the respective log segment of {@link RemoteLogSegmentMetadata} - confirm.
*
* <p>The default implementation throws {@link UnsupportedOperationException}.
*
* @param remoteLogSegmentMetadata metadata of the remote log segment
* @return an InputStream (presumably over the transaction index, see note above)
* @throws RemoteStorageException declared for implementations that override this method
*/
default InputStream fetchTransactionIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
throw new UnsupportedOperationException();
}
/**
* NOTE(review): undocumented in the original; by analogy with {@link #fetchOffsetIndex} this presumably returns
* the producer snapshot index for the respective log segment of {@link RemoteLogSegmentMetadata} - confirm.
*
* <p>The default implementation throws {@link UnsupportedOperationException}.
*
* @param remoteLogSegmentMetadata metadata of the remote log segment
* @return an InputStream (presumably over the producer snapshot index, see note above)
* @throws RemoteStorageException declared for implementations that override this method
*/
default InputStream fetchProducerSnapshotIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
throw new UnsupportedOperationException();
}
/**
* Returns the leader epoch index till the respective log segment of {@link RemoteLogSegmentMetadata}.
*
* <p>The default implementation throws {@link UnsupportedOperationException}.
*
* @param remoteLogSegmentMetadata metadata of the remote log segment up to which the leader epoch index is requested
* @return the leader epoch index as an InputStream
* @throws RemoteStorageException declared for implementations that override this method
*/
default InputStream fetchLeaderEpochIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
throw new UnsupportedOperationException();
}
/**
* Deletes the remote log segment for the given remoteLogSegmentMetadata. Deletion is considered as successful if
* this call returns successfully without any exceptions. It will throw {@link RemoteStorageException} if there are
* any errors in deleting the file.
*
* <p>Broker pushes an event to __delete_failed_remote_log_segments topic for failed segment deletions so that users
* can do the cleanup later.
*
* @param remoteLogSegmentMetadata metadata of the remote log segment to delete
* @throws RemoteStorageException if there are any errors in deleting the file
*/
void deleteLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;
}
/**
 * This represents a universally unique id associated to a topic partition's log segment. This will be regenerated for
 * every attempt of copying a specific log segment in
 * {@link RemoteStorageManager#copyLogSegment(RemoteLogSegmentId, LogSegmentData)}.
 *
 * <p>Both components are mandatory and are assigned exactly once, in the constructor.
 */
public class RemoteLogSegmentId {
    // Assigned only in the constructor, hence final (consistent with LogSegmentData's fields).
    private final TopicPartition topicPartition;
    private final UUID id;

    /**
     * Creates an id for one copy attempt of a log segment.
     *
     * @param topicPartition topic partition the log segment belongs to; must not be null
     * @param id unique id of this copy attempt; must not be null
     * @throws NullPointerException if {@code topicPartition} or {@code id} is null
     */
    public RemoteLogSegmentId(TopicPartition topicPartition, UUID id) {
        // Name the offending argument so a null is easy to trace from the exception message.
        this.topicPartition = requireNonNull(topicPartition, "topicPartition");
        this.id = requireNonNull(id, "id");
    }

    /**
     * @return the topic partition given at construction; never null
     */
    public TopicPartition topicPartition() {
        return topicPartition;
    }

    /**
     * @return the unique id given at construction; never null
     */
    public UUID id() {
        return id;
    }
...
}
/**
 * Groups the files handed over together when a log segment is copied: the log segment file itself plus its
 * offset, time, transaction, producer id snapshot and leader epoch index files.
 */
public class LogSegmentData {
    private final File logSegment;
    private final File offsetIndex;
    private final File timeIndex;
    private final File txnIndex;
    private final File producerIdSnapshotIndex;
    private final File leaderEpochIndex;

    /**
     * Stores the given file references as-is; this constructor performs no validation or copying.
     *
     * @param logSegment the log segment file
     * @param offsetIndex the offset index file
     * @param timeIndex the time index file
     * @param txnIndex the transaction index file
     * @param producerIdSnapshotIndex the producer id snapshot index file
     * @param leaderEpochIndex the leader epoch index file
     */
    public LogSegmentData(
            File logSegment,
            File offsetIndex,
            File timeIndex,
            File txnIndex,
            File producerIdSnapshotIndex,
            File leaderEpochIndex) {
        // The segment file first, then its companion index files.
        this.logSegment = logSegment;

        this.offsetIndex = offsetIndex;
        this.timeIndex = timeIndex;
        this.txnIndex = txnIndex;
        this.producerIdSnapshotIndex = producerIdSnapshotIndex;
        this.leaderEpochIndex = leaderEpochIndex;
    }
...
} |
...