Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: java
title: RemoteStorageManager
/**
 * RemoteStorageManager provides the lifecycle of remote log segments which includes copy, fetch, and delete operations.
 *
 * {@link RemoteLogMetadataManager} is responsible for storing and fetching metadata about the remote log segments in a
 * strongly consistent manner.
 *
 * Each upload or copy of a segment is given with a {@link RemoteLogSegmentId} which is universally unique even for the
 * same topic partition and offsets. Once the copy or upload is successful, {@link RemoteLogSegmentMetadata} is
 * created with RemoteLogSegmentId and other log segment information and it is stored in {@link RemoteLogMetadataManager}.
 * This allows RemoteStorageManager to store segments even in eventually consistent manner as the metadata is already
 * stored in a consistent store.
 *
 * All these APIs are still experimental.
 */
@InterfaceStability.Unstable
public interface RemoteStorageManager extends Configurable, Closeable {

    /**
     * Copies the LogSegmentData provided for the segment described by the given RemoteLogSegmentMetadata.
     *
     * Invoker of this API should always send a unique id as part of the {@link RemoteLogSegmentId#id()} carried by
     * the given RemoteLogSegmentMetadata, even when it retries to invoke this method for the same log segment data.
     *
     * NOTE(review): earlier revisions of this method returned contextual information about the copy operation
     * (for example the path to the object in the store). This method returns nothing; confirm where such
     * context is expected to be recorded.
     *
     * @param remoteLogSegmentMetadata metadata, including the unique RemoteLogSegmentId, of the segment being copied.
     * @param logSegmentData           the log segment data to be copied.
     * @throws RemoteStorageException if the copy operation fails.
     */
    void copyLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata, LogSegmentData logSegmentData)
            throws RemoteStorageException;

    /**
     * Returns the remote log segment data file/object as InputStream for the given RemoteLogSegmentMetadata starting
     * from the given startPosition. The stream will end at the smaller of endPosition and the end of the remote log
     * segment data file/object.
     *
     * @param remoteLogSegmentMetadata metadata identifying the remote log segment to read.
     * @param startPosition            position in the segment data file/object at which the stream starts.
     * @param endPosition              position at which the stream ends, unless the file/object ends earlier.
     * @return the segment data as an InputStream.
     * @throws RemoteStorageException if the segment data can not be fetched.
     */
    InputStream fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                                    Long startPosition, Long endPosition) throws RemoteStorageException;

    /**
     * Returns the offset index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata identifying the remote log segment.
     * @return the offset index as an InputStream.
     * @throws RemoteStorageException if the index can not be fetched.
     */
    InputStream fetchOffsetIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     * Returns the timestamp index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata identifying the remote log segment.
     * @return the timestamp index as an InputStream.
     * @throws RemoteStorageException if the index can not be fetched.
     */
    InputStream fetchTimestampIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     * Intended to return the transaction index for the respective log segment of {@link RemoteLogSegmentMetadata},
     * by analogy with {@link #fetchOffsetIndex(RemoteLogSegmentMetadata)}.
     *
     * This is an optional operation: the default implementation always throws {@link UnsupportedOperationException}.
     *
     * @param remoteLogSegmentMetadata metadata identifying the remote log segment.
     * @return the transaction index as an InputStream, when an implementation supports it.
     * @throws RemoteStorageException if the index can not be fetched.
     */
    default InputStream fetchTransactionIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
        throw new UnsupportedOperationException();
    }

    /**
     * Intended to return the producer snapshot index for the respective log segment of
     * {@link RemoteLogSegmentMetadata}, by analogy with {@link #fetchOffsetIndex(RemoteLogSegmentMetadata)}.
     *
     * This is an optional operation: the default implementation always throws {@link UnsupportedOperationException}.
     *
     * @param remoteLogSegmentMetadata metadata identifying the remote log segment.
     * @return the producer snapshot index as an InputStream, when an implementation supports it.
     * @throws RemoteStorageException if the index can not be fetched.
     */
    default InputStream fetchProducerSnapshotIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
        throw new UnsupportedOperationException();
    }

    /**
     * Returns the leader epoch index till the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * This is an optional operation: the default implementation always throws {@link UnsupportedOperationException}.
     *
     * @param remoteLogSegmentMetadata metadata identifying the remote log segment.
     * @return the leader epoch index as an InputStream, when an implementation supports it.
     * @throws RemoteStorageException if the index can not be fetched.
     */
    default InputStream fetchLeaderEpochIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
        throw new UnsupportedOperationException();
    }

    /**
     * Deletes the remote log segment for the given remoteLogSegmentMetadata. Deletion is considered as successful if
     * this call returns successfully without any exceptions after a few retries. It will throw
     * {@link RemoteStorageException} if there are any errors in deleting the file.
     *
     * Broker pushes an event to __delete_failed_remote_log_segments topic for failed segment deletions so that users
     * can do the cleanup later.
     *
     * todo: Add retry configuration.
     *
     * @param remoteLogSegmentMetadata metadata identifying the remote log segment to delete.
     * @throws RemoteStorageException if there are any errors in deleting the file.
     */
    void deleteLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

}



