Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: java
title: RemoteStorageManager
/**
 * RemoteStorageManager provides the lifecycle of remote log segments, which includes copy, fetch, and delete
 * operations.
 *
 * {@link RemoteLogMetadataManager} is responsible for storing and fetching metadata about the remote log segments
 * in a strongly consistent manner.
 *
 * Each upload or copy of a segment is given a {@link RemoteLogSegmentId} which is universally unique, even for the
 * same topic partition and offsets. Once the copy or upload is successful, a {@link RemoteLogSegmentMetadata} is
 * created with the RemoteLogSegmentId and other log segment information, and it is stored in
 * {@link RemoteLogMetadataManager}. This allows RemoteStorageManager to store segments even in an eventually
 * consistent manner, as the metadata is already stored in a consistent store.
 *
 * All these APIs are still experimental.
 */
@InterfaceStability.Unstable
public interface RemoteStorageManager extends Configurable, Closeable {

    /**
     * Copies the LogSegmentData provided for the given RemoteLogSegmentId and returns any contextual
     * information about this copy operation. This can include the path to the object in the store, etc.
     *
     * The invoker of this API should always send a unique id as part of {@link RemoteLogSegmentId#id()}, even when
     * it retries invoking this method for the same log segment data.
     *
     * @param remoteLogSegmentId unique id assigned to this copy attempt of the log segment
     * @param logSegmentData     the log segment data to be copied
     * @return contextual information about this copy operation, e.g. the path to the object in the store
     * @throws IOException if an I/O error occurs while copying
     */
    RemoteLogSegmentContext copyLogSegment(RemoteLogSegmentId remoteLogSegmentId, LogSegmentData logSegmentData) throws IOException;

    /**
     * Returns the remote log segment data file/object as an InputStream for the given RemoteLogSegmentMetadata,
     * starting from the given startPosition. If endPosition is given, the stream ends at that position; otherwise
     * it extends to the end of the remote log segment data file/object.
     *
     * @param remoteLogSegmentId metadata of the remote log segment to read. Despite its name, this parameter is a
     *                           {@link RemoteLogSegmentMetadata}, not a {@link RemoteLogSegmentId}.
     * @param startPosition      position in the remote log segment data to start reading from
     * @param endPosition        optional position at which the stream ends; if empty, the stream extends to the
     *                           end of the remote log segment data. NOTE(review): whether this position is
     *                           inclusive or exclusive is not specified here — confirm.
     * @return an InputStream over the requested range of the remote log segment data
     * @throws IOException if an I/O error occurs while fetching
     */
    InputStream fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentId, Long startPosition, Optional<Long> endPosition) throws IOException;

    /**
     * Deletes the remote log segment for the given remoteLogSegmentId. Returns true if the deletion is successful.
     * The broker pushes an event to the __delete_failed_remote_log_segments topic for failed segment deletions, so
     * that users can do the cleanup later.
     *
     * @param remoteLogSegmentId id of the remote log segment to delete
     * @return true if the deletion is successful
     * @throws IOException if an I/O error occurs while deleting
     */
    boolean deleteLogSegment(RemoteLogSegmentId remoteLogSegmentId) throws IOException;
}


/**
 * This represents a universally unique id associated with a topic partition's log segment. It is regenerated for
 * every attempt of copying a specific log segment in
 * {@link RemoteStorageManager#copyLogSegment(RemoteLogSegmentId, LogSegmentData)}.
 */
public class RemoteLogSegmentId {
    /**
     * Topic partition that the log segment belongs to.
     */
    private TopicPartition topicPartition;

    /**
     * Universally unique id of this copy attempt of the log segment.
     */
    private UUID id;

    /**
     * Creates a remote log segment id.
     *
     * @param topicPartition topic partition of the log segment; must not be null
     * @param id             universally unique id for this copy attempt; must not be null
     * @throws NullPointerException if either argument is null
     */
    public RemoteLogSegmentId(TopicPartition topicPartition, UUID id) {
        this.topicPartition = requireNonNull(topicPartition);
        this.id = requireNonNull(id);
    }

    /**
     * @return the topic partition of the log segment
     */
    public TopicPartition topicPartition() {
        return topicPartition;
    }

    /**
     * @return the universally unique id of this copy attempt
     */
    public UUID id() {
        return id;
    }
...
}


/**
 * Log segment data passed to
 * {@link RemoteStorageManager#copyLogSegment(RemoteLogSegmentId, LogSegmentData)} to be copied to remote storage.
 */
