THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.server.log.remote.storage; ... /** * This interface provides storing and fetching remote log segment metadata with strongly consistent semantics. * <p> * This class can be plugged in to Kafka cluster by adding the implementation class as * <code>remote.log.metadata.manager.class.name</code> property value. There is an inbuilt implementation backed by * topic storage in the local cluster. This is used as the default implementation if * remote.log.metadata.manager.class.name is not configured. * </p> * <p> * <code>remote.log.metadata.manager.class.path</code> property is about the class path of the RemoteLogStorageManager * implementation. If specified, the RemoteLogStorageManager implementation and its dependent libraries will be loaded * by a dedicated classloader which searches this class path before the Kafka broker class path. The syntax of this * parameter is same with the standard Java class path string. * </p> * <p> * <code>remote.log.metadata.manager.listener.name</code> property is about listener name of the local broker to which * it should get connected if needed by RemoteLogMetadataManager implementation. When this is configured all other * required properties can be passed as properties with prefix of 'remote.log.metadata.manager.listener. * </p> * "cluster.id", "broker.id" and all the properties prefixed with "remote.log.metadata." are passed when * {@link #configure(Map)} is invoked on this instance. * <p> * <p> * <p> * All these APIs are still evolving. * <p> */ @InterfaceStability.Unstable public interface RemoteLogMetadataManager extends Configurable, Closeable { /** * Stores RemoteLogSegmentMetadata with the containing RemoteLogSegmentId into RemoteLogMetadataManager. * <p> * RemoteLogSegmentMetadata is identified by RemoteLogSegmentId. * * @param remoteLogSegmentMetadata metadata about the remote log segment to be deleted. * @throws RemoteStorageException if there are any storage related errors occurred. */ void putRemoteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException; /** * RemoteLogSegmentMetadata is updated with the new state based on the life cycle of the segment. It can go through * the below state transitions. * <p> * <pre> * +---------------------+ +----------------------+ * |COPY_SEGMENT_STARTED |----------->|COPY_SEGMENT FINISHED | * +-------------------+-+ +--+-------------------+ * | | * | | * v v * +--+-----------------+-+ * |DELETE_SEGMENT_STARTED| * +-----------+----------+ * | * | * v * +-----------+-----------+ * |DELETE SEGMENT FINISHED| * +-----------------------+ * </pre> * <p> * {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED} - This state indicates that the segment copying to remote storage is started but not yet finished. * {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED} - This state indicates that the segment copying to remote storage is finished. * <br> * The leader broker copies the log segments to the remote storage and puts the remote log segment metadata with the * state as “COPY_SEGMENT_STARTED” and updates the state as “COPY_SEGMENT_FINISHED” once the copy is successful. * <p></p> * {@link RemoteLogSegmentState#DELETE_SEGMENT_STARTED} - This state indicates that the segment deletion is started but not yet finished. * {@link RemoteLogSegmentState#DELETE_SEGMENT_FINISHED} - This state indicates that the segment is deleted successfully. * <br> * Leader partitions publish both the above delete segment events when remote log retention is reached for the respective segments. Remote Partition Removers also publish these events when a segment is deleted. * * @param remoteLogSegmentMetadataUpdate update of the remote log segment metadata. * @throws RemoteStorageException if there are any storage related errors occurred. */ void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate) throws RemoteStorageException; /** * Fetches RemoteLogSegmentMetadata if it exists for the given topic partition containing offset and leader-epoch for the offset, * else returns {@link Optional#empty()}. * * @param topicIdPartition topic partition * @param offset offset * @param epochForOffset leader epoch for the given offset * @return the requested remote log segment metadata if it exists. * @throws RemoteStorageException if there are any storage related errors occurred. */ Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(TopicIdPartition topicIdPartition, long offset, int epochForOffset) throws RemoteStorageException; /** * Returns highest log offset of topic partition for the given leader epoch in remote storage. This is used by * remote log management subsystem to know upto which offset the segments have been copied to remote storage for * a given leader epoch. * * @param topicIdPartition topic partition * @param leaderEpoch leader epoch * @return the requested highest log offset if exists. * @throws RemoteStorageException if there are any storage related errors occurred. */ Optional<Long> highestLogOffset(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException; /** * Update the delete partition state of a topic partition in metadata storage. Controller invokes this method with * DeletePartitionUpdate having state as {@link RemoteLogSegmentState#DELETE_PARTITION_MARKED}. So, remote logpartition removers cleaners * can act on this event to clean the respective remote log segments of the partition. * <p><br> * Incase of default RLMM implementation, remote logpartition cleanerremover processes {@link RemoteLogSegmentState#DELETE_PARTITION_MARKED} * <ul> * <li> sends an event with state as {@link RemoteLogSegmentState#DELETE_PARTITION_STARTED} * <li> gets all the remote log segments and deletes them. * <li> sends an event with state as {@link RemoteLogSegmentState#DELETE_PARTITION_FINISHED} once all the remote log segments are * deleted. * </ul> * @param deletePartitionUpdate update on delete state of a partition. * @throws RemoteStorageException if there are any storage related errors occurred. */ void updateDeletePartitionState(DeletePartitionUpdate deletePartitionUpdate) throws RemoteStorageException; /** * List the remote log segment metadata of the given topicIdPartition. * <p> * This is used when a topic partition is deleted or cleaning up segments based on the retention, to fetch all the * remote log segments for the given topic partition and delete them. * * @return Iterator of remote segments, sorted by baseOffset in ascending order. */ default Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition) { return listRemoteLogSegments(topicIdPartition, 0); } /** * Returns iterator of remote log segment metadata, sorted by {@link RemoteLogSegmentMetadata#startOffset()} in * ascending order which contains the given leader epoch. This is used by remote log retention management subsystem * to fetch the segment metadata for a given leader epoch. * * @param topicIdPartition topic partition * @param leaderEpoch leader epoch * @return Iterator of remote segments, sorted by baseOffset in ascending order. */ Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition, long leaderEpoch); /** * This method is invoked only when there are changes in leadership of the topic partitions that this broker is * responsible for. * * @param leaderPartitions partitions that have become leaders on this broker. * @param followerPartitions partitions that have become followers on this broker. */ void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions, Set<TopicIdPartition> followerPartitions); /** * This method is invoked only when the given topic partitions are stopped on this broker. This can happen when a * partition is emigrated to other broker or a partition is deleted. * * @param partitions topic partitions which have been stopped. */ void onStopPartitions(Set<TopicIdPartition> partitions); } package org.apache.kafka.server.log.remote.storage; ... /** * It describes the metadata about the log segment in the remote storage. */ public class RemoteLogSegmentMetadataUpdate implements Serializable { private static final long serialVersionUID = 1L; /** * Universally unique remote log segment id. */ private final RemoteLogSegmentId remoteLogSegmentId; /** * Epoch time at which the respective {@link #state} is set. */ private final long eventTimestamp; /** * Leader epoch of the broker from where this event occurred. */ private final int leaderEpoch; /** * It indicates the state in which the action is executed on this segment. */ private final RemoteLogSegmentState state; /** * @param remoteLogSegmentId Universally unique remote log segment id. * @param eventTimestamp Epoch time at which the remote log segment is copied to the remote tier storage. * @param leaderEpoch Leader epoch of the broker from where this event occurred. * @param state state of the remote log segment. */ public RemoteLogSegmentMetadataUpdate(RemoteLogSegmentId remoteLogSegmentId, long eventTimestamp, int leaderEpoch, RemoteLogSegmentState state) { this.remoteLogSegmentId = remoteLogSegmentId; this.eventTimestamp = eventTimestamp; this.leaderEpoch = leaderEpoch; this.state = state; } public RemoteLogSegmentId remoteLogSegmentId() { return remoteLogSegmentId; } public long createdTimestamp() { return eventTimestamp; } public RemoteLogSegmentState state() { return state; } public int leaderEpoch() { return leaderEpoch; } ... } package org.apache.kafka.server.log.remote.storage; ... /** * */ public class DeletePartitionUpdate { private final TopicIdPartition topicIdPartition; private final RemotePartitionDeleteState state; private final long eventTimestamp; private final int epoch; public DeletePartitionUpdate(TopicIdPartition topicIdPartition, RemotePartitionDeleteState state, long eventTimestamp, int epoch) { Objects.requireNonNull(topicIdPartition); Objects.requireNonNull(state); this.topicIdPartition = topicIdPartition; this.state = state; this.eventTimestamp = eventTimestamp; this.epoch = epoch; } public TopicIdPartition topicIdPartition() { return topicIdPartition; } public RemotePartitionDeleteState state() { return state; } public long eventTimestamp() { return eventTimestamp; } public int epoch() { return epoch; } ... } package org.apache.kafka.server.log.remote.storage; ... /** * It indicates the deletion state of the remote topic partition. This will be based on the action executed on this * partition by the remote log service implementation. * <p> */ public enum RemotePartitionDeleteState { /** * This is used when a topic/partition is deleted by controller. * This partition is marked for delete by controller. That means, all its remote log segments are eligible for * deletion so that remote partition removers can start deleting them. */ DELETE_PARTITION_MARKED((byte) 0), /** * This state indicates that the partition deletion is started but not yet finished. */ DELETE_PARTITION_STARTED((byte) 1), /** * This state indicates that the partition is deleted successfully. */ DELETE_PARTITION_FINISHED((byte) 2); private static final Map<Byte, RemotePartitionDeleteState> STATE_TYPES = Collections.unmodifiableMap( Arrays.stream(values()).collect(Collectors.toMap(RemotePartitionDeleteState::id, Function.identity()))); private final byte id; RemotePartitionDeleteState(byte id) { this.id = id; } public byte id() { return id; } public static RemotePartitionDeleteState forId(byte id) { return STATE_TYPES.get(id); } ... } package org.apache.kafka.server.log.remote.storage; ... /** * It indicates the state of the remote log segment. This will be based on the action executed on this * segment by the remote log service implementation. * <p> */ public enum RemoteLogSegmentState { /** * This state indicates that the segment copying to remote storage is started but not yet finished. */ COPY_SEGMENT_STARTED((byte) 0), /** * This state indicates that the segment copying to remote storage is finished. */ COPY_SEGMENT_FINISHED((byte) 1), /** * This state indicates that the segment deletion is started but not yet finished. */ DELETE_SEGMENT_STARTED((byte) 2), /** * This state indicates that the segment is deleted successfully. */ DELETE_SEGMENT_FINISHED((byte) 3), private static final Map<Byte, RemoteLogSegmentState> STATE_TYPES = Collections.unmodifiableMap( Arrays.stream(values()).collect(Collectors.toMap(RemoteLogSegmentState::id, Function.identity()))); private final byte id; RemoteLogSegmentState(byte id) { this.id = id; } public byte id() { return id; } public static RemoteLogSegmentState forId(byte id) { return STATE_TYPES.get(id); } ... } |
...