Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

For follower replicas, it maintains metadata cache by subscribing to the respective remote log metadata topic partitions. Whenever a topic partition is reassigned to a new broker and RLMM on that broker is not subscribed to the respective remote log metadata topic partition then it will subscribe to the respective remote log metadata topic partition and adds all the entries to the cache. So, in the worst case, RLMM on a broker may be consuming from most of the remote log metadata topic partitions. This requires the cache to be based on disk storage like RocksDB to avoid a high memory footprint on a broker. This will allow us to commit offsets of the partitions that are already read. Committed offsets can be stored in a local file to avoid reading the messages again when a broker is restarted.

Configs

remote.log.metadata.topic.replication.factor


Replication factor of the topic

Default: 3

remote.log.metadata.topic.partitions

No of partitions of the topic

Default: 50

remote.log.metadata.topic.retention.ms

Retention of the topic in milli seconds

Default: 365 * 24 * 60 * 60 * 1000  (1 yr)

remote.log.metadata.manager.listener.name

Listener name to be  used to connect to the local broker by RemoteLogMetadataManager implementation on the broker. Respective endpoint address is passed with  "bootstrap.servers" property while invoking RemoteLogMetadataManager#configure(Map<String, ?> props). 

This is used by kafka clients created in RemoteLogMetadataManager implementation.

remote.log.metadata.*

Any other properties should be prefixed with "remote.log.metadata." and these will be passed to

RemoteLogMetadataManager implementation

RemoteLogMetadataManager#configure(Map<String, ?> props).

For ex: Security configuration to connect to the local broker for the listener name configured are passed with props.

[We will add more details later about how the resultant state for each topic partition is computed ]

...

Code Block
languagejava
titleRemoteLogMetadataManager
/**
 * This interface provides storing and fetching remote log segment metadata with strongly consistent semantics.
 * <p>
 * When {@link #configure(Map)} is invoked on this instance, {@link #BROKER_ID}, {@link #CLUSTER_ID} properties are
 * passed which can be used by this instance if needed. These propertiess can be used if there is a single storage used
 * for different clusters. For ex: MySQL storage can be used as metadata store for all the clusters across the org.
 * <p>
 * 
 * todo-tier cleanup the abstractions in this interface.
 */
@InterfaceStability.Unstable
public interface RemoteLogMetadataManager extends Configurable, Closeable {

    /**
     * Property name for broker id.
     */
    String BROKER_ID = "broker.id";

    /**
     * Property name for cluster id.
     */
    String CLUSTER_ID = "cluster.id";

    /**
     * Stores RemoteLogSegmentMetadata with the containing RemoteLogSegmentId into RemoteLogMetadataManager.
     *
     * RemoteLogSegmentMetadata is identified by RemoteLogSegmentId.
     *
     * @param remoteLogSegmentMetadata
     * @throws RemoteStorageException
     */
    void putRemoteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException;

    /**
     * Fetches RemoteLogSegmentId for the given topic partition which contains the given offset.
     *
     * This will evolve to refactor TopicPartition to TopicPartitionId which contains a unique identifier and TopicPartition.
     *
     * @param topicPartition
     * @param offset
     * @return
     * @throws RemoteStorageException
     */
    RemoteLogSegmentId remoteLogSegmentId(TopicPartition topicPartition, long offset) throws RemoteStorageException;

    /**
     * Fetches RemoteLogSegmentMetadata for the given topic partition and offset.
     *
     * This will evolve to refactor TopicPartition to TopicPartitionId which contains a unique identifier and TopicPartition.
     *
     * @param topicPartition
     * @param offset
     * @return
     * @throws RemoteStorageException
     */
    RemoteLogSegmentMetadata remoteLogSegmentMetadata(TopicPartition topicPartition, long offset) throws RemoteStorageException;

    /**
     * Returns earliest log offset if there are segments in the remote storage for the given topic partition, else
     * returns {@link Optional#empty()}.
     *
     * This is treated as the effective log-start-offset of the topic partition's log.
     *
     * todo check whether we need to pass leader-epoch.
     *
     * @param tp
     * @return
     */
    Optional<Long> earliestLogOffset(TopicPartition tp) throws RemoteStorageException;

