...

[We will add more details later about how the resultant state for each topic partition is computed.]


Public Interfaces

Compacted topics will not have remote storage support. 

Configs

System-Wide

remote.log.storage.enable - Whether to enable remote log storage. Valid values are `true` or `false`; the default is `false`. Keeping the default preserves backward compatibility.

remote.log.storage.manager.class.name - Fully qualified class name of the `RemoteStorageManager` implementation. This is mandatory if remote.log.storage.enable is set to true.

remote.log.metadata.manager.class.name - Fully qualified class name of the `RemoteLogMetadataManager` implementation. This is optional; if it is not configured, Kafka uses an inbuilt metadata manager backed by an internal topic.

RemoteStorageManager

(These configs depend on the remote storage manager implementation.)

remote.log.storage.*

RemoteLogMetadataManager

(These configs depend on the remote log metadata manager implementation.)

remote.log.metadata.*

Thread pools

remote.log.manager.thread.pool.size
Size of the thread pool used to schedule tasks that copy segments, fetch remote log indexes, and clean up remote log segments.

remote.log.manager.task.interval.ms
The interval at which the remote log manager runs scheduled tasks such as copying segments, fetching remote log indexes, and cleaning up remote log segments.

remote.log.reader.threads
Size of the remote log reader thread pool.

remote.log.reader.max.pending.tasks
Maximum size of the remote log reader thread pool task queue. If the task queue is full, the broker will stop reading remote log segments.

Per Topic Configuration

remote.log.retention.minutes

remote.log.retention.bytes
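
To make the configuration surface above concrete, the sketch below shows how these configs might be combined in a broker's server.properties. This is a hedged example: the RSM class name, the remote.log.storage.s3.bucket key, and all values shown are illustrative assumptions, not defaults shipped with this proposal.

Code Block
languagetext
titleserver.properties (illustrative example)
# Hypothetical values; the RSM class and the remote.log.storage.* key are assumptions.
remote.log.storage.enable=true
remote.log.storage.manager.class.name=com.example.storage.S3RemoteStorageManager
# remote.log.metadata.manager.class.name is omitted here, so the inbuilt
# topic-backed metadata manager is used.
remote.log.storage.s3.bucket=my-kafka-tier
remote.log.manager.thread.pool.size=10
remote.log.manager.task.interval.ms=30000
remote.log.reader.threads=10
remote.log.reader.max.pending.tasks=100

# Per-topic overrides can be set at topic creation, e.g.:
#   bin/kafka-topics.sh --create --topic my-topic ... --config remote.log.retention.minutes=20160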

Remote Storage Manager

`RemoteStorageManager` is an interface that provides the lifecycle operations for remote log segments and indexes. More details about how we arrived at this interface are discussed in the document. We will provide a simple implementation of RSM to give a better understanding of the APIs. HDFS and S3 implementations are planned to be hosted in external repos, and these will not be part of the Apache Kafka repo. This is in line with the approach taken for Kafka connectors.


Code Block
languagejava
titleRemoteStorageManager
/**
 * RemoteStorageManager provides the lifecycle of remote log segments which includes copy, fetch, and delete operations.
 *
 * {@link RemoteLogMetadataManager} is responsible for storing and fetching metadata about the remote log segments in a
 * strongly consistent manner.
 *
 * Each upload or copy of a segment is given with a {@link RemoteLogSegmentId} which is universally unique even for the
 * same topic partition and offsets. Once the copy or upload is successful, {@link RemoteLogSegmentMetadata} is
 * created with RemoteLogSegmentId and other log segment information and it is stored in {@link RemoteLogMetadataManager}.
 * This allows RemoteStorageManager to store segments even in eventually consistent manner as the metadata is already
 * stored in a consistent store.
 *
 * All these APIs are still experimental.
 */
@InterfaceStability.Unstable
public interface RemoteStorageManager extends Configurable, Closeable {

    /**
     * Copies LogSegmentData provided for the given RemoteLogSegmentId and returns any contextual
     * information about this copy operation. This can include path to the object in the store etc.
     *
     * Invoker of this API should always send a unique id as part of {@link RemoteLogSegmentId#id()} even when it
     * retries to invoke this method for the same log segment data.
     *
     * @param remoteLogSegmentId
     * @param logSegmentData
     * @return
     * @throws RemoteStorageException
     */
    RemoteLogSegmentContext copyLogSegment(RemoteLogSegmentId remoteLogSegmentId, LogSegmentData logSegmentData)
            throws RemoteStorageException;

    /**
     * Returns the remote log segment data file/object as InputStream for the given RemoteLogSegmentMetadata starting
     * from the given startPosition. The stream will end at the smaller of endPosition and the end of the remote log
     * segment data file/object.
     *
     * @param remoteLogSegmentMetadata
     * @param startPosition
     * @param endPosition
     * @return
     * @throws RemoteStorageException
     */
    InputStream fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                                    Long startPosition, Long endPosition) throws RemoteStorageException;

    /**
     * Returns the offset index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata
     * @return
     * @throws RemoteStorageException
     */
    InputStream fetchOffsetIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     * Returns the timestamp index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata
     * @return
     * @throws RemoteStorageException
     */
    InputStream fetchTimestampIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     * Returns the transaction index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata
     * @return
     * @throws RemoteStorageException
     */
    default InputStream fetchTransactionIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
        throw new UnsupportedOperationException();
    }

    /**
     * Returns the producer snapshot index for the respective log segment of {@link RemoteLogSegmentMetadata}.
     *
     * @param remoteLogSegmentMetadata
     * @return
     * @throws RemoteStorageException
     */
    default InputStream fetchProducerSnapshotIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
        throw new UnsupportedOperationException();
    }

    /**
     * Deletes the remote log segment for the given remoteLogSegmentMetadata. Deletion is considered as successful if
     * this call returns successfully without any exceptions. It will throw {@link RemoteStorageException} if there are
     * any errors in deleting the file.
     *
     * Broker pushes an event to __delete_failed_remote_log_segments topic for failed segment deletions so that users
     * can do the cleanup later.
     *
     * @param remoteLogSegmentMetadata
     * @throws RemoteStorageException
     */
    void deleteLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

}



/**
 * This represents a universally unique id associated to a topic partition's log segment. This will be regenerated for
 * every attempt of copying a specific log segment in {@link RemoteLogStorageManager#copyLogSegment(RemoteLogSegmentId, LogSegmentData)}.
 */
public class RemoteLogSegmentId {
    private TopicPartition topicPartition;
    private UUID id;

    public RemoteLogSegmentId(TopicPartition topicPartition, UUID id) {
        this.topicPartition = requireNonNull(topicPartition);
        this.id = requireNonNull(id);
    }

    public TopicPartition topicPartition() {
        return topicPartition;
    }

    public UUID id() {
        return id;
    }
...
}



public class LogSegmentData {

    private final File logSegment;
    private final File offsetIndex;
    private final File timeIndex;
    private final File txnIndex;
    private final File producerIdSnapshotIndex;
    private final File leaderEpochIndex;

    public LogSegmentData(File logSegment, File offsetIndex, File timeIndex, File txnIndex, File producerIdSnapshotIndex,
                          File leaderEpochIndex) {
        this.logSegment = logSegment;
        this.offsetIndex = offsetIndex;
        this.timeIndex = timeIndex;
        this.txnIndex = txnIndex;
        this.producerIdSnapshotIndex = producerIdSnapshotIndex;
        this.leaderEpochIndex = leaderEpochIndex;
    }
...
}
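
To make the copy contract above concrete, here is a hedged caller-side sketch showing how a broker-side component might drive `copyLogSegment`, generating a fresh `RemoteLogSegmentId` for every attempt as the javadoc requires, and recording metadata only after the copy succeeds. `SegmentCopier`, the retry policy, and the accessors `RemoteLogSegmentContext#asBytes` and `LogSegmentData#logSegment` are illustrative assumptions, not part of the proposed interfaces.

Code Block
languagejava
titleSegmentCopier (illustrative sketch, not part of the proposal)
import java.io.IOException;
import java.util.UUID;

import org.apache.kafka.common.TopicPartition;

// Hypothetical caller-side sketch; not part of the proposed public interfaces.
public class SegmentCopier {
    private final RemoteStorageManager rsm;
    private final RemoteLogMetadataManager rlmm;

    public SegmentCopier(RemoteStorageManager rsm, RemoteLogMetadataManager rlmm) {
        this.rsm = rsm;
        this.rlmm = rlmm;
    }

    /**
     * Copies the given segment, retrying up to maxAttempts times. A new universally unique
     * RemoteLogSegmentId is generated for every attempt, as copyLogSegment requires, so a
     * failed attempt can never collide with a later one in the remote store.
     */
    public RemoteLogSegmentId copyWithRetries(TopicPartition tp, LogSegmentData data,
                                              long startOffset, long endOffset, long maxTimestamp,
                                              int leaderEpoch, int maxAttempts) throws IOException {
        RemoteStorageException lastError = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            RemoteLogSegmentId id = new RemoteLogSegmentId(tp, UUID.randomUUID());
            try {
                RemoteLogSegmentContext context = rsm.copyLogSegment(id, data);
                // Metadata is stored only after a successful copy, so readers never observe a
                // segment id whose data is not fully present in remote storage.
                // asBytes() and logSegment() are assumed accessors for this sketch.
                rlmm.putRemoteLogSegmentData(new RemoteLogSegmentMetadata(id, startOffset, endOffset,
                        maxTimestamp, leaderEpoch, System.currentTimeMillis(), false,
                        context.asBytes(), data.logSegment().length()));
                return id;
            } catch (RemoteStorageException e) {
                lastError = e; // retry with a brand-new id
            }
        }
        throw new IOException("Copy failed after " + maxAttempts + " attempts", lastError);
    }
}

Note how the strongly consistent RemoteLogMetadataManager write is the commit point; this is what allows the RemoteStorageManager itself to be only eventually consistent, as described in the interface javadoc.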

RemoteLogMetadataManager

`RemoteLogMetadataManager` is an interface to provide the lifecycle of metadata about remote log segments with strongly consistent semantics. There is a default implementation that uses an internal topic. Users can plug in their own implementation if they intend to use another system to store remote log segment metadata.



Code Block
languagejava
titleRemoteLogMetadataManager
/**
 * This interface provides storing and fetching remote log segment metadata with strongly consistent semantics.
 *
 * When {@link #configure(Map)} is invoked on this instance, {@link #BROKER_ID} and {@link #CLUSTER_ID} properties are
 * passed, which can be used by this instance if needed. These props can be useful when a single storage backend is shared
 * by different clusters. For example, MySQL storage can be used as the metadata store for all the clusters across the org.
 *
 * todo-tier cleanup the abstractions in this interface.
 */
@InterfaceStability.Unstable
public interface RemoteLogMetadataManager extends Configurable, Closeable {

    /**
     *
     */
    String BROKER_ID = "broker.id";

    /**
     *
     */
    String CLUSTER_ID = "cluster.id";

    /**
     * Stores RemoteLogSegmentMetadata with the given RemoteLogSegmentId.
     *
     * @param remoteLogSegmentMetadata
     * @throws IOException
     */
    void putRemoteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException;

    /**
     * Fetches RemoteLogSegmentId for the given topic partition which contains the given offset.
     *
     * @param topicPartition
     * @param offset
     * @return
     * @throws IOException
     */
    RemoteLogSegmentId getRemoteLogSegmentId(TopicPartition topicPartition, long offset) throws IOException;

    /**
     * Fetches RemoteLogSegmentMetadata for the given RemoteLogSegmentId.
     *
     * @param remoteLogSegmentId
     * @return
     * @throws IOException
     */
    RemoteLogSegmentMetadata getRemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId) throws IOException;

    /**
     * Earliest log offset if exists for the given topic partition in the remote storage. Returns {@link Optional#empty()}
     * if there are no segments in the remote storage.
     *
     * @param tp
     * @return
     */
    Optional<Long> earliestLogOffset(TopicPartition tp) throws IOException;

    /**
     * Highest log offset if exists for the given topic partition in the remote storage. Returns {@link Optional#empty()}
     * if there are no segments in the remote storage.
     *
     * @param tp
     * @return
     * @throws IOException
     */
    Optional<Long> highestLogOffset(TopicPartition tp) throws IOException;

    /**
     * Deletes the log segment metadata for the given remoteLogSegmentId.
     *
     * @param remoteLogSegmentId
     * @throws IOException
     */
    void deleteRemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId) throws IOException;

    /**
     * List the remote log segment files of the given topicPartition.
     * The RemoteLogManager of a follower uses this method to find out the remote data for the given topic partition.
     *
     * @param topicPartition
     * @return List of remote segments, sorted by baseOffset in ascending order.
     */
    default List<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicPartition topicPartition) {
        return listRemoteLogSegments(topicPartition, 0);
    }

    /**
     * @param topicPartition
     * @param minOffset
     * @return List of remote segments, sorted by baseOffset in ascending order.
     */
    List<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicPartition topicPartition, long minOffset);

    /**
     * This method is invoked only when there are changes in leadership of the topic partitions that this broker is
     * responsible for.
     *
     * @param leaderPartitions   partitions that have become leaders on this broker.
     * @param followerPartitions partitions that have become followers on this broker.
     */
    void onPartitionLeadershipChanges(Set<TopicPartition> leaderPartitions, Set<TopicPartition> followerPartitions);

    /**
     * This method is invoked only when the given topic partitions are stopped on this broker. This can happen when a
     * partition is moved to another broker or a partition is deleted.
     *
     * @param partitions
     */
    void onStopPartitions(Set<TopicPartition> partitions);

    /**
     * Callback to receive once the server is started, so that this class can run tasks which should be run only when the
     * server is started.
     */
    void onServerStarted(final String serverEndpoint);
}



/**
 * Metadata about the log segment stored in remote tier storage.
 */
public class RemoteLogSegmentMetadata implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Universally unique remote log segment id.
     */
    private final RemoteLogSegmentId remoteLogSegmentId;

    /**
     * Start offset of this segment.
     */
    private final long startOffset;

    /**
     * End offset of this segment.
     */
    private final long endOffset;

    /**
     * Leader epoch of the broker.
     */
    private final int leaderEpoch;

    /**
     * Maximum timestamp in the segment.
     */
    private final long maxTimestamp;

    /**
     * Epoch time at which the remote log segment is copied to the remote tier storage.
     */
    private long createdTimestamp;

    /**
     * Size of the segment in bytes.
     */
    private long segmentSizeInBytes;

    /**
     * It indicates that this segment is marked for deletion.
     */
    private boolean markedForDeletion;

    /**
     * Any context returned by {@link RemoteStorageManager#copyLogSegment(RemoteLogSegmentId, LogSegmentData)} for
     * the given remoteLogSegmentId.
     */
    private final byte[] remoteLogSegmentContext;


    /**
     * @param remoteLogSegmentId      Universally unique remote log segment id.
     * @param startOffset             Start offset of this segment.
     * @param endOffset               End offset of this segment.
     * @param maxTimestamp            Maximum timestamp in this segment.
     * @param leaderEpoch             Leader epoch of the broker.
     * @param createdTimestamp        Epoch time at which the remote log segment is copied to the remote tier storage.
     * @param markedForDeletion       The respective segment of remoteLogSegmentId is marked for deletion.
     * @param remoteLogSegmentContext Any context returned by {@link RemoteStorageManager#copyLogSegment(RemoteLogSegmentId, LogSegmentData)}.
     * @param segmentSizeInBytes      Size of this segment in bytes.
     */
    public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId, long startOffset, long endOffset,
                                    long maxTimestamp, int leaderEpoch, long createdTimestamp,
                                    boolean markedForDeletion, byte[] remoteLogSegmentContext, long segmentSizeInBytes) {
        this.remoteLogSegmentId = remoteLogSegmentId;
        this.startOffset = startOffset;
        this.endOffset = endOffset;
        this.leaderEpoch = leaderEpoch;
        this.maxTimestamp = maxTimestamp;
        this.createdTimestamp = createdTimestamp;
        this.markedForDeletion = markedForDeletion;
        this.remoteLogSegmentContext = remoteLogSegmentContext;
        this.segmentSizeInBytes = segmentSizeInBytes;
    }

...
}
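
Read together, the two interfaces split the remote read path cleanly: the strongly consistent RemoteLogMetadataManager resolves which remote segment holds a given offset, and the RemoteStorageManager streams its bytes. The sketch below is illustrative only; the wrapper class and its signature are assumptions, while the method calls on the two interfaces are the ones defined above.

Code Block
languagejava
titleRemote read path (illustrative sketch, not part of the proposal)
import java.io.IOException;
import java.io.InputStream;

import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch of the remote read path; not part of the proposed public interfaces.
public class RemoteReadExample {

    /**
     * Resolves the remote segment containing the given offset via the metadata manager and
     * opens a bounded stream over its data via the storage manager.
     */
    public static InputStream openRemoteRead(RemoteLogMetadataManager rlmm, RemoteStorageManager rsm,
                                             TopicPartition tp, long offset,
                                             long startPosition, long endPosition)
            throws IOException, RemoteStorageException {
        RemoteLogSegmentId segmentId = rlmm.getRemoteLogSegmentId(tp, offset);
        RemoteLogSegmentMetadata metadata = rlmm.getRemoteLogSegmentMetadata(segmentId);
        // The returned stream ends at the smaller of endPosition and the end of the segment data.
        return rsm.fetchLogSegmentData(metadata, startPosition, endPosition);
    }
}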

New metrics

The following new metrics will be added:

kafka.log.remote:type=RemoteLogReaderMetrics, name=RequestsPerSec, topic=([-.\w]+)
Number of remote storage read requests per second.

kafka.log.remote:type=RemoteLogReaderMetrics, name=BytesPerSec, topic=([-.\w]+)
Number of bytes read from remote storage per second.

kafka.log.remote:type=RemoteLogReaderMetrics, name=ErrorsPerSec
Number of remote storage read errors per second.

kafka.log.remote:type=RemoteStorageThreadPool, name=RemoteLogReaderTaskQueueSize
Number of remote storage read tasks pending for execution.

kafka.log.remote:type=RemoteStorageThreadPool, name=RemoteLogReaderAvgIdlePercent
Average idle percent of the remote storage reader thread pool.

kafka.log.remote:type=RemoteLogManagerMetrics, name=RemoteLogManagerTasksAvgIdlePercent
Average idle percent of the RemoteLogManager thread pool.

kafka.log.remote:type=RemoteLogManagerMetrics, name=CopyToRemoteStorageBytesPerSec
Number of bytes copied to remote storage per second.

kafka.log.remote:type=RemoteLogManagerMetrics, name=CopyToRemoteStorageErrorsPerSec, topic=([-.\w]+)
Number of remote storage write errors per second.

kafka.log.remote:type=RemoteLogManagerMetrics, name=CopyToRemoteStorageBacklogBytes
The total number of bytes of the segments that are pending to be copied to remote storage.
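
Like Kafka's existing Yammer-based metrics, these should be readable over JMX with standard tooling. A minimal sketch follows, assuming the broker exposes JMX at localhost:9999 and that the meters expose a OneMinuteRate attribute, as Kafka's Yammer meters typically do; both are assumptions about deployment, not part of this proposal.

Code Block
languagejava
titleReading a remote-storage metric over JMX (illustrative sketch)
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Hedged sketch: assumes JMX is enabled on the broker at localhost:9999.
public class RemoteStorageMetricsProbe {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            ObjectName errors = new ObjectName(
                    "kafka.log.remote:type=RemoteLogReaderMetrics,name=ErrorsPerSec");
            // Yammer meters typically expose Count and rate attributes such as OneMinuteRate.
            System.out.println("Remote read errors (1-min rate): "
                    + conn.getAttribute(errors, "OneMinuteRate"));
        }
    }
}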


Performance Test Results

We have tested the performance of the initial implementation of this proposal.

...