public class LogSegmentData {

    /**
     * Records of the log segment.
     */
    private FileRecords logSegment;
    /**
     * Offset index file of the log segment.
     */
    private File offsetIndex;
    /**
     * Time index file of the log segment.
     */
    private File timeIndex;
    // TODO: add other required indexes, such as the transaction index (txnIndex).
...
}

...

Code Block
language: java
title: RemoteLogMetadataManager

/**
 * This interface provides storing and fetching remote log segment metadata with strongly consistent semantics.
 *
 */
@InterfaceStability.Unstable
public interface RemoteLogMetadataManager extends Configurable, Closeable {

    /**
     * Stores RemoteLogSegmentMetadata for the given RemoteLogSegmentMetadata.
     *
     * @param remoteLogSegmentId
     * @param remoteLogSegmentMetadata
     * @throws IOException
     */
    void putRemoteLogSegmentData(RemoteLogSegmentId remoteLogSegmentId, RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException;

    /**
     * Fetches RemoteLogSegmentId for the given topic partition which contains the given offset.
     *
     * @param topicPartition
     * @param offset
     * @return
     * @throws IOException
     */
    RemoteLogSegmentId getRemoteLogSegmentId(TopicPartition topicPartition, long offset) throws IOException;

    /**
     * Fetches RemoteLogSegmentMetadata for the given RemoteLogSegmentId.
     *
     * @param metadata
     * @return
     * @throws IOException
     */
    RemoteLogSegmentMetadata getRemoteLogSegmentMetadata(RemoteLogSegmentId metadata) throws IOException;

    /**
     * Earliest log offset if exists for the given topic partition in the remote storage. Return {@link Optional#empty()}
     * if there are no segments in the remote storage.
     *
     * @param tp
     * @return
     */
    Optional<Long> earliestLogOffset(TopicPartition tp) throws IOException;

    /**
     * ListSets the remoteearliest log segment files ofoffset for the given topic topicPartitionpartition.
     *
 The RemoteLogManager of a follower* uses@param thistp
 method to find out the* remote data for the given topic partition.@throws IOException
     */
    void * @return List of remote segments, sorted by baseOffset in ascending order.setEarliestLogOffset(TopicPartition tp, long offset) throws IOException;

    /**
     */
 Deletes the  default List<RemoteLogSegmentMetadata> listRemoteSegments(TopicPartition topicPartition) {log segment metadata for the given remoteLogSegmentId.
     *
     * @param remoteLogSegmentId
     * @throws IOException
     */
    void deleteRemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId) throws IOException;

    /**
     * List the remote log segment files of the given topicPartition.
     * The RemoteLogManager of a follower uses this method to find out the remote data for the given topic partition.
     *
     * @return List of remote segments, sorted by baseOffset in ascending order.
     */
    default List<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicPartition topicPartition) {
        return listRemoteLogSegments(topicPartition, 0);
    }

    /**
     *
     * @param topicPartition
     * @param minOffset
     * @return
     */
   return List<RemoteLogSegmentMetadata> listRemoteSegmentslistRemoteLogSegments(TopicPartition topicPartition, 0);
    }long minOffset);

}


/**
 * Metadata about the log segment stored in remote tier storage.
 */
public class RemoteLogSegmentMetadata {

    /**
     * Universally unique remote log segment id.
     */
 @param topicPartition
  private RemoteLogSegmentId remoteLogSegmentId;

  * @param minOffset/**
     * @return Start offset of this segment.
     */
    List<RemoteLogSegmentMetadata> listRemoteSegments(TopicPartition topicPartition, private long minOffset)startOffset;

}

    /**
     * MetadataEnd aboutoffset theof logthis segment stored.
 in remote tier storage.
 */
public    classprivate RemoteLogSegmentMetadatalong {endOffset;

    private RemoteLogSegmentId remoteLogSegmentId;
    private long startOffset;/**
     * leader epoch of the broker.
    private long endOffset; */
    private int leaderEpoch;

    /**
     * whether thethat remote segment is created or not.
     */
    private boolean created;

    /**
     * Any context returned by {@link RemoteLogStorageManager#copyLogSegment(RemoteLogSegmentId, LogSegmentData)} for
     * the given remoteLogSegmentId
     */
    private byte[] remoteLogSegmentContext;

...
}


Proposed Changes

High-level design

...