Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleRemoteStorageManager

/**
 * RemoteStorageManager provides the lifecycle of remote log segments whichthat includes copy, fetch, and delete operations.
 *
 * {@link RemoteLogMetadataManager} is responsible for storing and fetching metadata about the remote log segments in afrom remote
 * strongly consistent mannerstorage.
 * <p>
 * Each upload or copy of a segment is giveninitiated with a{@link RemoteLogSegmentMetadata} containing {@link RemoteLogSegmentId}
 * which is universally unique even for the
 * same topic partition and offsets.
 Once* the<p>
 copy* orRemoteLogSegmentMetadata uploadis isstored successful,in {@link RemoteLogSegmentMetadataRemoteLogMetadataManager} is
before *and createdafter withcopy/delete RemoteLogSegmentIdoperations andon
 other* logRemoteStorageManager segmentwith informationthe andrespective it is stored in {@link RemoteLogSegmentMetadata.State}. {@link RemoteLogMetadataManager}. is
 * Thisresponsible allowsfor RemoteStorageManagerstoring toand storefetching segmentsmetadata even about the remote log segments in a strongly consistent manner.
 * This allows RemoteStorageManager to store segments even in eventually consistent manner as the metadata is already
 * stored in a consistent store.
 * <p>
 * All these APIs are still experimentalevolving.
 */
@InterfaceStability.Unstable
public interface RemoteStorageManager extends Configurable, Closeable {
    InputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]);

    /**
     * Copies LogSegmentData provided for the given RemoteLogSegmentId{@param remoteLogSegmentMetadata}.
     * information about this copy operation. This can include path to the object in the store etc.<p>
     *
     * Invoker of this API should always send a unique id as part of {@link RemoteLogSegmentMetadata#remoteLogSegmentId#idRemoteLogSegmentMetadata#remoteLogSegmentId()#id()}
 even when it
  * even when *it retries to invoke this method for the same log segment data.
     *
     * @param remoteLogSegmentId
remoteLogSegmentMetadata metadata about the remote * @param logSegmentDatalog segment.
     * @return@param logSegmentData           data to be copied to tiered storage.
     * @throws IOException RemoteStorageException if there are any errors in storing the data of the segment.
     */
    void copyLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata, LogSegmentData logSegmentData)
            throws RemoteStorageException;

    /**
     * Returns the remote log segment data file/object as InputStream for the given RemoteLogSegmentMetadata starting
     * from the given startPosition. The stream will end at the smaller of endPosition and the end of the remote log
     * segment data file/object.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @param startPosition
     * @param endPosition
     *start @return
position of log segment to *be @throwsread, IOExceptioninclusive.
     */
 @param endPosition  InputStream fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
          end position of log segment to be read, inclusive.
     * @return
     * @throws RemoteStorageException if there are any Longerrors startPosition,while Longfetching endPosition)the throwsdesired RemoteStorageException;segment.

     */
    InputStream fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                                    Long startPosition, Long endPosition) throws RemoteStorageException;

    /**
     * Returns the offset index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @return
     * @throws IOException RemoteStorageException if there are any errors while fetching the index.
     */
    InputStream fetchOffsetIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     * Returns the timestamp index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata metadata about the remote log segment.
     * @return
     * @throws IOExceptionRemoteStorageException if there are any errors while fetching the index.
     */
    InputStream fetchTimestampIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     *
 Returns the transaction index *for @paramthe remoteLogSegmentMetadata
the respective log segment of * @return{@link RemoteLogSegmentMetadata}.
     * @throws RemoteStorageException
     */
    default InputStream fetchTransactionIndex(RemoteLogSegmentMetadata @param remoteLogSegmentMetadata) throwsmetadata RemoteStorageExceptionabout {
the remote       throw new UnsupportedOperationException();
    }