    /**
     * Returns highest log offset of topic partition in remote storage.
     *
     * @param tp
     * @return
     * @throws RemoteStorageException
     */
    Optional<Long> highestLogOffset(TopicPartition tp) throws RemoteStorageException;

    /**
     * Deletes the log segment metadata for the given remoteLogSegmentId.
     *
     * @param remoteLogSegmentId
     * @throws RemoteStorageException
     */
    void deleteRemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId) throws RemoteStorageException;

    /**
     * List the remote log segment files of the given topicPartition.
     * The RemoteLogManager of a follower uses this method to find out the remote data for the given topic partition.
     * <p>
     * This is used in while deleting a given topic partition to fetch all the remote log segments for the given  topic
     * partition and set a tombstone marker for them to be deleted.
     *
     * @return List of remote segments, sorted by baseOffset in ascending order.
     */
    default List<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicPartition topicPartition) {
        return listRemoteLogSegments(topicPartition, 0);
    }

    /**
     * Returns list of remote segments, sorted by {@link RemoteLogSegmentMetadata#startOffset()} in ascending order
     * which are >= the given min Offset.
     *
     * @param topicPartition
     * @param minOffset
     * @return List of remote segments, sorted by baseOffset in ascending order.
     */
    List<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicPartition topicPartition, long minOffset);

    /**
     * This method is invoked only when there are changes in leadership of the topic partitions that this broker is
     * responsible for.
     *
     * @param leaderPartitions   partitions that have become leaders on this broker.
     * @param followerPartitions partitions that have become followers on this broker.
     */
    void onPartitionLeadershipChanges(Set<TopicPartition> leaderPartitions, Set<TopicPartition> followerPartitions);

    /**
     * This method is invoked only when the given topic partitions are stopped on this broker. This can happen when a
     * partition is emigrated to other broker or a partition is deleted.
     *
     * @param partitions
     */
    void onStopPartitions(Set<TopicPartition> partitions);

    /**
     * Callback to receive once server is started so that this class can run tasks which should be run only when the
     * server is started.
     */
    void onServerStarted(final String serverEndpoint);
}



/**
 * Metadata about the log segment stored in remote tier storage.
 */
public class RemoteLogSegmentMetadata implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Universally unique remote log segment id.
     */
    private final RemoteLogSegmentId remoteLogSegmentId;

    /**
     * Start offset of this segment.
     */
    private final long startOffset;

    /**
     * End offset of this segment.
     */
    private final long endOffset;

    /**
     * Leader epoch of the broker.
     */
    private final int leaderEpoch;

    /**
     * Maximum timestamp in the segment
     */
    private final long maxTimestamp;

    /**
     * Epoch time at which the remote log segment is copied to the remote tier storage.
     */
    private final long createdTimestamp;

    /**
     * Size of the segment in bytes.
     */
    private final long segmentSizeInBytes;

    /**
     * It indicates that this is marked for deletion.
     */
    private final boolean markedForDeletion;

    /**
     * @param remoteLogSegmentId      Universally unique remote log segment id.
     * @param startOffset             Start offset of this segment.
     * @param endOffset               End offset of this segment.
     * @param maxTimestamp            maximum timestamp in this segment
     * @param leaderEpoch             Leader epoch of the broker.
     * @param createdTimestamp        Epoch time at which the remote log segment is copied to the remote tier storage.
     * @param markedForDeletion       The respective segment of remoteLogSegmentId is marked fro deletion.
     * @param remoteLogSegmentContext Any context returned by {@link RemoteStorageManager#copyLogSegment(RemoteLogSegmentId, LogSegmentData)}
     * @param segmentSizeInBytes      size of this segment in bytes.
     */
    public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId, long startOffset, long endOffset,
                                    long maxTimestamp, int leaderEpoch, long createdTimestamp,
                                    boolean markedForDeletion, byte[] remoteLogSegmentContext, long segmentSizeInBytes) {
        this.remoteLogSegmentId = remoteLogSegmentId;
        this.startOffset = startOffset;
        this.endOffset = endOffset;
        this.leaderEpoch = leaderEpoch;
        this.maxTimestamp = maxTimestamp;
        this.createdTimestamp = createdTimestamp;
        this.markedForDeletion = markedForDeletion;
        this.remoteLogSegmentContext = remoteLogSegmentContext;
        this.segmentSizeInBytes = segmentSizeInBytes;
    }

...
}

...