Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleRemoteStorageManager
package org.apache.kafka.common.log.remote.storage;
...
/**
 * RemoteStorageManager provides the lifecycle of remote log segments that includes copy, fetch, and delete from remote
 * storage.
 * <p>
 * Each upload or copy of a segment is initiated with {@link RemoteLogSegmentMetadata} containing {@link RemoteLogSegmentId}
 * which is universally unique even for the same topic partition and offsets.
 * <p>
 * RemoteLogSegmentMetadata is stored in {@link RemoteLogMetadataManager} before and after copy/delete operations on
 * RemoteStorageManager with the respective {@link RemoteLogSegmentMetadata.State}. {@link RemoteLogMetadataManager} is
 * responsible for storing and fetching metadata about the remote log segments in a strongly consistent manner.
 * This allows RemoteStorageManager to store segments even in eventually consistent manner as the metadata is already
 * stored in a consistent store.
 * <p>
 * All these APIs are still evolving.
 */
@InterfaceStability.Unstable
public interface RemoteStorageManager extends Configurable, Closeable {

    /**
     * Copies LogSegmentData provided for the given {@param remoteLogSegmentMetadata}.
     * <p>
     * Invoker of this API should always send a unique id as part of {@link RemoteLogSegmentMetadata#remoteLogSegmentId()#id()}
     * even when it retries to invoke this method for the same log segment data.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @param logSegmentData           data to be copied to tiered storage.
     * @throws RemoteStorageException if there are any errors in storing the data of the segment.
     */
    void copyLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata, LogSegmentData logSegmentData)
            throws RemoteStorageException;

    /**
     * Returns the remote log segment data file/object as InputStream for the given RemoteLogSegmentMetadata starting
     * from the given startPosition. The stream will end at the smaller of endPosition and the end of the remote log
     * segment data file/object.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @param startPosition            start position of log segment to be read, inclusive.
     * @param endPosition              end position of log segment to be read, inclusive.
     * @return input stream of the requested log segment data.
     * @throws RemoteStorageException if there are any errors while fetching the desired segment.
     */
    InputStream fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                                    Long startPosition, Long endPosition) throws RemoteStorageException;

    /**
     * Returns the offset index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @return input stream of the requested  offset index.
     * @throws RemoteStorageException if there are any errors while fetching the index.
     */
    InputStream fetchOffsetIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     * Returns the timestamp index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @return input stream of the requested  timestamp index.
     * @throws RemoteStorageException if there are any errors while fetching the index.
     */
    InputStream fetchTimestampIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) listRemoteLogSegmentsthrows RemoteStorageException;

    /**
     * Returns the transaction index for the the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @return input stream of the requested  transaction index.
     * @throws RemoteStorageException if there are any errors while fetching the index.
     */
    default InputStream fetchTransactionIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     * Returns the producer snapshot index for the the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @return input stream of the producer snapshot.
     * @throws RemoteStorageException if there are any errors while fetching the index.
     */
    InputStream fetchProducerSnapshotIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     * Returns the leader epoch index for the the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @return input stream of the leader epoch index.
     * @throws RemoteStorageException if there are any errors while fetching the index.
     */
    InputStream fetchLeaderEpochIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     * Deletes the resources associated with the given {@param remoteLogSegmentMetadata}. Deletion is considered as
     * successful if this call returns successfully without any errors. It will throw {@link RemoteStorageException} if
     * there are any errors in deleting the file.
     * <p>
     * {@link RemoteResourceNotFoundException} is thrown when there are no resources associated with the given
     * {@param remoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment to be deleted.
     * @throws RemoteResourceNotFoundException if the requested resource is not found
     * @throws RemoteStorageException          if there are any storage related errors occurred.
     */
    void deleteLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

}


package org.apache.kafka.common;
...
public class TopicIdPartition {

    private final UUID topicId;
    private final TopicPartition topicPartition;

    public TopicIdPartition(UUID topicId, TopicPartition topicPartition) {
        Objects.requireNonNull(topicId, "topicId can not be null");
        Objects.requireNonNull(topicPartition, "topicPartition can not be null");
        this.topicId = topicId;
        this.topicPartition = topicPartition;
    }

    public UUID topicId() {
        return topicId;
    }

    public TopicPartition topicPartition() {
        return topicPartition;
    }

...
}


package org.apache.kafka.common.log.remote.storage;
...
/**
 * This represents a universally unique identifier associated to a topic partition's log segment. This will be
 * regenerated for every attempt of copying a specific log segment in {@link RemoteStorageManager#copyLogSegment(RemoteLogSegmentMetadata, LogSegmentData)}.
 */
public class RemoteLogSegmentId implements Comparable<RemoteLogSegmentId>, Serializable {
    private static final long serialVersionUID = 1L;

    private final TopicIdPartition topicIdPartition;
    private final UUID id;

    public RemoteLogSegmentId(TopicIdPartition topicIdPartition, UUID id) {
        this.topicIdPartition = requireNonNull(topicIdPartition);
        this.id = requireNonNull(id);
    }