log segment.
      /**
     * @return
     * @param@throws remoteLogSegmentMetadata
RemoteStorageException if there are any *errors @return
while fetching    * @throws RemoteStorageExceptionthe index.
     */
    default InputStream fetchProducerSnapshotIndexfetchTransactionIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
        throw new UnsupportedOperationException()return EMPTY_INPUT_STREAM;
    }

    /**
     * Returns the leaderproducer epochsnapshot index tillfor the the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata
 metadata about the remote *log @returnsegment.
     * @throws RemoteStorageException@return
     */
 @throws RemoteStorageException if default InputStream fetchLeaderEpochIndexthere are any errors while fetching the index.
     */
    default InputStream fetchProducerSnapshotIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
        throw new UnsupportedOperationException()return EMPTY_INPUT_STREAM;
    }

    /**
     * DeletesReturns the remoteleader logepoch segmentindex for the giventhe remoteLogSegmentMetadata.respective Deletionlog issegment consideredof as successful if{@link RemoteLogSegmentMetadata}.
     *
 this call returns successfully without* any@param exceptionsremoteLogSegmentMetadata aftermetadata aabout fewthe retries.remote Itlog willsegment.
 throw {@link RemoteStorageException} if there* are@return
     * @throws RemoteStorageException if there are any errors inwhile deletingfetching the fileindex.
     * /
    default * todo: Add retry configuration.InputStream fetchLeaderEpochIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
     *
   return EMPTY_INPUT_STREAM;
  * @param remoteLogSegmentMetadata}

    /**
 * @throws IOException
  * Deletes the */
resources associated with the void deleteLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

}



/**
 * This represents a universally unique id associated to a topic partition's log segment. This will be regenerated for
 * every attempt of copying a specific log segment in {@link RemoteLogStorageManager#copyLogSegment(RemoteLogSegmentId, LogSegmentData)}.
 */
public class RemoteLogSegmentId {
    private TopicPartition topicPartition;
    private UUID id;

    public RemoteLogSegmentId(TopicPartition topicPartition, UUID id) {
        this.topicPartition = requireNonNull(topicPartition);given {@param remoteLogSegmentMetadata}. Deletion is considered as
     * successful if this call returns successfully without any errors. It will throw {@link RemoteStorageException} if
     * there are any errors in deleting the file.
     * <p>
     * {@link RemoteResourceNotFoundException} is thrown when there are no resources associated with the given
     * {@param remoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata this.idmetadata = requireNonNull(id);
    }

    public TopicPartition topicPartition() {
        return topicPartition;about the remote log segment to be deleted.
     * @throws RemoteResourceNotFoundException if the requested resource is not found
    }

 * @throws RemoteStorageException public UUID id() {
      if there returnare id;
any storage related errors }
occurred...
}



public class LogSegmentData {

     */
    private final File logSegment;
    private final File offsetIndex;
    private final File timeIndex;
    private final File txnIndex;
    private final File producerIdSnapshotIndex;
    private final File leaderEpochIndex;

    public LogSegmentData(File logSegment, File offsetIndex, File timeIndex, File txnIndex, File producerIdSnapshotIndex,
         void deleteLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

}



/**
 * This represents a universally unique id associated to a topic partition's log segment. This will be regenerated for
 * every attempt of copying a specific log segment in {@link RemoteLogStorageManager#copyLogSegment(RemoteLogSegmentId, LogSegmentData)}.
 */
public class RemoteLogSegmentId {
    private TopicPartition topicPartition;
    private UUID id;

    public RemoteLogSegmentId(TopicPartition topicPartition, UUID id) {
        this.topicPartition = requireNonNull(topicPartition);
        this.id File= leaderEpochIndexrequireNonNull(id);
 {
   }

    public this.logSegment = logSegment;TopicPartition topicPartition() {
        this.offsetIndex = offsetIndexreturn topicPartition;
    }

    this.timeIndex = timeIndex;
public UUID id() {
         this.txnIndex = txnIndex;
        this.producerIdSnapshotIndex = producerIdSnapshotIndex;
        this.leaderEpochIndex = leaderEpochIndex;
    }
...
}

RemoteLogMetadataManager

`RemoteLogMetadataManager` is an interface to provide the lifecycle of metadata about remote log segments with strongly consistent semantics. There is a default implementation that uses an internal topic. Users can plugin their own implementation if they intend to use another system to store remote log segment metadata.

Code Block
languagejava
titleRemoteLogMetadataManager

/**
 * This interface provides storing and fetching remote log segment metadata with strongly consistent semantics.
 * <p>
 * When {@link #configure(Map)} is invoked on this instance, {@link #BROKER_ID}, {@link #CLUSTER_ID} properties are
 * passed which can be used by this instance if needed. These propertiess can be used if there is a single storage used
 * for different clusters. For ex: MySQL storage can be used as metadata store for all the clusters across the org.
 * <p>
 * 
 * todo-tier cleanup the abstractions in this interface.
 */
@InterfaceStability.Unstable
public interface RemoteLogMetadataManager extends Configurable, Closeable {