/**
 * This represents a universally unique id associated to a topic partition's log segment. This will be regenerated for
 * every attempt of copying a specific log segment in {@link RemoteStorageManager#copyLogSegment}.
 */
public class RemoteLogSegmentId {
    // Topic partition whose log segment this id is associated to.
    private TopicPartition topicPartition;
    // Unique id; regenerated for every copy attempt of the segment (see class documentation).
    private UUID id;

    /**
     * Creates an id for a log segment of the given topic partition. Both arguments are passed through
     * requireNonNull before being stored.
     *
     * @param topicPartition topic partition the log segment belongs to.
     * @param id             unique id of this remote log segment.
     */
    public RemoteLogSegmentId(TopicPartition topicPartition, UUID id) {
        this.topicPartition = requireNonNull(topicPartition);
        this.id = requireNonNull(id);
    }

    /**
     * @return the topic partition given at construction.
     */
    public TopicPartition topicPartition() {
        return topicPartition;
    }

    /**
     * @return the unique id given at construction.
     */
    public UUID id() {
        return id;
    }
...
}



/**
 * Groups the files that make up one log segment: the segment file itself and its accompanying index files.
 *
 * The constructor stores the given references as-is; it performs no null checks and does not verify that the
 * files exist.
 */
public class LogSegmentData {

    // The log segment file.
    private final File logSegment;
    // Offset index file of the segment.
    private final File offsetIndex;
    // Time index file of the segment.
    private final File timeIndex;
    // Transaction index file of the segment.
    private final File txnIndex;
    // Producer id snapshot index file of the segment.
    private final File producerIdSnapshotIndex;
    // Leader epoch index file of the segment.
    private final File leaderEpochIndex;

    /**
     * @param logSegment              the log segment file.
     * @param offsetIndex             the offset index file.
     * @param timeIndex               the time index file.
     * @param txnIndex                the transaction index file.
     * @param producerIdSnapshotIndex the producer id snapshot index file.
     * @param leaderEpochIndex        the leader epoch index file.
     */
    public LogSegmentData(File logSegment, File offsetIndex, File timeIndex, File txnIndex, File producerIdSnapshotIndex,
                          File leaderEpochIndex) {
        this.logSegment = logSegment;
        this.offsetIndex = offsetIndex;
        this.timeIndex = timeIndex;
        this.txnIndex = txnIndex;
        this.producerIdSnapshotIndex = producerIdSnapshotIndex;
        this.leaderEpochIndex = leaderEpochIndex;
    }
...
}

...

Code Block
language: java
title: RemoteLogMetadataManager
/**
 * This interface provides storing and fetching remote log segment metadata with strongly consistent semantics.
 * <p>
 * When {@link #configure(Map)} is invoked on this instance, {@link #BROKER_ID}, {@link #CLUSTER_ID} properties are
 * passed which can be used by this instance if needed. These properties can be used if there is a single storage used
 * for different clusters. For ex: MySQL storage can be used as metadata store for all the clusters across the org.
 * <p>
 *
 * todo-tier cleanup the abstractions in this interface.
 */
