Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

         `RemoteStorageManager` is an interface to provide the lifecycle of remote log segments. We will provide a simple implementation of RSM to get a better understanding of the APIs. HDFS and S3 implementations are planned to be hosted in external repos and these will not be part of the Apache Kafka repo. This is in line with the approach taken for Kafka connectors.


Code Block
languagejava
collapsetitletrueRemoteStorageManager
/**
 * RemoteStorageManager provides the lifecycle of remote log segments, which includes copy, fetch, and delete
 * operations.
 *
 * {@link RemoteLogMetadataManager} is responsible for storing and fetching metadata about the remote log segments
 * in a strongly consistent manner.
 *
 * Each upload or copy of a segment is given a {@link RemoteLogSegmentId} which is universally unique even for the
 * same topic partition and offsets. Once the copy or upload is successful, a {@link RemoteLogSegmentMetadata} is
 * created with the RemoteLogSegmentId and other log segment information, and it is stored in
 * {@link RemoteLogMetadataManager}. This allows RemoteStorageManager to store segments even in an eventually
 * consistent manner, as the metadata is already stored in a consistent store.
 *
 * All these APIs are still experimental.
 */
@InterfaceStability.Unstable
public interface RemoteStorageManager extends Configurable, Closeable {

    /**
     * Copies the LogSegmentData provided for the given RemoteLogSegmentId and returns any contextual
     * information about this copy operation. This can include the path to the object in the store etc.
     *
     * The invoker of this API should always send a unique id as part of {@link RemoteLogSegmentId#id()}, even when
     * it retries invoking this method for the same log segment data.
     *
     * @param remoteLogSegmentId id under which the segment is copied; expected to be unique for every invocation
     * @param logSegmentData     the log segment data to copy
     * @return contextual information about this copy operation
     * @throws IOException NOTE(review): failure conditions are not specified here; presumably raised when the
     *                     copy to the remote store fails - confirm
     */
    RemoteLogSegmentContext copyLogSegment(RemoteLogSegmentId remoteLogSegmentId, LogSegmentData logSegmentData) throws IOException;

    /**
     * Returns the remote log segment data file/object as an InputStream for the given RemoteLogSegmentMetadata,
     * starting from the given startPosition. If endPosition is given then the stream will end at that position,
     * else it will continue till the end of the remote log segment data file/object.
     *
     * NOTE(review): the first parameter is named remoteLogSegmentId but its declared type is
     * RemoteLogSegmentMetadata.
     * NOTE(review): startPosition is a boxed Long and the handling of null is not specified here; whether
     * endPosition is inclusive or exclusive is also not specified - confirm both.
     *
     * @param remoteLogSegmentId metadata (not the id, despite the name) of the remote log segment to read
     * @param startPosition      position in the remote log segment data file/object at which the stream starts
     * @param endPosition        position at which the stream ends; when empty, the stream runs to the end of the
     *                           remote log segment data file/object
     * @return an InputStream over the requested range of the remote log segment data
     * @throws IOException NOTE(review): failure conditions are not specified here; presumably raised when the
     *                     remote data cannot be read - confirm
     */
    InputStream fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentId, Long startPosition, Optional<Long> endPosition) throws IOException;

    /**
     * Deletes the remote log segment for the given remoteLogSegmentId. Returns true if the deletion is successful.
     * The broker pushes an event to the __delete_failed_remote_log_segments topic for failed segment deletions so
     * that users can do the cleanup later.
     *
     * @param remoteLogSegmentId id of the remote log segment to delete
     * @return true if the deletion is successful
     * @throws IOException NOTE(review): it is not specified here which failures are reported by returning false
     *                     and which by this exception - confirm
     */
    boolean deleteLogSegment(RemoteLogSegmentId remoteLogSegmentId) throws IOException;
}


/**
 * This represents a universally unique id associated with a topic partition's log segment. It will be regenerated
 * for every attempt of copying a specific log segment in
 * {@link RemoteStorageManager#copyLogSegment(RemoteLogSegmentId, LogSegmentData)}.
 */
public class RemoteLogSegmentId {
    private TopicPartition topicPartition;
    private UUID id;

    /**
     * Creates an id from the given topic partition and UUID. Both arguments are passed through requireNonNull.
     *
     * @param topicPartition topic partition the log segment belongs to
     * @param id             UUID of this segment id
     */
    public RemoteLogSegmentId(TopicPartition topicPartition, UUID id) {
        this.topicPartition = requireNonNull(topicPartition);
        this.id = requireNonNull(id);
    }