    /**
     * Property name for broker id.
     */
    String BROKER_ID = "broker.id"return id;
    }
...
}


/**
 * It describes the metadata about the log segment in the remote storage.
 */
public class RemoteLogSegmentMetadata implements Serializable {

    /**
     * It indicates the state of the remote log segment. This will be based on the action executed on this segment by
     * remote log service implementation.
     *
     * todo: check whether the state validations to be checked or not, add next possible states for each state.
     */
    public enum State {

        /**
         * This state indicates that the segment copying to remote storage is started but not yet finished.
         */
        COPY_STARTED(),

        /**
         * This state indicates that the segment copying to remote storage is finished.
         */
        COPY_FINISHED(),

        /**
         * This segment is marked for delete. That means, it is eligible for deletion. This is used when a topic/partition
         * is deleted so that deletion agents can start deleting them as the leader/follower does not exist.
         */
        DELETE_MARKED(),

        /**
         * This state indicates that the segment deletion is started but not yet finished.
         */
        DELETE_STARTED(),

        /**
         * This state indicates that the segment is deleted successfully.
         */
        DELETE_FINISHED();
    }

    private static final long serialVersionUID = 1L;

    /**
     * PropertyUniversally unique nameremote forlog clustersegment id.
     */
    Stringprivate CLUSTER_IDfinal = "cluster.id"RemoteLogSegmentId remoteLogSegmentId;

    /**
     * StoresStart RemoteLogSegmentMetadataoffset withof the containing RemoteLogSegmentId into RemoteLogMetadataManagerthis segment.
     */
    private *final RemoteLogSegmentMetadata is identified by RemoteLogSegmentId.long startOffset;

     /**
     * @paramEnd remoteLogSegmentMetadata
offset of    * @throws RemoteStorageExceptionthis segment.
     */
    voidprivate putRemoteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageExceptionfinal long endOffset;

    /**
     * FetchesLeader RemoteLogSegmentIdepoch forof the given topic partition which contains the given offsetbroker.
     */
    private final * This will evolve to refactor TopicPartition to TopicPartitionId which contains a unique identifier and TopicPartition.int leaderEpoch;

    /**
     * Maximum timestamp in the segment
     */
    private *final @paramlong topicPartitionmaxTimestamp;

     /**
 @param offset
   * Epoch *time @return
at which the respective {@link *#state} @throwsis RemoteStorageExceptionset.
     */
    RemoteLogSegmentIdprivate remoteLogSegmentId(TopicPartition topicPartition, long offset) throws RemoteStorageExceptionfinal long eventTimestamp;

    /**
     * LeaderEpoch Fetchesvs RemoteLogSegmentMetadataoffset for themessages givenwith topicin partitionthis and offsetsegment.
     */
     * This will evolve to refactor TopicPartition to TopicPartitionId which contains a unique identifier and TopicPartitionprivate final Map<Long, Long> segmentLeaderEpochs;

    /**
     * Size of the segment in bytes.
     */
    private *final @paramlong topicPartitionsegmentSizeInBytes;

    /**
   * @param offset
* It indicates the state *in @return
which the action is executed *on @throwsthis RemoteStorageExceptionsegment.
     */
    RemoteLogSegmentMetadataprivate remoteLogSegmentMetadata(TopicPartition topicPartition, long offset) throws RemoteStorageExceptionfinal State state;

