THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * RemoteStorageManager provides the lifecycle of remote log segments which includes copy, fetch, and delete operations. * * {@link RemoteLogMetadataManager} is responsible storing and fetching metadata about the remote log segments in a * strongly consistent manner. * * Each upload or copy of a segment is given with a {@link RemoteLogSegmentId} which is universally unique even for the * same topic partition and offsets. Once the copy or upload is successful, {@link RemoteLogSegmentMetadata} is * created with RemoteLogSegmentId and other log segment information and it is stored in {@link RemoteLogMetadataManager}. * This allows RemoteStorageManager to store segments even in eventually consistent manner as the metadata is already * stored in a consistent store. * * All these APIs are still experimental. */ @InterfaceStability.Unstable public interface RemoteStorageManager extends Configurable, Closeable { /** * Copies LogSegmentData provided for the given RemoteLogSegmentId and returns any contextual * information about this copy operation. This can include path to the object in the store etc. * * Invoker of this API should always send a unique id as part of {@link RemoteLogSegmentId#id()} even when it * retries to invoke this method for the same log segment data. * * @param remoteLogSegmentId * @param logSegmentData * @return * @throws IOException */ RemoteLogSegmentContext copyLogSegment(RemoteLogSegmentId remoteLogSegmentId, LogSegmentData logSegmentData) throws IOException; /** * Returns the remote log segment data file/object as InputStream for the given RemoteLogSegmentMetadata starting * from the given startPosition. If endPosition is given then the stream will end at that position else it will be * given till the end of the remote log segment data file/object. * * @param remoteLogSegmentId * @param startPosition * @param endPosition * @return * @throws IOException */ InputStream fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentId, Long startPosition, Optional<Long> endPosition) throws IOException; /** * Deletes the remote log segment for the given remoteLogSegmentId. Returns true if the deletion is successful. * Broker pushes an event to __deleteremote_segments_to_be_deleteddelete_failed_remote_log_segments topic for failed segment deletions so that users * can do the cleanup later. * * @param remoteLogSegmentId * @return * @throws IOException */ boolean deleteLogSegment(RemoteLogSegmentId remoteLogSegmentId) throws IOException; } /** * This represents a universally unique id associated to a topic partition's log segment. This will be regenerated for * every attempt of copying a specific log segment in {@link RemoteLogStorageManager#copyLogSegment(RemoteLogSegmentId, LogSegmentData)}. */ public class RemoteLogSegmentId { private TopicPartition topicPartition; private UUID id; public RemoteLogSegmentId(TopicPartition topicPartition, UUID id) { this.topicPartition = requireNonNull(topicPartition); this.id = requireNonNull(id); } public TopicPartition topicPartition() { return topicPartition; } public UUID id() { return id; } ... } public class LogSegmentData { private FileRecords logSegment; private File offsetIndex; private File timeIndex; //todo add other required indexes like txnIndex ... } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * This interface provides storing and fetching remote log segment metadata with strongly consistent semantics. * */ @InterfaceStability.Unstable public interface RemoteLogMetadataManager extends Configurable, Closeable { /** * Stores RemoteLogSegmentMetadata for the given RemoteLogSegmentMetadata. * * @param remoteLogSegmentId * @param remoteLogSegmentMetadata * @throws IOException */ void putRemoteLogSegmentData(RemoteLogSegmentId remoteLogSegmentId, RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException; /** * Fetches RemoteLogSegmentId for the given topic partition which contains the given offset. * * @param topicPartition * @param offset * @return * @throws IOException */ RemoteLogSegmentId getRemoteLogSegmentId(TopicPartition topicPartition, long offset) throws IOException; /** * Fetches RemoteLogSegmentMetadata for the given RemoteLogSegmentId. * * @param metadata * @return * @throws IOException */ RemoteLogSegmentMetadata getRemoteLogSegmentMetadata(RemoteLogSegmentId metadata) throws IOException; /** * Earliest log offset if exists for the given topic partition in the remote storage. Return {@link Optional#empty()} * if there are no segments in the remote storage. * * @param tp * @return */ Optional<Long> earliestLogOffset(TopicPartition tp) throws IOException; /** * ListSets the remoteearliest log segment files ofoffset for the given topic topicPartitionpartition. * The RemoteLogManager of a follower* uses@param thistp method to find out the* remote data for the given topic partition.@throws IOException */ void * @return List of remote segments, sorted by baseOffset in ascending order.setEarliestLogOffset(TopicPartition tp, long offset) throws IOException; /** */ Deletes the default List<RemoteLogSegmentMetadata> listRemoteSegments(TopicPartition topicPartition) {log segment metadata for the given remoteLogSegmentId. * * @param remoteLogSegmentId * @throws IOException */ void deleteRemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId) throws IOException; /** * List the remote log segment files of the given topicPartition. * The RemoteLogManager of a follower uses this method to find out the remote data for the given topic partition. * * @return List of remote segments, sorted by baseOffset in ascending order. */ default List<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicPartition topicPartition) { return listRemoteLogSegments(topicPartition, 0); } /** * * @param topicPartition * @param minOffset * @return */ return List<RemoteLogSegmentMetadata> listRemoteSegmentslistRemoteLogSegments(TopicPartition topicPartition, 0); }long minOffset); } /** * Metadata about the log segment stored in remote tier storage. */ public class RemoteLogSegmentMetadata { /** * Universally unique remote log segment id. */ @param topicPartition private RemoteLogSegmentId remoteLogSegmentId; * @param minOffset/** * @return Start offset of this segment. */ List<RemoteLogSegmentMetadata> listRemoteSegments(TopicPartition topicPartition, private long minOffset)startOffset; } /** * MetadataEnd aboutoffset theof logthis segment stored. in remote tier storage. */ public classprivate RemoteLogSegmentMetadatalong {endOffset; private RemoteLogSegmentId remoteLogSegmentId; private long startOffset;/** * leader epoch of the broker. private long endOffset; */ private int leaderEpoch; /** * whether thethat remote segment is created or not. */ private boolean created; /** * Any context returned by {@link RemoteLogStorageManager#copyLogSegment(RemoteLogSegmentId, LogSegmentData)} for * the given remoteLogSegmentId */ private byte[] remoteLogSegmentContext; ... } |
Proposed Changes
High-level design
...