    /**
     * @return the topic partition given at construction
     */
    public TopicPartition topicPartition() {
        return topicPartition;
    }

    /**
     * @return the UUID given at construction
     */
    public UUID id() {
        return id;
    }
...
}


/**
 * Groups the pieces of a log segment: the segment's records plus its offset index and time index files.
 *
 * NOTE(review): the remaining members of this class are elided in this view; presumably a constructor and
 * accessors for the fields below - confirm.
 */
public class LogSegmentData {

    // Records of the log segment.
    private FileRecords logSegment;
    // Offset index file (presumably the one belonging to logSegment - confirm).
    private File offsetIndex;
    // Time index file (presumably the one belonging to logSegment - confirm).
    private File timeIndex;
    // TODO: add other required indexes like txnIndex
...
}

...

Code Block
languagejava
titleRemoteLogMetadataManagercollapsetrue
/**
 * This interface provides storing and fetching of remote log segment metadata with strongly consistent semantics.
 */
@InterfaceStability.Unstable
public interface RemoteLogMetadataManager extends Configurable, Closeable {

    /**
     * Stores the RemoteLogSegmentMetadata for the given RemoteLogSegmentId.
     *
     * @param remoteLogSegmentId       id of the remote log segment the metadata belongs to
     * @param remoteLogSegmentMetadata metadata to store
     * @throws IOException NOTE(review): failure conditions are not specified here; presumably raised when the
     *                     metadata cannot be stored - confirm
     */
    void putRemoteLogSegmentData(RemoteLogSegmentId remoteLogSegmentId, RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException;

    /**
     * Fetches the RemoteLogSegmentId for the given topic partition which contains the given offset.
     *
     * NOTE(review): the result when no remote segment contains the offset is not specified here - confirm.
     *
     * @param topicPartition topic partition to look up
     * @param offset         offset that the returned segment must contain
     * @return id of the remote log segment containing the given offset
     * @throws IOException NOTE(review): failure conditions are not specified here - confirm
     */
    RemoteLogSegmentId getRemoteLogSegmentId(TopicPartition topicPartition, long offset) throws IOException;

    /**
     * Fetches the RemoteLogSegmentMetadata for the given RemoteLogSegmentId.
     *
     * NOTE(review): the parameter is named metadata but it is the RemoteLogSegmentId used for the lookup.
     *
     * @param metadata id of the remote log segment whose metadata is requested
     * @return metadata of the given remote log segment
     * @throws IOException NOTE(review): failure conditions are not specified here - confirm
     */
    RemoteLogSegmentMetadata getRemoteLogSegmentMetadata(RemoteLogSegmentId metadata) throws IOException;

    /**
     * Earliest log offset, if it exists, for the given topic partition in the remote storage. Returns
     * {@link Optional#empty()} if there are no segments in the remote storage.
     *
     * @param tp topic partition to look up
     * @return the earliest log offset in the remote storage, or empty if there are no remote segments
     * @throws IOException NOTE(review): failure conditions are not specified here - confirm
     */
    Optional<Long> earliestLogOffset(TopicPartition tp) throws IOException;

    /**
     * Lists the remote log segment files of the given topicPartition.
     * The RemoteLogManager of a follower uses this method to find out the remote data for the given topic partition.
     *
     * This default implementation delegates to {@link #listRemoteSegments(TopicPartition, long)} with a minOffset
     * of 0.
     *
     * NOTE(review): unlike the other methods of this interface, this one does not declare IOException.
     *
     * @param topicPartition topic partition whose remote segments are listed
     * @return List of remote segments, sorted by baseOffset in ascending order.
     */
    default List<RemoteLogSegmentInfo> listRemoteSegments(TopicPartition topicPartition) {
        return listRemoteSegments(topicPartition, 0);
    }

    /**
     * Lists the remote log segments of the given topicPartition, restricted by minOffset.
     *
     * NOTE(review): the exact meaning of minOffset is not specified here; presumably segments that only hold
     * offsets below it are left out - confirm. The ordering of the result is presumably ascending by baseOffset,
     * as for the single-argument overload - confirm.
     *
     * @param topicPartition topic partition whose remote segments are listed
     * @param minOffset      lower offset bound for the listing
     * @return list of remote segments for the given topic partition
     */
    List<RemoteLogSegmentInfo> listRemoteSegments(TopicPartition topicPartition, long minOffset);

}

...