    /**
     * Returns@param earliestremoteLogSegmentId log offsetUniversally if there are segments in the unique remote storagelog for the given topic partition, else
     * returns {@link Optional#empty()}segment id.
     *
 @param startOffset   * This is treated as the effectiveStart log-start-offset of the topic partition's log.
     *
this segment.
     * @param endOffset       * todo check whether weEnd needoffset toof passthis leader-epochsegment.
     *
 @param maxTimestamp    * @param tp
  maximum timestamp in *this @returnsegment
     */
 @param leaderEpoch     Optional<Long> earliestLogOffset(TopicPartition tp) throws RemoteStorageException;

Leader epoch of the /**broker.
     * Returns@param highesteventTimestamp log offset of topicEpoch partitiontime inat remotewhich storage.
the remote log segment is *
copied to the remote  * @param tptier storage.
     * @param @return
segmentSizeInBytes  size of this *segment @throwsin RemoteStorageExceptionbytes.
     */
 @param state   Optional<Long>The highestLogOffset(TopicPartition tp) throws RemoteStorageException;

    /**respective segment of remoteLogSegmentId is marked fro deletion.
     * Deletes@param thesegmentLeaderEpochs logleader segmentepochs metadataoccurred forwith thein giventhis remoteLogSegmentId.segment
     */
    public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId, long *startOffset, @paramlong remoteLogSegmentIdendOffset,
     * @throws RemoteStorageException
     */
    void deleteRemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId) throws RemoteStorageException;

    /**
     * List the remote log segment files oflong themaxTimestamp, given topicPartition.
  int leaderEpoch, long eventTimestamp,
   * The RemoteLogManager of a follower uses this method to find out the remote data for the given topic partition.
     * <p>
     * This is usedlong insegmentSizeInBytes, whileState deletingstate, aMap<Long, givenLong> topicsegmentLeaderEpochs) partition{
 to fetch all the remote log segments forthis.remoteLogSegmentId the= givenremoteLogSegmentId;
  topic
     * partitionthis.startOffset and= setstartOffset;
 a tombstone marker for them to be deletedthis.
endOffset = endOffset;
   *
     *this.leaderEpoch @return= ListleaderEpoch;
 of remote segments, sorted by baseOffset in ascending orderthis.
maxTimestamp = maxTimestamp;
   */
    default List<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicPartition topicPartition) {this.eventTimestamp = eventTimestamp;
        return listRemoteLogSegments(topicPartition, 0)this.segmentLeaderEpochs = segmentLeaderEpochs;
    }

    this.state = /**state;
     * Returns list ofthis.segmentSizeInBytes remote segments, sorted by {@link RemoteLogSegmentMetadata#startOffset()} in ascending order
     * which are >= the given min Offset.
     *= segmentSizeInBytes;
    }
...
}

public class LogSegmentData {

    private final File logSegment;
    private final File offsetIndex;
    private *final @paramFile topicPartitiontimeIndex;
    private *final @paramFile minOffsettxnIndex;
    private *final @returnFile ListproducerIdSnapshotIndex;
 of remote segments, sortedprivate byfinal baseOffset in ascending order.
     */
    List<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicPartition topicPartition, long minOffset);

    /**
File leaderEpochIndex;

    public LogSegmentData(File logSegment, File offsetIndex, File timeIndex, File txnIndex, File producerIdSnapshotIndex,
         * This method is invoked only when there are changes in leadership of the topic partitions that thisFile brokerleaderEpochIndex) is{
     *  responsible forthis.
logSegment =  logSegment;
  *
     * @paramthis.offsetIndex leaderPartitions= offsetIndex;
  partitions that have become leaders on this broker..timeIndex = timeIndex;
     * @param followerPartitions partitionsthis.txnIndex that= havetxnIndex;
 become followers on this broker.
   this.producerIdSnapshotIndex = */producerIdSnapshotIndex;
      void onPartitionLeadershipChanges(Set<TopicPartition> leaderPartitions, Set<TopicPartition> followerPartitions);
 this.leaderEpochIndex = leaderEpochIndex;
    }
...
}

RemoteLogMetadataManager

`RemoteLogMetadataManager` is an interface to provide the lifecycle of metadata about remote log segments with strongly consistent semantics. There is a default implementation that uses an internal topic. Users can plugin their own implementation if they intend to use another system to store remote log segment metadata.


