...
```java
/**
 * RemoteStorageManager provides the lifecycle of remote log segments that includes copy, fetch, and delete from remote
 * storage.
 * <p>
 * Each upload or copy of a segment is initiated with {@link RemoteLogSegmentMetadata} containing {@link RemoteLogSegmentId},
 * which is universally unique even for the same topic partition and offsets.
 * <p>
 * RemoteLogSegmentMetadata is stored in {@link RemoteLogMetadataManager} before and after copy/delete operations on
 * RemoteStorageManager with the respective {@link RemoteLogSegmentMetadata.State}. {@link RemoteLogMetadataManager} is
 * responsible for storing and fetching metadata about the remote log segments in a strongly consistent manner.
 * This allows RemoteStorageManager to store segments even in an eventually consistent manner, as the metadata is already
 * stored in a consistent store.
 * <p>
 * All these APIs are still evolving.
 */
@InterfaceStability.Unstable
public interface RemoteStorageManager extends Configurable, Closeable {

    InputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]);

    /**
     * Copies the given {@link LogSegmentData} for the given {@code remoteLogSegmentMetadata}.
     * <p>
     * The invoker of this API should always send a unique id as part of
     * {@link RemoteLogSegmentMetadata#remoteLogSegmentId()}, even when it retries to invoke this method for the same
     * log segment data.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @param logSegmentData           data to be copied to tiered storage.
     * @throws RemoteStorageException if there are any errors in storing the data of the segment.
     */
    void copyLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata, LogSegmentData logSegmentData)
            throws RemoteStorageException;

    /**
     * Returns the remote log segment data file/object as an InputStream for the given RemoteLogSegmentMetadata,
     * starting from the given startPosition. The stream will end at the smaller of endPosition and the end of the
     * remote log segment data file/object.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @param startPosition            start position of the log segment to be read, inclusive.
     * @param endPosition              end position of the log segment to be read, inclusive.
     * @return input stream of the requested log segment data.
     * @throws RemoteStorageException if there are any errors while fetching the desired segment.
     */
    InputStream fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                                    Long startPosition,
                                    Long endPosition) throws RemoteStorageException;

    /**
     * Returns the offset index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @return input stream of the requested offset index.
     * @throws RemoteStorageException if there are any errors while fetching the index.
     */
    InputStream fetchOffsetIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     * Returns the timestamp index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @return input stream of the requested timestamp index.
     * @throws RemoteStorageException if there are any errors while fetching the index.
     */
    InputStream fetchTimestampIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     * Returns the transaction index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @return input stream of the requested transaction index.
     * @throws RemoteStorageException if there are any errors while fetching the index.
     */
    default InputStream fetchTransactionIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata)
            throws RemoteStorageException {
        return EMPTY_INPUT_STREAM;
    }

    /**
     * Returns the producer snapshot index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @return input stream of the producer snapshot.
     * @throws RemoteStorageException if there are any errors while fetching the index.
     */
    default InputStream fetchProducerSnapshotIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata)
            throws RemoteStorageException {
        return EMPTY_INPUT_STREAM;
    }

    /**
     * Returns the leader epoch index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @return input stream of the leader epoch index.
     * @throws RemoteStorageException if there are any errors while fetching the index.
     */
    default InputStream fetchLeaderEpochIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata)
            throws RemoteStorageException {
        return EMPTY_INPUT_STREAM;
    }

    /**
     * Deletes the resources associated with the given {@code remoteLogSegmentMetadata}. Deletion is considered
     * successful if this call returns without any errors. It throws {@link RemoteStorageException} if there are any
     * errors in deleting the file.
     * <p>
     * {@link RemoteResourceNotFoundException} is thrown when there are no resources associated with the given
     * {@code remoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment to be deleted.
     * @throws RemoteResourceNotFoundException if the requested resource is not found.
     * @throws RemoteStorageException          if there are any storage related errors.
     */
    void deleteLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;
}

/**
 * This represents a universally unique id associated with a topic partition's log segment. This will be regenerated
 * for every attempt of copying a specific log segment in
 * {@link RemoteStorageManager#copyLogSegment(RemoteLogSegmentMetadata, LogSegmentData)}.
 */
public class RemoteLogSegmentId {
    private final TopicPartition topicPartition;
    private final UUID id;

    public RemoteLogSegmentId(TopicPartition topicPartition, UUID id) {
        this.topicPartition = requireNonNull(topicPartition);
        this.id = requireNonNull(id);
    }

    public TopicPartition topicPartition() {
        return topicPartition;
    }

    public UUID id() {
        return id;
    }
    ...
}

/**
 * It describes the metadata about the log segment in the remote storage.
 */
public class RemoteLogSegmentMetadata implements Serializable {

    /**
     * It indicates the state of the remote log segment. This will be based on the action executed on this segment by
     * the remote log service implementation.
     *
     * todo: check whether the state validations are to be checked or not; add next possible states for each state.
     */
    public enum State {
        /**
         * This state indicates that the segment copying to remote storage is started but not yet finished.
         */
        COPY_STARTED,

        /**
         * This state indicates that the segment copying to remote storage is finished.
         */
        COPY_FINISHED,

        /**
         * This segment is marked for delete. That means it is eligible for deletion. This is used when a
         * topic/partition is deleted so that deletion agents can start deleting them, as the leader/follower does
         * not exist.
         */
        DELETE_MARKED,

        /**
         * This state indicates that the segment deletion is started but not yet finished.
         */
        DELETE_STARTED,

        /**
         * This state indicates that the segment is deleted successfully.
         */
        DELETE_FINISHED;
    }

    private static final long serialVersionUID = 1L;

    /**
     * Universally unique remote log segment id.
     */
    private final RemoteLogSegmentId remoteLogSegmentId;

    /**
     * Start offset of this segment.
     */
    private final long startOffset;

    /**
     * End offset of this segment.
     */
    private final long endOffset;

    /**
     * Leader epoch of the broker.
     */
    private final int leaderEpoch;

    /**
     * Maximum timestamp in the segment.
     */
    private final long maxTimestamp;

    /**
     * Epoch time at which the respective {@link #state} is set.
     */
    private final long eventTimestamp;

    /**
     * Leader epoch vs offset for messages within this segment.
     */
    private final Map<Long, Long> segmentLeaderEpochs;

    /**
     * Size of the segment in bytes.
     */
    private final long segmentSizeInBytes;

    /**
     * It indicates the state in which the action is executed on this segment.
     */
    private final State state;

    /**
     * @param remoteLogSegmentId  Universally unique remote log segment id.
     * @param startOffset         Start offset of this segment.
     * @param endOffset           End offset of this segment.
     * @param maxTimestamp        Maximum timestamp in this segment.
     * @param leaderEpoch         Leader epoch of the broker.
     * @param eventTimestamp      Epoch time at which the respective {@link #state} is set.
     * @param segmentSizeInBytes  Size of this segment in bytes.
     * @param state               State of the remote log segment.
     * @param segmentLeaderEpochs Leader epochs that occurred within this segment.
     */
    public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId, long startOffset, long endOffset,
                                    long maxTimestamp, int leaderEpoch, long eventTimestamp,
                                    long segmentSizeInBytes, State state, Map<Long, Long> segmentLeaderEpochs) {
        this.remoteLogSegmentId = remoteLogSegmentId;
        this.startOffset = startOffset;
        this.endOffset = endOffset;
        this.leaderEpoch = leaderEpoch;
        this.maxTimestamp = maxTimestamp;
        this.eventTimestamp = eventTimestamp;
        this.segmentLeaderEpochs = segmentLeaderEpochs;
        this.state = state;
        this.segmentSizeInBytes = segmentSizeInBytes;
    }
    ...
}

public class LogSegmentData {

    private final File logSegment;
    private final File offsetIndex;
    private final File timeIndex;
    private final File txnIndex;
    private final File producerIdSnapshotIndex;
    private final File leaderEpochIndex;

    public LogSegmentData(File logSegment, File offsetIndex, File timeIndex, File txnIndex,
                          File producerIdSnapshotIndex, File leaderEpochIndex) {
        this.logSegment = logSegment;
        this.offsetIndex = offsetIndex;
        this.timeIndex = timeIndex;
        this.txnIndex = txnIndex;
        this.producerIdSnapshotIndex = producerIdSnapshotIndex;
        this.leaderEpochIndex = leaderEpochIndex;
    }
    ...
}
```
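The ranged-read contract of fetchLogSegmentData (stream from startPosition up to the smaller of endPosition and the end of the segment, both inclusive) can be sketched as follows. This is an illustrative helper, not part of the KIP; the class and method names here are invented for the example:

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Arrays;

// Illustrative helper showing how an implementation might clamp the requested
// inclusive [startPosition, endPosition] range to the actual segment length,
// matching the fetchLogSegmentData contract described above.
public class RangedFetch {

    // Returns the bytes of the segment from startPosition up to the smaller of
    // endPosition and the last byte of the segment, both positions inclusive.
    public static byte[] slice(byte[] segment, long startPosition, long endPosition) {
        if (startPosition < 0 || startPosition >= segment.length || endPosition < startPosition) {
            throw new IllegalArgumentException(
                    "invalid range: [" + startPosition + ", " + endPosition + "]");
        }
        long end = Math.min(endPosition, segment.length - 1);
        return Arrays.copyOfRange(segment, (int) startPosition, (int) end + 1);
    }

    // Wraps the slice as an InputStream, as fetchLogSegmentData would return.
    public static InputStream fetch(byte[] segment, long startPosition, long endPosition) {
        return new ByteArrayInputStream(slice(segment, startPosition, endPosition));
    }
}
```

Note that an endPosition beyond the end of the segment is not an error; it is clamped, which is what lets callers pass a large endPosition to read to the end of the segment.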
...
```java
/**
 * This interface provides storing and fetching remote log segment metadata with strongly consistent semantics.
 * <p>
 * This class can be plugged into a Kafka cluster by adding the implementation class as the
 * <code>remote.log.metadata.manager.class.name</code> property value. There is an inbuilt implementation backed by
 * topic storage in the local cluster. This is used as the default implementation if
 * remote.log.metadata.manager.class.name is not configured.
 * </p>
 * <p>
 * The <code>remote.log.metadata.manager.class.path</code> property is the class path of the RemoteLogMetadataManager
 * implementation. If specified, the RemoteLogMetadataManager implementation and its dependent libraries will be loaded
 * by a dedicated classloader which searches this class path before the Kafka broker class path. The syntax of this
 * parameter is the same as the standard Java class path string.
 * </p>
 * <p>
 * The <code>remote.log.metadata.manager.listener.name</code> property is the listener name of the local broker to
 * which it should connect if needed by the RemoteLogMetadataManager implementation. When this is configured, all other
 * required properties can be passed as properties with the prefix 'remote.log.metadata.manager.listener.'.
 * </p>
 * <p>
 * "cluster.id", "broker.id" and all the properties prefixed with "remote.log.metadata." are passed when
 * {@link #configure(Map)} is invoked on this instance.
 * <p>
 * The "cluster.id" and "broker.id" properties can be used if there is a single storage used for different clusters.
 * For example: MySQL storage can be used as the metadata store for all the clusters across the org.
 * <p>
 * All these APIs are still evolving.
 * <p>
 * We may refactor TopicPartition in the below APIs to an abstraction that contains a unique identifier
 * and TopicPartition. This will be done once a unique identifier for a topic is introduced with
 * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers">KIP-516</a>.
 */
@InterfaceStability.Unstable
public interface RemoteLogMetadataManager extends Configurable, Closeable {

    /**
     * Stores RemoteLogSegmentMetadata with the containing RemoteLogSegmentId into RemoteLogMetadataManager.
     * <p>
     * RemoteLogSegmentMetadata is identified by RemoteLogSegmentId.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment to be stored.
     * @throws RemoteStorageException if there are any storage related errors.
     */
    void putRemoteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     * Fetches RemoteLogSegmentMetadata for the given topic partition, containing the offset and the leader epoch for
     * the offset.
     * <p>
     * This will evolve to refactor TopicPartition to TopicPartitionId, which contains a unique identifier and
     * TopicPartition.
     *
     * @param topicPartition topic partition
     * @param offset         offset
     * @param epochForOffset leader epoch for the given offset
     * @return the requested remote log segment metadata.
     * @throws RemoteStorageException if there are any storage related errors.
     */
    RemoteLogSegmentMetadata remoteLogSegmentMetadata(TopicPartition topicPartition, long offset, int epochForOffset)
            throws RemoteStorageException;

    /**
     * Returns the earliest log offset if there are segments in the remote storage for the given topic partition and
     * leader epoch, else returns {@link Optional#empty()}.
     *
     * @param topicPartition topic partition
     * @param leaderEpoch    leader epoch
     * @return the earliest log offset if it exists.
     * @throws RemoteStorageException if there are any storage related errors.
     */
    Optional<Long> earliestLogOffset(TopicPartition topicPartition, int leaderEpoch) throws RemoteStorageException;

    /**
     * Returns the highest log offset of the topic partition for the given leader epoch in remote storage.
     *
     * @param topicPartition topic partition
     * @param leaderEpoch    leader epoch
     * @return the requested highest log offset if it exists.
     * @throws RemoteStorageException if there are any storage related errors.
     */
    Optional<Long> highestLogOffset(TopicPartition topicPartition, int leaderEpoch) throws RemoteStorageException;

    /**
     * Deletes the log segment metadata for the given remoteLogSegmentMetadata.
     *
     * @param remoteLogSegmentMetadata remote log segment metadata to be deleted.
     * @throws RemoteStorageException if there are any storage related errors.
     */
    void deleteRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata)
            throws RemoteStorageException;

    /**
     * Lists the remote log segment files of the given topicPartition.
     * The RemoteLogManager of a follower uses this method to find out the remote data for the given topic partition.
     * <p>
     * This is used while deleting a given topic partition to fetch all the remote log segments for the given topic
     * partition and set a tombstone marker for them to be deleted.
     *
     * @param topicPartition topic partition
     * @return Iterator of remote segments, sorted by baseOffset in ascending order.
     */
    default Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicPartition topicPartition) {
        return listRemoteLogSegments(topicPartition, 0);
    }

    /**
     * Returns an iterator of remote segments, sorted by {@link RemoteLogSegmentMetadata#startOffset()} in ascending
     * order, which are >= the given minOffset.
     *
     * @param topicPartition topic partition
     * @param minOffset      offset for which segment metadata is requested, inclusive.
     * @return Iterator of remote segments, sorted by baseOffset in ascending order.
     */
    Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicPartition topicPartition, long minOffset);

    /**
     * This method is invoked only when there are changes in leadership of the topic partitions that this broker is
     * responsible for.
     *
     * @param leaderPartitions   partitions that have become leaders on this broker.
     * @param followerPartitions partitions that have become followers on this broker.
     */
    void onPartitionLeadershipChanges(Set<TopicPartition> leaderPartitions, Set<TopicPartition> followerPartitions);

    /**
     * This method is invoked only when the given topic partitions are stopped on this broker. This can happen when a
     * partition is moved to another broker or a partition is deleted.
     *
     * @param partitions topic partitions which have been stopped.
     */
    void onStopPartitions(Set<TopicPartition> partitions);

    /**
     * Callback to be invoked once the server is started, so that this class can run tasks which should be run only
     * when the server is started.
     */
    void onServerStarted();
}
```
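The offset-to-segment lookup implied by remoteLogSegmentMetadata() amounts to a floor lookup on segments sorted by start offset. A minimal, self-contained sketch with simplified stand-in types (this is not the inbuilt topic-backed implementation, and the class and field names are invented for illustration):

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical in-memory index for one topic partition. Segment is a
// simplified stand-in for RemoteLogSegmentMetadata holding only the
// inclusive [startOffset, endOffset] range.
public class SegmentOffsetIndex {

    public static final class Segment {
        public final long startOffset;
        public final long endOffset;

        public Segment(long startOffset, long endOffset) {
            this.startOffset = startOffset;
            this.endOffset = endOffset;
        }
    }

    // Segments keyed and sorted by startOffset, mirroring the ascending
    // ordering that listRemoteLogSegments promises.
    private final TreeMap<Long, Segment> byStartOffset = new TreeMap<>();

    public void put(Segment segment) {
        byStartOffset.put(segment.startOffset, segment);
    }

    // Floor lookup: the candidate is the segment with the greatest
    // startOffset <= offset; it matches only if offset is within its range.
    public Optional<Segment> lookup(long offset) {
        Map.Entry<Long, Segment> entry = byStartOffset.floorEntry(offset);
        if (entry == null || entry.getValue().endOffset < offset) {
            return Optional.empty();
        }
        return Optional.of(entry.getValue());
    }

    // Analogous to earliestLogOffset: empty when no segments exist.
    public Optional<Long> earliestLogOffset() {
        return byStartOffset.isEmpty() ? Optional.empty()
                                       : Optional.of(byStartOffset.firstKey());
    }
}
```

A TreeMap keeps both the point lookup and the sorted iteration for listRemoteLogSegments at O(log n); a real implementation would additionally key by leader epoch, as the APIs above do.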
New metrics
The following new metrics will be added:
...