...

Code block (Java): RemoteLogMetadataManager
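import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Optional;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;

// RemoteLogSegmentId is defined elsewhere in this proposal.

/**
 * This interface provides operations to store and fetch remote log segment metadata with strongly consistent semantics.
 */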
@InterfaceStability.Unstable
public interface RemoteLogMetadataManager extends Configurable, Closeable {

    /**
     * Stores RemoteLogSegmentMetadata for the given RemoteLogSegmentId.
     *
     * @param remoteLogSegmentId       id of the remote log segment
     * @param remoteLogSegmentMetadata metadata about the remote log segment
     * @throws IOException if the metadata could not be stored
     */
    void putRemoteLogSegmentData(RemoteLogSegmentId remoteLogSegmentId, RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException;

    /**
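     * Fetches the RemoteLogSegmentId of the segment that contains the given offset for the given topic partition.
     *
     * @param topicPartition topic partition
     * @param offset         offset to look up
     * @return id of the remote log segment containing {@code offset}
     * @throws IOException if the metadata could not be fetched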
     */
    RemoteLogSegmentId getRemoteLogSegmentId(TopicPartition topicPartition, long offset) throws IOException;

    /**
     * Fetches RemoteLogSegmentMetadata for the given RemoteLogSegmentId.
     *
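     * @param remoteLogSegmentId id of the remote log segment
     * @return metadata of the remote log segment
     * @throws IOException if the metadata could not be fetched
     */
    RemoteLogSegmentMetadata getRemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId) throws IOException;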

    /**
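     * Returns the earliest log offset, if one exists, for the given topic partition in the remote storage.
     * Returns {@link Optional#empty()} if there are no segments in the remote storage.
     *
     * @param tp topic partition
     * @return earliest offset available in the remote storage, or {@link Optional#empty()}
     * @throws IOException if the metadata could not be fetched
     */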
    Optional<Long> earliestLogOffset(TopicPartition tp) throws IOException;

    /**
     * Lists the remote log segments of the given topicPartition.
     * The RemoteLogManager of a follower uses this method to find out the remote data for the given topic partition.
     *
     * @param topicPartition topic partition
     * @return List of remote segments, sorted by baseOffset in ascending order.
     */
    default List<RemoteLogSegmentMetadata> listRemoteSegments(TopicPartition topicPartition) {
        return listRemoteSegments(topicPartition, 0);
    }

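    /**
     * Lists the remote log segments of the given topicPartition from the given minOffset onwards.
     *
     * @param topicPartition topic partition
     * @param minOffset      minimum offset of interest
     * @return List of remote segments, sorted by baseOffset in ascending order.
     */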
    List<RemoteLogSegmentMetadata> listRemoteSegments(TopicPartition topicPartition, long minOffset);

}

/**
 * Metadata about the log segment stored in remote tier storage.
 */
public class RemoteLogSegmentMetadata {

    private RemoteLogSegmentId remoteLogSegmentId;
    private long startOffset;
    private long endOffset;
    private int leaderEpoch;

    /**
     * whether the remote segment is created or not.
     */
    private boolean created;

    private byte[] remoteLogSegmentContext;
}
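To make the lookup semantics of the interface above concrete, here is a minimal in-memory sketch. It is not a RemoteLogMetadataManager implementation: topic partitions are plain strings, segment ids are UUIDs, and the SegmentMetadata record and the method names are hypothetical simplifications. Keeping each partition's segments in a TreeMap keyed by start offset lets a getRemoteLogSegmentId-style lookup use floorEntry, and listing returns segments already sorted by base offset. Treating minOffset as "keep segments whose end offset is at or above minOffset" is an assumption about that parameter.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import java.util.UUID;

// Hypothetical, simplified in-memory store illustrating RemoteLogMetadataManager-style lookups.
final class InMemoryRemoteLogMetadataStore {

    // Hypothetical stand-in for RemoteLogSegmentMetadata.
    record SegmentMetadata(UUID segmentId, long startOffset, long endOffset) {}

    // Per partition: segments keyed by start offset, so offset lookups can use floorEntry.
    private final Map<String, NavigableMap<Long, SegmentMetadata>> byPartition = new HashMap<>();
    private final Map<UUID, SegmentMetadata> byId = new HashMap<>();

    void put(String topicPartition, SegmentMetadata metadata) {
        byPartition.computeIfAbsent(topicPartition, k -> new TreeMap<>())
                   .put(metadata.startOffset(), metadata);
        byId.put(metadata.segmentId(), metadata);
    }

    // Id of the segment whose [startOffset, endOffset] range contains the offset, if any.
    Optional<UUID> segmentIdFor(String topicPartition, long offset) {
        NavigableMap<Long, SegmentMetadata> segments = byPartition.get(topicPartition);
        if (segments == null) return Optional.empty();
        Map.Entry<Long, SegmentMetadata> floor = segments.floorEntry(offset);
        if (floor == null || floor.getValue().endOffset() < offset) return Optional.empty();
        return Optional.of(floor.getValue().segmentId());
    }

    Optional<SegmentMetadata> metadata(UUID segmentId) {
        return Optional.ofNullable(byId.get(segmentId));
    }

    // Start offset of the earliest segment, or empty if the partition has no remote segments.
    Optional<Long> earliestLogOffset(String topicPartition) {
        NavigableMap<Long, SegmentMetadata> segments = byPartition.get(topicPartition);
        return (segments == null || segments.isEmpty()) ? Optional.empty() : Optional.of(segments.firstKey());
    }

    // Segments ending at or after minOffset, in ascending start-offset order.
    List<SegmentMetadata> listSegments(String topicPartition, long minOffset) {
        List<SegmentMetadata> result = new ArrayList<>();
        for (SegmentMetadata s : byPartition.getOrDefault(topicPartition, new TreeMap<>()).values()) {
            if (s.endOffset() >= minOffset) result.add(s);
        }
        return result;
    }
}
```

A real implementation would additionally need the strong consistency and durability this section asks for; the sketch only shows the indexing.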


Proposed Changes

High-level design

...

The earlier approach consists of pulling the remote log segment metadata from remote log storage APIs, as mentioned in the earlier RemoteStorageManager_Old section. This approach worked fine for storages like HDFS. One of the problems of relying on the remote storage to maintain metadata is that tiered storage needs to be strongly consistent, with an impact not only on the metadata itself (e.g. LIST in S3) but also on the segment data (e.g. GET after a DELETE in S3). In addition to consistency and availability, the cost (and, to a lesser extent, the performance) of maintaining metadata in remote storage needs to be factored in. In the case of S3, frequent LIST API calls incur high costs.

So, remote storage is separated from the remote log metadata store, and we introduced RemoteStorageManager and RemoteLogMetadataManager respectively. You can see the discussion details in the doc located here.

...

RLM creates tasks for each leader or follower topic partition, which are explained in detail here.

  • RLM Leader Task
    • It checks for rolled-over LogSegments (those whose last message offset is less than the last stable offset of that topic partition) and copies them, along with their offset/time and other indexes, to the remote tier. It also serves fetch requests for older data from the remote tier. Local log segments are not cleaned up until they have been copied successfully to remote storage, even if their retention time/size has been reached.

[We proposed an approach of creating a RemoteLogSegmentIndex per topic-partition to track remote LogSegments. These indexes are described in more detail here. This allows a larger index interval for remote log segments instead of a large number of small index files. It also supports encrypted segments by encrypting individual record batches and building the respective indexes. We may want to explore this approach by enhancing RemoteStorageManager in later versions.]

  • RLM Follower Task 
    • It keeps track of the segments and index files on the remote tier by looking into RemoteLogMetadataManager. The RLM follower can also serve reads of old data from the remote tier.
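The leader-side rules above can be sketched as two selection functions over a partition's local segments, ordered by base offset. This is illustrative, not Kafka code: LeaderSegmentSelection, LocalSegment, highestCopiedOffset (the last offset already copied, or -1 if none) and the time-based retention check are hypothetical. A segment is a copy candidate when it has rolled over (it is not the active segment), its last offset is below the last stable offset, and it has not been copied yet. A local segment is deletable only when it is both past retention and already copied. Stopping the deletion scan at the first ineligible segment is an assumption made to keep the remaining local log contiguous.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the RLM leader task's segment selection rules.
final class LeaderSegmentSelection {

    // Hypothetical view of a local log segment.
    record LocalSegment(long baseOffset, long lastOffset, long maxTimestampMs) {}

    // Rolled-over segments below the last stable offset that have not been copied yet.
    static List<LocalSegment> copyCandidates(List<LocalSegment> segmentsAscending, long activeBaseOffset,
                                             long lastStableOffset, long highestCopiedOffset) {
        List<LocalSegment> result = new ArrayList<>();
        for (LocalSegment s : segmentsAscending) {
            boolean rolledOver = s.baseOffset() < activeBaseOffset;
            boolean belowLso = s.lastOffset() < lastStableOffset;
            boolean notYetCopied = s.lastOffset() > highestCopiedOffset;
            if (rolledOver && belowLso && notYetCopied) result.add(s);
        }
        return result;
    }

    // Oldest segments that are past local retention AND already copied to remote storage.
    static List<LocalSegment> deletableLocally(List<LocalSegment> segmentsAscending, long nowMs,
                                               long retentionMs, long highestCopiedOffset) {
        List<LocalSegment> result = new ArrayList<>();
        for (LocalSegment s : segmentsAscending) {
            boolean expired = nowMs - s.maxTimestampMs() > retentionMs;
            boolean copied = s.lastOffset() <= highestCopiedOffset;
            if (!expired || !copied) break; // keep everything from here on
            result.add(s);
        }
        return result;
    }
}
```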

...

If RLM is configured, ReplicaManager will call RLM to assign topic-partitions or remove topic-partitions.

If the broker changes its state from Leader to Follower for a topic-partition while RLM is in the process of copying a segment, it will finish the copy before it relinquishes the copy task for that topic-partition. This might leave duplicated segments, but these will be cleaned up when they are ready for deletion based on remote retention configs.
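One way to finish an in-flight copy before relinquishing the partition is to reject new copy submissions and then wait on the futures of copies that have already started. The sketch below is hypothetical (PartitionCopyTask and its methods are not Kafka APIs) and assumes copies run on an ExecutorService.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical per-partition copy task that drains in-flight copies on Leader -> Follower.
final class PartitionCopyTask {
    private final ExecutorService executor;
    private final List<Future<?>> inFlight = new ArrayList<>(); // guarded by this
    private boolean leader = true;                              // guarded by this

    PartitionCopyTask(ExecutorService executor) {
        this.executor = executor;
    }

    // Submits a segment copy only while this broker is still the leader for the partition.
    synchronized boolean submitCopy(Runnable copy) {
        if (!leader) return false;
        inFlight.removeIf(Future::isDone);
        inFlight.add(executor.submit(copy));
        return true;
    }

    // Rejects new copies, then blocks until copies that already started have finished.
    void becomeFollower() {
        List<Future<?>> pending;
        synchronized (this) {
            leader = false;
            pending = new ArrayList<>(inFlight);
            inFlight.clear();
        }
        for (Future<?> f : pending) {
            try {
                f.get();
            } catch (ExecutionException e) {
                // A failed copy is handled by the copy path itself; nothing more to do here.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```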

Consumer Fetch Requests

For any fetch request, ReplicaManager calls readFromLocalLog; if this method returns an OffsetOutOfRange exception, it delegates the read to RemoteLogManager. More details are explained in the RLM/RSM tasks section.
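The delegation described above amounts to a try/catch around the local read. In this sketch, OffsetOutOfRange, LocalLog and RemoteLogReader are hypothetical stand-ins for Kafka's exception, readFromLocalLog and the RemoteLogManager read path; records are represented as strings for brevity.

```java
// Hypothetical sketch of the fetch path: local read first, remote read on OffsetOutOfRange.
final class FetchPathSketch {

    // Hypothetical stand-in for Kafka's offset-out-of-range error.
    static final class OffsetOutOfRange extends RuntimeException {
        OffsetOutOfRange(String message) {
            super(message);
        }
    }

    // Stand-in for readFromLocalLog; throws OffsetOutOfRange when the offset is not local.
    interface LocalLog {
        String read(long offset);
    }

    // Stand-in for the RemoteLogManager read path.
    interface RemoteLogReader {
        String read(long offset);
    }

    static String fetch(long offset, LocalLog local, RemoteLogReader remote) {
        try {
            return local.read(offset);
        } catch (OffsetOutOfRange e) {
            return remote.read(offset); // delegate to RLM
        }
    }
}
```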

...

For follower fetch, the leader only returns the data that is still in the leader's local storage. LogSegments that exist only on remote storage are not replicated to followers because they are already present in remote storage. Instead, a follower retrieves the segment information from RemoteLogMetadataManager. If a replica becomes a leader, it can still locate and serve data from remote storage.

...

RLM/RSM tasks and thread pools

Remote storage (e.g. HDFS/S3/GCP) is likely to have higher I/O latency and lower availability than local storage.
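Because remote reads can be slow or unavailable, it is natural to run them on a dedicated, bounded thread pool so they cannot tie up threads that serve local reads. A minimal sketch, assuming a fixed pool size and a per-read timeout; RemoteReadPool, threads and timeoutMs are hypothetical names, not configs from this proposal.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical dedicated pool for remote-storage reads, isolated from local-read threads.
final class RemoteReadPool implements AutoCloseable {
    private final ExecutorService pool;
    private final long timeoutMs;

    RemoteReadPool(int threads, long timeoutMs) {
        this.pool = Executors.newFixedThreadPool(threads);
        this.timeoutMs = timeoutMs;
    }

    // Runs the remote read on this pool; the future fails with TimeoutException if it is too slow.
    <T> CompletableFuture<T> submit(Supplier<T> remoteRead) {
        return CompletableFuture.supplyAsync(remoteRead, pool)
                .orTimeout(timeoutMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        pool.shutdownNow(); // interrupts reads that are still running
    }
}
```

Note that CompletableFuture.orTimeout only completes the returned future exceptionally; it does not interrupt the underlying read, which keeps occupying a pool thread until it returns.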

...

       - the offset range is not covered completely by the segments on the remote storage and

...

If multiple log segment files are ready, they are copied to remote storage one by one, from the earliest to the latest. RLM generates a universally unique RemoteLogSegmentId for each segment and calls RLMM.putRemoteLogSegmentData(RemoteLogSegmentId remoteLogSegmentId, RemoteLogSegmentMetadata remoteLogSegmentMetadata) to store its metadata. It then invokes copyLogSegment(RemoteLogSegmentId remoteLogSegmentId, LogSegmentData logSegmentData) on RSM. If the copy is successful, it calls RLMM.putRemoteLogSegmentData again with the updated RemoteLogSegmentMetadata instance; otherwise, it removes the entry. Any dangling entries will be removed while removing expired log segments based on remote retention.
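The copy sequence above (store metadata, copy the data, then update the metadata or remove it) can be sketched as follows. MetadataStore and SegmentCopier are hypothetical stand-ins for RLMM and RSM, and the created flag mirrors the field of the same name in RemoteLogSegmentMetadata. Stopping at the first failed copy, so that later segments are never copied ahead of an earlier one, is an assumption of this sketch rather than something the proposal specifies.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;

// Hypothetical sketch of the leader's copy loop: put metadata, copy, then update or remove.
final class SegmentCopySketch {

    record Segment(long startOffset, long endOffset) {}

    // Simplified stand-in for RemoteLogSegmentMetadata.
    record Metadata(UUID id, long startOffset, long endOffset, boolean created) {}

    // Hypothetical stand-in for RLMM.
    interface MetadataStore {
        void put(UUID id, Metadata metadata);
        void remove(UUID id);
    }

    // Hypothetical stand-in for RSM.copyLogSegment.
    interface SegmentCopier {
        void copy(UUID id, long startOffset, long endOffset) throws Exception;
    }

    // Copies segments earliest first; returns how many were copied successfully.
    static int copyAll(List<Segment> segments, MetadataStore rlmm, SegmentCopier rsm) {
        List<Segment> ordered = new ArrayList<>(segments);
        ordered.sort(Comparator.comparingLong(Segment::startOffset));
        int copied = 0;
        for (Segment s : ordered) {
            UUID id = UUID.randomUUID(); // universally unique segment id
            rlmm.put(id, new Metadata(id, s.startOffset(), s.endOffset(), false));
            try {
                rsm.copy(id, s.startOffset(), s.endOffset());
            } catch (Exception e) {
                rlmm.remove(id); // copy failed: drop the metadata entry
                return copied;   // stop so later segments are not copied ahead of this one
            }
            rlmm.put(id, new Metadata(id, s.startOffset(), s.endOffset(), true));
            copied++;
        }
        return copied;
    }
}
```

If the remove call itself fails, the entry is left dangling; as described above, such entries are cleaned up when expired remote segments are removed based on remote retention.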

Handle expired remote segments (leader and follower)

RLM leader computes the log segments to be deleted based on the remote retention config. It updates the earliest offset for the given topic partition in RLMM. It gets the remote log segment ids of those segments and removes them from remote storage using RemoteStorageManager. It also removes the respective metadata using RemoteLogMetadataManager. If there are any failures in removing remote log segments, those segments are stored in a specific topic (__remote_segments_to_be_deleted by default), and the user can consume the events (which contain the remote-log-segment-id) from that topic and clean them up from remote storage. This can be improved upon in later versions.
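A sketch of the leader-side retention step, assuming size-based retention. The sizeInBytes field and retentionBytes parameter are hypothetical (RemoteLogSegmentMetadata as shown has no size field), and deleteFromRemote stands in for the RSM delete call plus RLMM metadata removal. It computes the new earliest offset first, then deletes the expired segments, collecting the ids of failed deletions, which in the proposal would be published to __remote_segments_to_be_deleted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Predicate;

// Hypothetical sketch of size-based remote retention on the RLM leader.
final class RemoteRetentionSketch {

    // sizeInBytes is a hypothetical field used only for this sketch.
    record RemoteSegment(UUID id, long startOffset, long endOffset, long sizeInBytes) {}

    record Result(Optional<Long> newEarliestOffset, List<UUID> deleted, List<UUID> failedDeletes) {}

    // Drops the oldest segments until the total size fits retentionBytes.
    static Result apply(List<RemoteSegment> ascending, long retentionBytes, Predicate<UUID> deleteFromRemote) {
        long total = 0;
        for (RemoteSegment s : ascending) total += s.sizeInBytes();
        int firstKept = 0;
        while (firstKept < ascending.size() && total > retentionBytes) {
            total -= ascending.get(firstKept).sizeInBytes();
            firstKept++;
        }
        // New earliest offset (to be stored in RLMM) is the start of the first retained segment.
        Optional<Long> earliest = firstKept < ascending.size()
                ? Optional.of(ascending.get(firstKept).startOffset())
                : Optional.empty();
        List<UUID> deleted = new ArrayList<>();
        List<UUID> failed = new ArrayList<>();
        for (RemoteSegment s : ascending.subList(0, firstKept)) {
            if (deleteFromRemote.test(s.id())) deleted.add(s.id());
            else failed.add(s.id()); // would be published to __remote_segments_to_be_deleted
        }
        return new Result(earliest, deleted, failed);
    }
}
```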

RLM follower fetches the earliest offset by calling RLMM.earliestLogOffset(tp: TopicPartition). Both leader and follower clean up the existing indexes up to that offset and update the start offset with the received value.
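For the index cleanup, a partition's local view of remote segments could be kept as a sorted map from start offset to end offset; applying the earliest offset then drops every segment that ends before it, while the segment containing that offset is kept. RemoteIndexCleanupSketch and its members are hypothetical names used only for illustration.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical local view of a partition's remote segments (start offset -> end offset).
final class RemoteIndexCleanupSketch {
    private final NavigableMap<Long, Long> segmentEndByStart = new TreeMap<>();
    private long logStartOffset = 0L;

    void add(long startOffset, long endOffset) {
        segmentEndByStart.put(startOffset, endOffset);
    }

    // Removes entries for segments ending before earliestOffset and updates the start offset.
    void truncateTo(long earliestOffset) {
        segmentEndByStart.headMap(earliestOffset, false).entrySet()
                .removeIf(e -> e.getValue() < earliestOffset);
        logStartOffset = earliestOffset;
    }

    int size() {
        return segmentEndByStart.size();
    }

    long logStartOffset() {
        return logStartOffset;
    }
}
```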

...

For follower replicas, RLMM maintains a metadata cache by subscribing to the respective remote log metadata topic partitions. Whenever a topic partition is reassigned to a new broker and RLMM on that broker is not subscribed to the respective remote log metadata topic partition, it subscribes to that partition and adds all of its entries to the cache. So, in the worst case, RLMM on a broker may be consuming from most of the remote log metadata topic partitions. This requires the cache to be backed by disk storage, such as RocksDB, to avoid a high memory footprint on a broker. This also allows us to commit offsets of the partitions that have already been read. Committed offsets can be stored in a local file to avoid reading the messages again when a broker is restarted.
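The text suggests storing committed offsets in a local file. A minimal sketch of such a checkpoint, assuming one entry per remote log metadata topic partition (keyed by partition number) and java.util.Properties as the file format. MetadataOffsetCheckpoint is a hypothetical name; writing to a temporary file and then renaming it is a common pattern so that a crash does not leave a half-written checkpoint.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical checkpoint of consumed offsets per remote log metadata topic partition.
final class MetadataOffsetCheckpoint {
    private final Path file;

    MetadataOffsetCheckpoint(Path file) {
        this.file = file;
    }

    // Writes "partition=offset" entries to a temp file, then renames it over the old checkpoint.
    void write(Map<Integer, Long> committedOffsets) throws IOException {
        Properties props = new Properties();
        committedOffsets.forEach((p, o) -> props.setProperty(Integer.toString(p), Long.toString(o)));
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        try (Writer w = Files.newBufferedWriter(tmp, StandardCharsets.UTF_8)) {
            props.store(w, "remote log metadata consumer offsets");
        }
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
    }

    // Returns the last committed offsets, or an empty map if no checkpoint exists yet.
    Map<Integer, Long> read() throws IOException {
        Map<Integer, Long> result = new HashMap<>();
        if (!Files.exists(file)) return result;
        Properties props = new Properties();
        try (Reader r = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            props.load(r);
        }
        for (String key : props.stringPropertyNames()) {
            result.put(Integer.parseInt(key), Long.parseLong(props.getProperty(key)));
        }
        return result;
    }
}
```

On restart, RLMM would resume consuming each metadata topic partition from the offset returned by read() instead of from the beginning.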

[We may add more details later about how the resultant state for each topic partition is computed ]

Remote Log Indexes

...