THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * RemoteStorageManager provides the lifecycle of remote log segments which includes copy, fetch, and delete operations. * * {@link RemoteLogMetadataManager} is responsible for storing and fetching metadata about the remote log segments in a * strongly consistent manner. * * Each upload or copy of a segment is given with a {@link RemoteLogSegmentId} which is universally unique even for the * same topic partition and offsets. Once the copy or upload is successful, {@link RemoteLogSegmentMetadata} is * created with RemoteLogSegmentId and other log segment information and it is stored in {@link RemoteLogMetadataManager}. * This allows RemoteStorageManager to store segments even in eventually consistent manner as the metadata is already * stored in a consistent store. * * All these APIs are still experimental. */ @InterfaceStability.Unstable public interface RemoteStorageManager extends Configurable, Closeable { /** * Copies LogSegmentData provided for the given RemoteLogSegmentId and returns any contextual. * information about this copy operation. This can include path to the object in the store etc. * * Invoker of this API should always send a unique id as part of {@link RemoteLogSegmentId#idRemoteLogSegmentMetadata#remoteLogSegmentId#id()} even when it * retries to invoke this method for the same log segment data. * * @param remoteLogSegmentId * @param logSegmentData * @return * @throws IOException */ RemoteLogSegmentContextvoid copyLogSegment(RemoteLogSegmentIdRemoteLogSegmentMetadata remoteLogSegmentIdremoteLogSegmentMetadata, LogSegmentData logSegmentData) throws RemoteStorageException; /** * Returns the remote log segment data file/object as InputStream for the given RemoteLogSegmentMetadata starting * from the given startPosition. The stream will end at the smaller of endPosition and the end of the remote log * segment data file/object. * * @param remoteLogSegmentMetadata * @param startPosition * @param endPosition * @return * @throws IOException */ InputStream fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata, Long startPosition, Long endPosition) throws RemoteStorageException; /** * Returns the offset index for the respective log segment of {@link RemoteLogSegmentMetadata}. * * @param remoteLogSegmentMetadata * @return * @throws IOException */ InputStream fetchOffsetIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException; /** * Returns the timestamp index for the respective log segment of {@link RemoteLogSegmentMetadata}. * * @param remoteLogSegmentMetadata * @return * @throws IOException */ InputStream fetchTimestampIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException; /** * * @param remoteLogSegmentMetadata * @return * @throws RemoteStorageException */ default InputStream fetchTransactionIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException { throw new UnsupportedOperationException(); } /** * * @param remoteLogSegmentMetadata * @return * @throws RemoteStorageException */ default InputStream fetchProducerSnapshotIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException { throw new UnsupportedOperationException(); } /** * Returns the leader epoch index till the the respective log segment of {@link RemoteLogSegmentMetadata}. * * @param remoteLogSegmentMetadata * @return * @throws RemoteStorageException */ default InputStream fetchLeaderEpochIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException { throw new UnsupportedOperationException(); } /** * Deletes the remote log segment for the given remoteLogSegmentMetadata. Deletion is considered as successful if * this call returns successfully without any exceptions after a few retries. It will throw {@link RemoteStorageException} if there are * any errors in deleting the file. * * Broker pushes an event to __delete_failed_remote_log_segments topic for failed segment deletions so that users * cantodo: doAdd theretry cleanup laterconfiguration. * * @param remoteLogSegmentMetadata * @throws IOException */ void deleteLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException; } /** * This represents a universally unique id associated to a topic partition's log segment. This will be regenerated for * every attempt of copying a specific log segment in {@link RemoteLogStorageManager#copyLogSegment(RemoteLogSegmentId, LogSegmentData)}. */ public class RemoteLogSegmentId { private TopicPartition topicPartition; private UUID id; public RemoteLogSegmentId(TopicPartition topicPartition, UUID id) { this.topicPartition = requireNonNull(topicPartition); this.id = requireNonNull(id); } public TopicPartition topicPartition() { return topicPartition; } public UUID id() { return id; } ... } public class LogSegmentData { private final File logSegment; private final File offsetIndex; private final File timeIndex; private final File txnIndex; private final File producerIdSnapshotIndex; private final File leaderEpochIndex; public LogSegmentData(File logSegment, File offsetIndex, File timeIndex, File txnIndex, File producerIdSnapshotIndex, File leaderEpochIndex) { this.logSegment = logSegment; this.offsetIndex = offsetIndex; this.timeIndex = timeIndex; this.txnIndex = txnIndex; this.producerIdSnapshotIndex = producerIdSnapshotIndex; this.leaderEpochIndex = leaderEpochIndex; } ... } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * This interface provides storing and fetching remote log segment metadata with strongly consistent semantics. * <p> * When {@link #configure(Map)} is invoked on this instance, {@link #BROKER_ID}, {@link #CLUSTER_ID} properties are * passed which can be used by this instance if needed. These propertiess can be used if there is a single storage used * for different clusters. For ex: MySQL storage can be used as metadata store for all the clusters across the org. * <p> * * todo-tier cleanup the abstractions in this interface. */ @InterfaceStability.Unstable public interface RemoteLogMetadataManager extends Configurable, Closeable { /** * Property name for broker id. */ String BROKER_ID = "broker.id"; /** * Property name for cluster id. */ String CLUSTER_ID = "cluster.id"; /** * Stores RemoteLogSegmentMetadata with the containing RemoteLogSegmentId into RemoteLogMetadataManager. * * RemoteLogSegmentMetadata is identified by RemoteLogSegmentId. * * @param remoteLogSegmentMetadata * @throws RemoteStorageException */ void putRemoteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException; /** * Fetches RemoteLogSegmentId for the given topic partition which contains the given offset. * * This will evolve to refactor TopicPartition to TopicPartitionId which contains a unique identifier and TopicPartition. * * @param topicPartition * @param offset * @return * @throws RemoteStorageException */ RemoteLogSegmentId getRemoteLogSegmentIdremoteLogSegmentId(TopicPartition topicPartition, long offset) throws RemoteStorageException; /** * Fetches RemoteLogSegmentMetadata for the given topic partition and offset. * * This will evolve to refactor TopicPartition to TopicPartitionId which contains a unique identifier and TopicPartition. * * @param topicPartition * @param offset * @return * @throws RemoteStorageException */ RemoteLogSegmentMetadata getRemoteLogSegmentMetadataremoteLogSegmentMetadata(TopicPartition topicPartition, long offset) throws RemoteStorageException; /** * Returns earliest log offset if there are segments in the remote storage for the given topic partition, else * returns {@link Optional#empty()}. * * This is treated as the effective log-start-offset of the topic partition's log. * * todo check whether we need to pass leader-epoch. * * @param tp * @return */ Optional<Long> earliestLogOffset(TopicPartition tp) throws RemoteStorageException; /** * Returns highest log offset of topic partition in remote storage. * * @param tp * @return * @throws RemoteStorageException */ Optional<Long> highestLogOffset(TopicPartition tp) throws RemoteStorageException; /** * Deletes the log segment metadata for the given remoteLogSegmentId. * * @param remoteLogSegmentId * @throws RemoteStorageException */ void deleteRemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId) throws RemoteStorageException; /** * List the remote log segment files of the given topicPartition. * The RemoteLogManager of a follower uses this method to find out the remote data for the given topic partition. * <p> * This is used in while deleting a given topic partition to fetch all the remote log segments for the given topic * partition and set a tombstone marker for them to be deleted. * * @return List of remote segments, sorted by baseOffset in ascending order. */ default List<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicPartition topicPartition) { return listRemoteLogSegments(topicPartition, 0); } /** * Returns list of remote segments, sorted by {@link RemoteLogSegmentMetadata#startOffset()} in ascending order * which are >= the given min Offset. * * @param topicPartition * @param minOffset * @return List of remote segments, sorted by baseOffset in ascending order. */ List<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicPartition topicPartition, long minOffset); /** * This method is invoked only when there are changes in leadership of the topic partitions that this broker is * responsible for. * * @param leaderPartitions partitions that have become leaders on this broker. * @param followerPartitions partitions that have become followers on this broker. */ void onPartitionLeadershipChanges(Set<TopicPartition> leaderPartitions, Set<TopicPartition> followerPartitions); /** * This method is invoked only when the given topic partitions are stopped on this broker. This can happen when a * partition is emigrated to other broker or a partition is deleted. * * @param partitions */ void onStopPartitions(Set<TopicPartition> partitions); /** * Callback to receive once server is started so that this class can run tasks which should be run only when the * server is started. */ void onServerStarted(final String serverEndpoint); } /** * Metadata about the log segment stored in remote tier storage. */ public class RemoteLogSegmentMetadata implements Serializable { private static final long serialVersionUID = 1L; /** * Universally unique remote log segment id. */ private final RemoteLogSegmentId remoteLogSegmentId; /** * Start offset of this segment. */ private final long startOffset; /** * End offset of this segment. */ private final long endOffset; /** * Leader epoch of the broker. */ private final int leaderEpoch; /** * Maximum timestamp in the segment */ private final long maxTimestamp; /** * Epoch time at which the remote log segment is copied to the remote tier storage. */ private final long createdTimestamp; /** * Size of the segment in bytes. */ private final long segmentSizeInBytes; /** * It indicates that this is marked for deletion. */ private final boolean markedForDeletion; /** * Any context returned by {@link RemoteStorageManager#copyLogSegment(RemoteLogSegmentId, LogSegmentData)} for * the given remoteLogSegmentId */ private final byte[] remoteLogSegmentContext; /** * @param remoteLogSegmentId Universally unique remote log segment id. * @param startOffset Start offset of this segment. * @param endOffset End offset of this segment. * @param maxTimestamp maximum timestamp in this segment * @param leaderEpoch Leader epoch of the broker. * @param createdTimestamp Epoch time at which the remote log segment is copied to the remote tier storage. * @param markedForDeletion The respective segment of remoteLogSegmentId is marked fro deletion. * @param remoteLogSegmentContext Any context returned by {@link RemoteStorageManager#copyLogSegment(RemoteLogSegmentId, LogSegmentData)} * @param segmentSizeInBytes size of this segment in bytes. */ public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId, long startOffset, long endOffset, long maxTimestamp, int leaderEpoch, long createdTimestamp, boolean markedForDeletion, byte[] remoteLogSegmentContext, long segmentSizeInBytes) { this.remoteLogSegmentId = remoteLogSegmentId; this.startOffset = startOffset; this.endOffset = endOffset; this.leaderEpoch = leaderEpoch; this.maxTimestamp = maxTimestamp; this.createdTimestamp = createdTimestamp; this.markedForDeletion = markedForDeletion; this.remoteLogSegmentContext = remoteLogSegmentContext; this.segmentSizeInBytes = segmentSizeInBytes; } ... } |
...