    /**
     * Returns TopicIdPartition of this remote log segment.
     * 
     * @return
     */
    public TopicIdPartition topicIdPartition() {
        return topicIdPartition;
    }

    /**
     * Returns Universally Unique Id of this remote log segment.
     *
     * @return
     */
    public UUID id() {
        return id;
    }
...
}

package org.apache.kafka.common.log.remote.storage;
...
/**
 * It describes the metadata about the log segment in the remote storage.
 */
public class RemoteLogSegmentMetadata implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Universally unique remote log segment id.
     */
    private final RemoteLogSegmentId remoteLogSegmentId;

    /**
     * Start offset of this segment.
     */
    private final long startOffset;

    /**
     * End offset of this segment.
     */
    private final long endOffset;

    /**
     * Leader epoch of the broker.
     */
    private final int leaderEpoch;

    /**
     * Maximum timestamp in the segment
     */
    private final long maxTimestamp;

    /**
     * Epoch time at which the respective {@link #state} is set.
     */
    private final long eventTimestamp;

    /**
     * LeaderEpoch vs offset for messages with in this segment.
     */
    private final Map<Int, Long> segmentLeaderEpochs;

    /**
     * Size of the segment in bytes.
     */
    private final int segmentSizeInBytes;

    /**
     * It indicates the state in which the action is executed on this segment.
     */
    private final RemoteLogState state;

    /**
     * @param remoteLogSegmentId  Universally unique remote log segment id.
     * @param startOffset         Start offset of this segment.
     * @param endOffset           End offset of this segment.
     * @param maxTimestamp        maximum timestamp in this segment
     * @param leaderEpoch         Leader epoch of the broker.
     * @param eventTimestamp    Epoch time at which the remote log segment is copied to the remote tier storage.
     * @param segmentSizeInBytes  size of this segment in bytes.
     * @param state   The respective segment of remoteLogSegmentId is marked fro deletion.
     * @param segmentLeaderEpochs leader epochs occurred with in this segment
     */
    public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId, long startOffset, long endOffset,
                                    long maxTimestamp, int leaderEpoch, long eventTimestamp,
                                    int segmentSizeInBytes, RemoteLogState state, Map<Int, Long> segmentLeaderEpochs) {
        this.remoteLogSegmentId = remoteLogSegmentId;
        this.startOffset = startOffset;
        this.endOffset = endOffset;
        this.leaderEpoch = leaderEpoch;
        this.maxTimestamp = maxTimestamp;
        this.eventTimestamp = eventTimestamp;
        this.segmentLeaderEpochs = segmentLeaderEpochs;
        this.state = state;
        this.segmentSizeInBytes = segmentSizeInBytes;
    }

    /**
     * @return unique id of this segment.
     */
    public RemoteLogSegmentId remoteLogSegmentId() {
        return remoteLogSegmentId;
    }

    /**
     * @return Start offset of this segment(inclusive).
     */
    public long startOffset() {
        return startOffset;
    }

    /**
     * @return End offset of this segment(inclusive).
     */
    public long endOffset() {
        return endOffset;
    }

    /**
     * @return Leader or controller epoch of the broker from where this event occurred.
     */
    public int brokerEpoch() {
        return brokerEpoch;
    }

    /**
     * @return Epoch time at which this evcent is occurred.
     */
    public long eventTimestamp() {
        return eventTimestamp;
    }

    /**
     * @return
     */
    public int segmentSizeInBytes() {
        return segmentSizeInBytes;
    }

    public RemoteLogState state() {
        return state;
    }

    public long maxTimestamp() {
        return maxTimestamp;
    }

    public Map<Int, Long> segmentLeaderEpochs() {
        return segmentLeaderEpochs;
    }

...
}

package org.apache.kafka.common.log.remote.storage;
...
public class LogSegmentData {

    private final File logSegment;
    private final File offsetIndex;
    private final File timeIndex;
    private final File txnIndex;
    private final File producerIdSnapshotIndex;
    private final ByteBuffer leaderEpochIndex;

    public LogSegmentData(File logSegment, File offsetIndex, File timeIndex, File txnIndex, File producerIdSnapshotIndex,
                          ByteBuffer leaderEpochIndex) {
        this.logSegment = logSegment;
        this.offsetIndex = offsetIndex;
        this.timeIndex = timeIndex;
        this.txnIndex = txnIndex;
        this.producerIdSnapshotIndex = producerIdSnapshotIndex;
        this.leaderEpochIndex = leaderEpochIndex;
    }

    public File logSegment() {
        return logSegment;
    }

    public File offsetIndex() {
        return offsetIndex;
    }

    public File timeIndex() {
        return timeIndex;
    }

    public File txnIndex() {
        return txnIndex;
    }

    public File producerIdSnapshotIndex() {
        return producerIdSnapshotIndex;
    }

    public ByteBuffer leaderEpochIndex() {
        return leaderEpochIndex;
    }
...
}

...