@InterfaceStability.Unstable
public interface RemoteLogMetadataManager extends Configurable, Closeable {

    /**
     * Property name for broker id.
     */
    String BROKER_ID = "broker.id";

    /**
     * Property name for cluster id.
     */
    String CLUSTER_ID = "cluster.id";

    /**
     * Stores RemoteLogSegmentMetadata with the containing RemoteLogSegmentId into RemoteLogMetadataManager.
     *
     * RemoteLogSegmentMetadata is identified by RemoteLogSegmentId.
     *
     * @param remoteLogSegmentMetadata metadata to store.
     * @throws RemoteStorageException if the metadata can not be stored.
     */
    void putRemoteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     * Fetches RemoteLogSegmentId for the given topic partition which contains the given offset.
     *
     * This will evolve to refactor TopicPartition to TopicPartitionId which contains a unique identifier and TopicPartition.
     *
     * @param topicPartition topic partition to look up.
     * @param offset         offset that the returned segment contains.
     * @return id of the remote log segment containing the given offset.
     * @throws RemoteStorageException if the id can not be fetched.
     */
    RemoteLogSegmentId remoteLogSegmentId(TopicPartition topicPartition, long offset) throws RemoteStorageException;

    /**
     * Fetches RemoteLogSegmentMetadata for the given topic partition and offset.
     *
     * This will evolve to refactor TopicPartition to TopicPartitionId which contains a unique identifier and TopicPartition.
     *
     * @param topicPartition topic partition to look up.
     * @param offset         offset to look up.
     * @return metadata of the remote log segment for the given topic partition and offset.
     * @throws RemoteStorageException if the metadata can not be fetched.
     */
    RemoteLogSegmentMetadata remoteLogSegmentMetadata(TopicPartition topicPartition, long offset) throws RemoteStorageException;

    /**
     * Returns earliest log offset if there are segments in the remote storage for the given topic partition, else
     * returns {@link Optional#empty()}.
     *
     * This is treated as the effective log-start-offset of the topic partition's log.
     *
     * todo check whether we need to pass leader-epoch.
     *
     * @param tp topic partition to look up.
     * @return the earliest log offset in remote storage, or {@link Optional#empty()} if there are no remote segments.
     * @throws RemoteStorageException if the offset can not be fetched.
     */
    Optional<Long> earliestLogOffset(TopicPartition tp) throws RemoteStorageException;

    /**
     * Returns highest log offset of topic partition in remote storage.
     *
     * @param tp topic partition to look up.
     * @return the highest log offset of the topic partition in remote storage, wrapped in an Optional.
     * @throws RemoteStorageException if the offset can not be fetched.
     */
    Optional<Long> highestLogOffset(TopicPartition tp) throws RemoteStorageException;

