
...

Code block: RemoteStorageManager (Java)
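import java.io.Closeable;
import java.io.InputStream;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.annotation.InterfaceStability;

/**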
 * RemoteStorageManager manages the lifecycle of remote log segments, which includes copy, fetch, and delete operations.
 *
 * {@link RemoteLogMetadataManager} is responsible for storing and fetching metadata about the remote log segments in a
 * strongly consistent manner.
 *
 * Each upload or copy of a segment is given a {@link RemoteLogSegmentId}, which is universally unique even for the
 * same topic partition and offsets. Once the copy or upload succeeds, a {@link RemoteLogSegmentMetadata} is created
 * with the RemoteLogSegmentId and other log segment information, and it is stored in {@link RemoteLogMetadataManager}.
 * This allows RemoteStorageManager to store segments even in an eventually consistent store, because the metadata is
 * already kept in a strongly consistent store.
 *
 * All these APIs are still experimental.
 */
@InterfaceStability.Unstable
public interface RemoteStorageManager extends Configurable, Closeable {

    /**
     * Copies the given LogSegmentData for the given RemoteLogSegmentId and returns any contextual
     * information about this copy operation, such as the path to the object in the store.
     *
     * The caller of this API should always pass a unique id as part of {@link RemoteLogSegmentId#id()}, even when it
     * retries this method for the same log segment data.
     *
     * @param remoteLogSegmentId unique id of this copy attempt
     * @param logSegmentData     local files of the segment and its auxiliary state to be copied
     * @return contextual information about the copy, such as the location of the copied object
     * @throws RemoteStorageException if the copy fails
     */
    RemoteLogSegmentContext copyLogSegment(RemoteLogSegmentId remoteLogSegmentId, LogSegmentData logSegmentData)
            throws RemoteStorageException;

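    /**
     * Returns the remote log segment data file/object as an InputStream for the given RemoteLogSegmentMetadata,
     * starting from the given startPosition. The stream will end at the smaller of endPosition and the end of the
     * remote log segment data file/object.
     *
     * @param remoteLogSegmentMetadata metadata of the remote log segment to read
     * @param startPosition            position in the segment data from which to start reading
     * @param endPosition              position at which to stop reading, bounded by the end of the segment data
     * @return an InputStream over the requested range of the segment data
     * @throws RemoteStorageException if the data cannot be fetched
     */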
    InputStream fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                                    Long startPosition, Long endPosition) throws RemoteStorageException;

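    /**
     * Returns the offset index for the log segment described by the given {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata of the remote log segment
     * @return an InputStream over the segment's offset index
     * @throws RemoteStorageException if the index cannot be fetched
     */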
    InputStream fetchOffsetIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

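    /**
     * Returns the timestamp index for the log segment described by the given {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata of the remote log segment
     * @return an InputStream over the segment's timestamp index
     * @throws RemoteStorageException if the index cannot be fetched
     */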
    InputStream fetchTimestampIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

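    /**
     * Returns the transaction index for the log segment described by the given {@link RemoteLogSegmentMetadata}.
     * The default implementation throws {@link UnsupportedOperationException}.
     *
     * @param remoteLogSegmentMetadata metadata of the remote log segment
     * @return an InputStream over the segment's transaction index
     * @throws RemoteStorageException if the index cannot be fetched
     */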
    default InputStream fetchTransactionIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
        throw new UnsupportedOperationException();
    }

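    /**
     * Returns the producer-id snapshot for the log segment described by the given {@link RemoteLogSegmentMetadata}.
     * The default implementation throws {@link UnsupportedOperationException}.
     *
     * @param remoteLogSegmentMetadata metadata of the remote log segment
     * @return an InputStream over the segment's producer-id snapshot
     * @throws RemoteStorageException if the snapshot cannot be fetched
     */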
    default InputStream fetchProducerSnapshotIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
        throw new UnsupportedOperationException();
    }

    /**
     * Deletes the remote log segment for the given remoteLogSegmentMetadata. Deletion is considered successful if
     * this call returns without throwing an exception. It throws {@link RemoteStorageException} if there are
     * any errors in deleting the file.
     *
     * For failed segment deletions, the broker pushes an event to the __delete_failed_remote_log_segments topic so
     * that users can do the cleanup later.
     *
     * @param remoteLogSegmentMetadata metadata of the remote log segment to delete
     * @throws RemoteStorageException if the segment cannot be deleted
     */
    void deleteLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

}



import java.util.UUID;
import org.apache.kafka.common.TopicPartition;

import static java.util.Objects.requireNonNull;

/**
 * This represents a universally unique id associated with a topic partition's log segment. It is regenerated for
 * every attempt to copy a specific log segment in {@link RemoteStorageManager#copyLogSegment(RemoteLogSegmentId, LogSegmentData)}.
 */
public class RemoteLogSegmentId {
    private final TopicPartition topicPartition;
    private final UUID id;

    public RemoteLogSegmentId(TopicPartition topicPartition, UUID id) {
        this.topicPartition = requireNonNull(topicPartition);
        this.id = requireNonNull(id);
    }

    public TopicPartition topicPartition() {
        return topicPartition;
    }

    public UUID id() {
        return id;
    }
...
}



import java.io.File;

/**
 * Local files of a log segment and its auxiliary state (indexes, producer-id snapshot, leader epochs) that are copied
 * to remote storage by {@link RemoteStorageManager#copyLogSegment(RemoteLogSegmentId, LogSegmentData)}.
 */
public class LogSegmentData {

    private final File logSegment;
    private final File offsetIndex;
    private final File timeIndex;
    private final File txnIndex;
    private final File producerIdSnapshotIndex;
    private final File leaderEpochIndex;

    public LogSegmentData(File logSegment, File offsetIndex, File timeIndex, File txnIndex, File producerIdSnapshotIndex,
                          File leaderEpochIndex) {
        this.logSegment = logSegment;
        this.offsetIndex = offsetIndex;
        this.timeIndex = timeIndex;
        this.txnIndex = txnIndex;
        this.producerIdSnapshotIndex = producerIdSnapshotIndex;
        this.leaderEpochIndex = leaderEpochIndex;
    }
...
}
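The RemoteStorageManager contract above can be illustrated with a small in-memory sketch. It does not use the real types: `InMemorySegmentStore`, `CopyCoordinatorSketch`, and `SketchStorageException` are hypothetical names, segment ids are plain `UUID`s, and a map plays the role of `RemoteLogMetadataManager`. It shows a fresh id for every copy attempt, metadata recorded only after a successful copy, range reads that stop at the smaller of `endPosition` and the end of the object (treating `endPosition` as inclusive, which is an assumption the text does not state), and deletion that signals failure by throwing.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical checked exception standing in for RemoteStorageException.
class SketchStorageException extends Exception {
    SketchStorageException(String message) {
        super(message);
    }
}

// Hypothetical in-memory object store keyed by the per-attempt id.
class InMemorySegmentStore {
    private final Map<UUID, byte[]> objects = new HashMap<>();

    // Rejects a reused id, so a retry can never overwrite an earlier attempt's object.
    void copy(UUID attemptId, byte[] segmentBytes) throws SketchStorageException {
        if (objects.containsKey(attemptId)) {
            throw new SketchStorageException("id reused: " + attemptId);
        }
        objects.put(attemptId, segmentBytes.clone());
    }

    // Reads from startPosition up to the smaller of endPosition (assumed inclusive) and the end of the object.
    InputStream fetch(UUID id, long startPosition, long endPosition) throws SketchStorageException {
        byte[] data = objects.get(id);
        if (data == null) {
            throw new SketchStorageException("no such segment: " + id);
        }
        if (startPosition < 0 || endPosition < startPosition) {
            throw new IllegalArgumentException("invalid range [" + startPosition + ", " + endPosition + "]");
        }
        int from = (int) Math.min(startPosition, data.length);
        int to = (int) (Math.min(endPosition, data.length - 1L) + 1);
        return new ByteArrayInputStream(Arrays.copyOfRange(data, from, to));
    }

    // Deletion is successful only if this returns without throwing.
    void delete(UUID id) throws SketchStorageException {
        if (objects.remove(id) == null) {
            throw new SketchStorageException("no such segment: " + id);
        }
    }
}

// Hypothetical broker-side copier; the map stands in for RemoteLogMetadataManager.
class CopyCoordinatorSketch {
    private final Map<String, UUID> committed = new HashMap<>();

    UUID copyWithRetries(InMemorySegmentStore store, String segmentName, byte[] bytes, int maxAttempts)
            throws SketchStorageException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        SketchStorageException lastFailure = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            UUID attemptId = UUID.randomUUID(); // fresh id on every attempt, including retries
            try {
                store.copy(attemptId, bytes);
                committed.put(segmentName, attemptId); // metadata is published only after the copy succeeds
                return attemptId;
            } catch (SketchStorageException e) {
                lastFailure = e;
            }
        }
        throw lastFailure;
    }

    UUID committedId(String segmentName) {
        return committed.get(segmentName);
    }
}
```

Because readers only ever look up the id recorded in the metadata store, objects left behind by failed attempts are never read, which is why the object store itself can be eventually consistent.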


RemoteStorageManager_Old

...

Leaders will send a new error code, `OFFSET_MOVED_TO_TIERED_STORAGE`, to followers if the fetch offset is not available in the local log. In this case, the follower needs to fetch the auxiliary state of the remote log segments, which is the leader epochs (and possibly the producer-id snapshots too). This can be done in two ways.
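A minimal sketch of the leader-side check, assuming the log is split into a remote-only range `[logStartOffset, localLogStartOffset)` and a locally served range `[localLogStartOffset, logEndOffset]`. The enum and class names are hypothetical; `NONE` and `OFFSET_OUT_OF_RANGE` mirror existing Kafka error names, but returning `OFFSET_OUT_OF_RANGE` for offsets outside both ranges is an assumption rather than something stated above.

```java
// Hypothetical error codes for this sketch; only OFFSET_MOVED_TO_TIERED_STORAGE comes from the text.
enum FollowerFetchError { NONE, OFFSET_MOVED_TO_TIERED_STORAGE, OFFSET_OUT_OF_RANGE }

final class LeaderFetchOffsetCheck {
    private LeaderFetchOffsetCheck() {
    }

    // Assumes offsets in [logStartOffset, localLogStartOffset) exist only in remote storage and
    // offsets in [localLogStartOffset, logEndOffset] can be served from the local log.
    static FollowerFetchError check(long fetchOffset, long logStartOffset, long localLogStartOffset, long logEndOffset) {
        if (fetchOffset < logStartOffset || fetchOffset > logEndOffset) {
            return FollowerFetchError.OFFSET_OUT_OF_RANGE;
        }
        if (fetchOffset < localLogStartOffset) {
            // Still part of the log, but not local: the follower must build its auxiliary state from remote storage.
            return FollowerFetchError.OFFSET_MOVED_TO_TIERED_STORAGE;
        }
        return FollowerFetchError.NONE;
    }
}
```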

...

So, we need to add a ReplicaState for each of them, which can be called `FetchingRemoteLogMetadata` and `FetchingRemoteLogAuxiliaryState`. The fetcher thread also processes both of these states in every run. The FetchingRemoteLogMetadata state checks whether the RLMM on the follower has caught up enough to receive the remote log segment metadata for the desired fetch offset. If that metadata is available, the state is moved to FetchingRemoteLogAuxiliaryState by submitting a task to the RLM thread pool that fetches the auxiliary state from RemoteStorageManager. Once the FetchingRemoteLogAuxiliaryState has received that state, the state is moved to Fetching, with the fetch offset set to the remote log segment's endOffset + 1.
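The state transitions described above can be sketched as a small state machine driven by the fetcher thread. All names here are hypothetical: `rlmmLookup` stands in for the follower's RLMM, an `Executor` stands in for the RLM thread pool, and `auxiliaryStateFetcher` stands in for the call that fetches leader epochs (and possibly producer-id snapshots) through RemoteStorageManager. Retrying a failed auxiliary-state task on the next run is an assumption; the text does not specify failure handling.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.LongFunction;

// Hypothetical names mirroring FetchingRemoteLogMetadata, FetchingRemoteLogAuxiliaryState and Fetching.
enum FollowerState { FETCHING_REMOTE_LOG_METADATA, FETCHING_REMOTE_LOG_AUXILIARY_STATE, FETCHING }

// Minimal stand-in for RemoteLogSegmentMetadata: only the offsets matter here.
final class SegmentOffsets {
    private final long startOffset;
    private final long endOffset;

    SegmentOffsets(long startOffset, long endOffset) {
        this.startOffset = startOffset;
        this.endOffset = endOffset;
    }

    long startOffset() { return startOffset; }
    long endOffset() { return endOffset; }
}

final class FollowerFetchStateMachine {
    private final LongFunction<Optional<SegmentOffsets>> rlmmLookup; // stand-in for the follower's RLMM
    private final Executor rlmThreadPool;                            // stand-in for the RLM thread pool
    private final Consumer<SegmentOffsets> auxiliaryStateFetcher;    // stand-in for fetching state via RSM
    private FollowerState state = FollowerState.FETCHING_REMOTE_LOG_METADATA;
    private long fetchOffset;
    private SegmentOffsets segment;
    private CompletableFuture<Void> auxiliaryStateTask;

    FollowerFetchStateMachine(long fetchOffset, LongFunction<Optional<SegmentOffsets>> rlmmLookup,
                              Executor rlmThreadPool, Consumer<SegmentOffsets> auxiliaryStateFetcher) {
        this.fetchOffset = fetchOffset;
        this.rlmmLookup = rlmmLookup;
        this.rlmThreadPool = rlmThreadPool;
        this.auxiliaryStateFetcher = auxiliaryStateFetcher;
    }

    FollowerState state() { return state; }
    long fetchOffset() { return fetchOffset; }

    // Called by the fetcher thread on every run.
    void runOnce() {
        switch (state) {
            case FETCHING_REMOTE_LOG_METADATA: {
                // Stay in this state until the RLMM has metadata covering fetchOffset.
                Optional<SegmentOffsets> found = rlmmLookup.apply(fetchOffset);
                if (found.isPresent()) {
                    segment = found.get();
                    submitAuxiliaryStateTask();
                    state = FollowerState.FETCHING_REMOTE_LOG_AUXILIARY_STATE;
                }
                break;
            }
            case FETCHING_REMOTE_LOG_AUXILIARY_STATE: {
                if (auxiliaryStateTask.isCompletedExceptionally()) {
                    submitAuxiliaryStateTask(); // assumption: retry on the next run
                } else if (auxiliaryStateTask.isDone()) {
                    fetchOffset = segment.endOffset() + 1; // resume right after the remote segment
                    state = FollowerState.FETCHING;
                }
                break;
            }
            case FETCHING:
            default:
                break; // normal replication continues from fetchOffset
        }
    }

    private void submitAuxiliaryStateTask() {
        SegmentOffsets target = segment;
        auxiliaryStateTask = CompletableFuture.runAsync(() -> auxiliaryStateFetcher.accept(target), rlmThreadPool);
    }
}
```

Passing `Runnable::run` as the executor runs the task inline, which is convenient for testing; a broker would use a real thread pool so that the fetcher thread never blocks on remote storage.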

...