Code Block
languagejava
titleRemoteLogMetadataManager

/**
     * This methodinterface isprovides invokedstoring onlyand whenfetching theremote givenlog topicsegment partitionsmetadata arewith stoppedstrongly onconsistent this broker. This can happen when a
     * partition is emigrated to other broker or a partition is deleted.
     *
     * @param partitions
     */
    void onStopPartitions(Set<TopicPartition> partitions);

    /**
     * Callback to receive once server is started so that this class can run tasks which should be run only when the
     * server is started.
     */
    void onServerStarted();
}



/**
 * It describes the metadata about the log segment in the remote storage.
 */
public class RemoteLogSegmentMetadata implements Serializablesemantics.
 * <p>
 * When {@link #configure(Map)} is invoked on this instance, {@link #BROKER_ID}, {@link #CLUSTER_ID} properties are
 * passed which can be used by this instance if needed. These propertiess can be used if there is a single storage used
 * for different clusters. For ex: MySQL storage can be used as metadata store for all the clusters across the org.
 * <p>
 * 
 * todo-tier cleanup the abstractions in this interface.
 */
@InterfaceStability.Unstable
public interface RemoteLogMetadataManager extends Configurable, Closeable {

    /**
     * ItProperty indicatesname thefor statebroker ofid.
 the remote log segment. This*/
 will be based onString theBROKER_ID action executed on this segment by= "broker.id";

    /**
     * Property remotename logfor servicecluster implementationid.
     */
    String * todo: check whether the state validations to be checked or not, add next possible states for each stateCLUSTER_ID = "cluster.id";

    /**
     * Stores RemoteLogSegmentMetadata with the containing RemoteLogSegmentId into RemoteLogMetadataManager.
     */
    public enum* StateRemoteLogSegmentMetadata {

is identified by RemoteLogSegmentId.
     /**
     * @param remoteLogSegmentMetadata
  * This state indicates* that@throws theRemoteStorageException
 segment copying to remote storage*/
 is started but notvoid yet finished.
    putRemoteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

     /**/
     * Fetches RemoteLogSegmentId  COPY_STARTED(),

        /**for the given topic partition which contains the given offset.
     *
     * This statewill indicatesevolve thatto therefactor segment copyingTopicPartition to remoteTopicPartitionId storagewhich iscontains finished.
a unique identifier and TopicPartition.
     */
     *   COPY_FINISHED(),

@param topicPartition
     * @param  /**offset
     * @return
   * This segment* is@throws markedRemoteStorageException
 for delete. That means, it*/
 is eligible for deletion. This is used when a topic/partitionRemoteLogSegmentId remoteLogSegmentId(TopicPartition topicPartition, long offset) throws RemoteStorageException;

         * is deleted so that deletion agents can start deleting them as the leader/follower does not exist.
/**
     * Fetches RemoteLogSegmentMetadata for the given topic partition and offset.
     *
     */
        DELETE_MARKED(),

    This will evolve to refactor TopicPartition to TopicPartitionId which contains a unique identifier and TopicPartition.
     /**
     * @param topicPartition
   * This state* indicates@param thatoffset
 the segment deletion is started* but@return
 not yet finished.
  * @throws RemoteStorageException
     */
    RemoteLogSegmentMetadata remoteLogSegmentMetadata(TopicPartition topicPartition, long DELETE_STARTED(),offset) throws RemoteStorageException;

    /**
     /**
 Returns earliest log offset if there are segments *in Thisthe stateremote indicatesstorage thatfor the segmentgiven istopic deletedpartition, successfully.else
     * returns   */{@link Optional#empty()}.
     *
   DELETE_FINISHED();
  * This }

is treated as the private static final long serialVersionUID = 1L;

effective log-start-offset of the topic partition's log.
     /**
     * todo check Universallywhether uniquewe remoteneed logto segmentpass idleader-epoch.
     */
    private final* RemoteLogSegmentId remoteLogSegmentId;

    /**@param tp
     * Start offset of this segment.@return
     */
    Optional<Long> private final long startOffsetearliestLogOffset(TopicPartition tp) throws RemoteStorageException;

    /**
     * EndReturns highest log offset of this segmenttopic partition in remote storage.
     */
    private final* long@param endOffset;tp

     /** @return
     * Leader epoch of the broker.@throws RemoteStorageException
     */
    Optional<Long> private final int leaderEpochhighestLogOffset(TopicPartition tp) throws RemoteStorageException;

    /**
     * MaximumDeletes timestampthe inlog the segment
 metadata for the given */
    private final long maxTimestamp;

 remoteLogSegmentId.
     /**
     * Epoch@param timeremoteLogSegmentId
 at which the respective {@link* #state} is set.@throws RemoteStorageException
     */
    private final long eventTimestampvoid deleteRemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId) throws RemoteStorageException;

    /**
     * List LeaderEpochthe vsremote offsetlog forsegment messagesfiles withof inthe thisgiven segmenttopicPartition.
     */
 The RemoteLogManager of privatea finalfollower Map<Long,uses Long> segmentLeaderEpochs;

    /**
     * Size of the segment in bytesthis method to find out the remote data for the given topic partition.
     */ <p>
    private final* long segmentSizeInBytes;

    /**
     * It indicates the state in which the action is executed on this segment.This is used in while deleting a given topic partition to fetch all the remote log segments for the given  topic
     */
 partition and set privatea finaltombstone State state;

    /**marker for them to be deleted.
     *
 @param remoteLogSegmentId  Universally unique* remote@return logList segmentof id.
remote segments, sorted by baseOffset *in @paramascending startOffsetorder.
     */
    Startdefault offsetList<RemoteLogSegmentMetadata> of this segment.
     * @param endOffsetlistRemoteLogSegments(TopicPartition topicPartition) {
        return listRemoteLogSegments(topicPartition, 0);
    End}

 offset of this segment./**
     * @paramReturns maxTimestamplist of remote segments, sorted by {@link  maximum timestamp RemoteLogSegmentMetadata#startOffset()} in thisascending segmentorder
     * @paramwhich leaderEpochare >= the given min Offset.
    Leader epoch of the broker. *
     * @param eventTimestamptopicPartition
    Epoch time* at@param whichminOffset
 the remote log segment is* copied@return toList theof remote tier storage.
     * @param segmentSizeInBytes  size of this segment in bytessegments, sorted by baseOffset in ascending order.
     */
 @param state  List<RemoteLogSegmentMetadata> The respective segment of remoteLogSegmentId is marked fro deletion.listRemoteLogSegments(TopicPartition topicPartition, long minOffset);

     /**
 @param segmentLeaderEpochs leader epochs occurred* withThis inmethod thisis segment
invoked only when there are */
changes in leadership of publicthe RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId, long startOffset, long endOffset,topic partitions that this broker is
     * responsible for.
     *
     * @param leaderPartitions   partitions that have become leaders on this broker.
     * @param longfollowerPartitions maxTimestamp,partitions intthat leaderEpoch,have longbecome eventTimestamp,
followers on this broker.
     */
    void onPartitionLeadershipChanges(Set<TopicPartition> leaderPartitions, Set<TopicPartition> followerPartitions);

    /**
     * This method is invoked only when the given topic partitions longare segmentSizeInBytes,stopped Stateon state, Map<Long, Long> segmentLeaderEpochs) {
  this broker. This can happen when a
     * this.remoteLogSegmentIdpartition =is remoteLogSegmentId;
emigrated to other broker or a partition is thisdeleted.startOffset
 = startOffset;
   *
     this.endOffset* =@param endOffset;partitions
     */
    this.leaderEpoch = leaderEpochvoid onStopPartitions(Set<TopicPartition> partitions);

    /**
     this.maxTimestamp* =Callback maxTimestamp;
to receive once server is started so that this.eventTimestamp =class eventTimestamp;
can run tasks which should be run only this.segmentLeaderEpochs = segmentLeaderEpochs;when the
     * server is this.state = state;started.
     */
   this.segmentSizeInBytes =void segmentSizeInBytesonServerStarted();
    }
...
}

New metrics

The following new metrics will be added:

...