    /**
     * Deletes the log segment metadata for the given remoteLogSegmentId.
     *
     * @param remoteLogSegmentId id of the remote log segment whose metadata is deleted.
     * @throws RemoteStorageException if the metadata can not be deleted.
     */
    void deleteRemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId) throws RemoteStorageException;

    /**
     * List the remote log segment files of the given topicPartition.
     * The RemoteLogManager of a follower uses this method to find out the remote data for the given topic partition.
     * <p>
     * This is also used while deleting a given topic partition, to fetch all the remote log segments for the given
     * topic partition and set a tombstone marker for them to be deleted.
     * <p>
     * The default implementation delegates to {@link #listRemoteLogSegments(TopicPartition, long)} with a minimum
     * offset of 0.
     *
     * @param topicPartition topic partition whose remote segments are listed.
     * @return List of remote segments, sorted by baseOffset in ascending order.
     */
    default List<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicPartition topicPartition) {
        return listRemoteLogSegments(topicPartition, 0);
    }

    /**
     * Returns list of remote segments, sorted by {@link RemoteLogSegmentMetadata#startOffset()} in ascending order
     * which are >= the given min Offset.
     *
     * @param topicPartition topic partition whose remote segments are listed.
     * @param minOffset      minimum offset; only segments which are >= this offset are returned.
     * @return List of remote segments, sorted by baseOffset in ascending order.
     */
    List<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicPartition topicPartition, long minOffset);

    /**
     * This method is invoked only when there are changes in leadership of the topic partitions that this broker is
     * responsible for.
     *
     * @param leaderPartitions   partitions that have become leaders on this broker.
     * @param followerPartitions partitions that have become followers on this broker.
     */
    void onPartitionLeadershipChanges(Set<TopicPartition> leaderPartitions, Set<TopicPartition> followerPartitions);

    /**
     * This method is invoked only when the given topic partitions are stopped on this broker. This can happen when a
     * partition is emigrated to other broker or a partition is deleted.
     *
     * @param partitions partitions that are stopped on this broker.
     */
    void onStopPartitions(Set<TopicPartition> partitions);

    /**
     * Callback to receive once server is started so that this class can run tasks which should be run only when the
     * server is started.
     *
     * @param serverEndpoint endpoint of the started server. NOTE(review): the expected format is not specified here.
     */
    void onServerStarted(final String serverEndpoint);
}



/**
 * Metadata about the log segment stored in remote tier storage.
 *
 * All fields are final and are set once by the constructor, which stores the given values without validation.
 */
public class RemoteLogSegmentMetadata implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Universally unique remote log segment id.
     */
    private final RemoteLogSegmentId remoteLogSegmentId;

    /**
     * Start offset of this segment.
     */
    private final long startOffset;

    /**
     * End offset of this segment.
     */
    private final long endOffset;

    /**
     * Leader epoch of the broker.
     */
    private final int leaderEpoch;

    /**
     * Maximum timestamp in the segment.
     */
    private final long maxTimestamp;

    /**
     * Epoch time at which the remote log segment is copied to the remote tier storage.
     */
    private final long createdTimestamp;

    /**
     * Size of the segment in bytes.
     */
    private final long segmentSizeInBytes;

    /**
     * It indicates that this is marked for deletion.
     */
    private final boolean markedForDeletion;

    /**
     * Any context returned by {@link RemoteStorageManager#copyLogSegment} for the given remoteLogSegmentId.
     */
    private final byte[] remoteLogSegmentContext;


    /**
     * Creates the metadata from the given values. The values are stored as given; in particular the
     * remoteLogSegmentContext array is kept by reference, without a defensive copy.
     *
     * @param remoteLogSegmentId      Universally unique remote log segment id.
     * @param startOffset             Start offset of this segment.
     * @param endOffset               End offset of this segment.
     * @param maxTimestamp            Maximum timestamp in this segment.
     * @param leaderEpoch             Leader epoch of the broker.
     * @param createdTimestamp        Epoch time at which the remote log segment is copied to the remote tier storage.
     * @param markedForDeletion       The respective segment of remoteLogSegmentId is marked for deletion.
     * @param remoteLogSegmentContext Any context returned by {@link RemoteStorageManager#copyLogSegment}.
     * @param segmentSizeInBytes      Size of this segment in bytes.
     */
    public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId, long startOffset, long endOffset,
                                    long maxTimestamp, int leaderEpoch, long createdTimestamp,
                                    boolean markedForDeletion, byte[] remoteLogSegmentContext, long segmentSizeInBytes) {
        this.remoteLogSegmentId = remoteLogSegmentId;
        this.startOffset = startOffset;
        this.endOffset = endOffset;
        this.leaderEpoch = leaderEpoch;
        this.maxTimestamp = maxTimestamp;
        this.createdTimestamp = createdTimestamp;
        this.markedForDeletion = markedForDeletion;
        this.remoteLogSegmentContext = remoteLogSegmentContext;
        this.segmentSizeInBytes = segmentSizeInBytes;
    }

...
}

...