
...

System-Wide

remote.log.storage.enable - Whether to enable remote log storage. Valid values are `true` or `false`; the default is `false`, which keeps the existing behavior for backward compatibility.

remote.log.storage.manager.class.name - The RemoteStorageManager implementation class, for example org.apache.kafka.rsm.hdfs.HDFSRemoteStorageManager. This is mandatory if remote.log.storage.enable is set to true.

remote.log.metadata.manager.class.name (optional) - The RemoteLogMetadataManager implementation class. If this is not configured, Kafka uses an inbuilt topic-based metadata manager. Leaving this property unset preserves backward compatibility.

RemoteStorageManager

(These configs depend on the remote storage manager implementation.)

remote.log.storage.*

RemoteLogMetadataManager

(These configs depend on the remote log metadata manager implementation.)

remote.log.metadata.*

Thread pools

remote.log.manager.thread.pool.size
Remote log thread pool size, which is used in scheduling tasks to copy segments, fetch remote log indexes and clean up remote log segments.

remote.log.manager.task.interval.ms
The interval at which the remote log manager runs scheduled tasks such as copying segments, fetching remote log indexes and cleaning up remote log segments.

remote.log.reader.threads
Remote log reader thread pool size

remote.log.reader.max.pending.tasks
Maximum size of the remote log reader thread pool's task queue. If the task queue is full, the broker stops reading remote log segments.
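The system-wide properties above can be put together as follows. This is a minimal sketch, assuming the properties are read from a plain `java.util.Properties`; the `RemoteLogConfigCheck` class and its `validate` method are hypothetical, and the thread pool values are illustrative, not documented defaults.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka) that checks the system-wide remote log properties.
final class RemoteLogConfigCheck {

    private RemoteLogConfigCheck() {}

    /** Returns an error message, or null if the properties are consistent. */
    static String validate(Properties props) {
        // remote.log.storage.enable defaults to false, which keeps the existing behavior.
        boolean enabled = Boolean.parseBoolean(props.getProperty("remote.log.storage.enable", "false"));
        String rsmClass = props.getProperty("remote.log.storage.manager.class.name");
        if (enabled && (rsmClass == null || rsmClass.trim().isEmpty())) {
            return "remote.log.storage.manager.class.name is mandatory when remote.log.storage.enable is true";
        }
        // remote.log.metadata.manager.class.name is optional: if absent, the inbuilt
        // topic-based metadata manager is used, so there is nothing to check here.
        return null;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("remote.log.storage.enable", "true");
        props.setProperty("remote.log.storage.manager.class.name",
                "org.apache.kafka.rsm.hdfs.HDFSRemoteStorageManager");
        // Illustrative thread pool settings, not defaults.
        props.setProperty("remote.log.manager.thread.pool.size", "10");
        props.setProperty("remote.log.reader.threads", "10");
        String error = validate(props);
        System.out.println(error == null ? "valid" : error);
    }
}
```

Keeping the check separate from broker startup makes the one hard rule (an RSM class is required once remote storage is enabled) easy to test on its own.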

Per Topic Configuration

remote.log.retention.minutes

remote.log.retention.bytes

...

`RemoteStorageManager` is an interface that provides the lifecycle of remote log segments. We will provide a simple implementation of RSM to give a better understanding of the APIs. HDFS and S3 implementations are planned to be hosted in external repositories and will not be part of the Apache Kafka repository. This is in line with the approach taken for Kafka connectors.


Code Block
languagejava
collapsetrue

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Optional;
import java.util.UUID;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.record.FileRecords;

import static java.util.Objects.requireNonNull;

/**
 * RemoteStorageManager provides the lifecycle of remote log segments, which includes copy, fetch, and delete operations.
 *
 * {@link RemoteLogMetadataManager} is responsible for storing and fetching metadata about the remote log segments in a
 * strongly consistent manner.
 *
 * Each upload or copy of a segment is given a {@link RemoteLogSegmentId}, which is universally unique even for the
 * same topic partition and offsets. Once the copy or upload is successful, {@link RemoteLogSegmentMetadata} is
 * created with the RemoteLogSegmentId and other log segment information, and it is stored in
 * {@link RemoteLogMetadataManager}. This allows RemoteStorageManager to store segments even in an eventually
 * consistent manner, as the metadata is already stored in a consistent store.
 *
 * All these APIs are still experimental.
 */
@InterfaceStability.Unstable
public interface RemoteStorageManager extends Configurable, Closeable {

    /**
     * Copies LogSegmentData provided for the given RemoteLogSegmentId and returns any contextual
     * information about this copy operation. This can include the path to the object in the store, etc.
     *
     * Invokers of this API should always send a unique id as part of {@link RemoteLogSegmentId#id()}, even when they
     * retry this method for the same log segment data.
     *
     * @param remoteLogSegmentId
     * @param logSegmentData
     * @return
     * @throws IOException
     */
    RemoteLogSegmentContext copyLogSegment(RemoteLogSegmentId remoteLogSegmentId, LogSegmentData logSegmentData) throws IOException;

    /**
     * Returns the remote log segment data file/object as an InputStream for the given RemoteLogSegmentMetadata,
     * starting from the given startPosition. If endPosition is given, the stream ends at that position; otherwise it
     * continues until the end of the remote log segment data file/object.
     *
     * @param remoteLogSegmentMetadata
     * @param startPosition
     * @param endPosition
     * @return
     * @throws IOException
     */
    InputStream fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata, Long startPosition, Optional<Long> endPosition) throws IOException;

    /**
     * Deletes the remote log segment for the given remoteLogSegmentId. Returns true if the deletion is successful.
     * The broker pushes an event to the __delete_failed_remote_log_segments topic for failed segment deletions so
     * that users can do the cleanup later.
     *
     * @param remoteLogSegmentId
     * @return
     * @throws IOException
     */
    boolean deleteLogSegment(RemoteLogSegmentId remoteLogSegmentId) throws IOException;
}


/**
 * This represents a universally unique id associated with a topic partition's log segment. It is regenerated for
 * every attempt of copying a specific log segment in
 * {@link RemoteStorageManager#copyLogSegment(RemoteLogSegmentId, LogSegmentData)}.
 */
public class RemoteLogSegmentId {
    private TopicPartition topicPartition;
    private UUID id;

    public RemoteLogSegmentId(TopicPartition topicPartition, UUID id) {
        this.topicPartition = requireNonNull(topicPartition);
        this.id = requireNonNull(id);
    }

    public TopicPartition topicPartition() {
        return topicPartition;
    }

    public UUID id() {
        return id;
    }
...
}


public class LogSegmentData {

    private FileRecords logSegment;
    private File offsetIndex;
    private File timeIndex;
    //todo add other required indexes like txnIndex
...
}


RemoteLogMetadataManager

`RemoteLogMetadataManager` is an interface that provides the lifecycle of metadata about remote log segments with strongly consistent semantics. There is a default implementation that uses an internal topic. Users can plug in their own implementation if they intend to use another system to store remote log segment metadata.


Code Block
languagejava
titleRemoteLogMetadataManager
collapsetrue
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Optional;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;

/**
 * This interface provides storing and fetching remote log segment metadata with strongly consistent semantics.
 */
@InterfaceStability.Unstable
public interface RemoteLogMetadataManager extends Configurable, Closeable {

    /**
     * Stores RemoteLogSegmentMetadata for the given RemoteLogSegmentId.
     *
     * @param remoteLogSegmentId
     * @param remoteLogSegmentMetadata
     * @throws IOException
     */
    void putRemoteLogSegmentData(RemoteLogSegmentId remoteLogSegmentId, RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException;

    /**
     * Fetches the RemoteLogSegmentId of the segment in the given topic partition that contains the given offset.
     *
     * @param topicPartition
     * @param offset
     * @return
     * @throws IOException
     */
    RemoteLogSegmentId getRemoteLogSegmentId(TopicPartition topicPartition, long offset) throws IOException;

    /**
     * Fetches RemoteLogSegmentMetadata for the given RemoteLogSegmentId.
     *
     * @param remoteLogSegmentId
     * @return
     * @throws IOException
     */
    RemoteLogSegmentMetadata getRemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId) throws IOException;

    /**
     * Earliest log offset, if it exists, for the given topic partition in the remote storage. Returns
     * {@link Optional#empty()} if there are no segments in the remote storage.
     *
     * @param tp
     * @return
     * @throws IOException
     */
    Optional<Long> earliestLogOffset(TopicPartition tp) throws IOException;

    /**
     * Lists the remote log segment files of the given topicPartition.
     * The RemoteLogManager of a follower uses this method to find out the remote data for the given topic partition.
     *
     * @return List of remote segments, sorted by baseOffset in ascending order.
     */
    default List<RemoteLogSegmentInfo> listRemoteSegments(TopicPartition topicPartition) {
        return listRemoteSegments(topicPartition, 0L);
    }

    /**
     * Lists the remote log segment files of the given topicPartition starting from minOffset.
     *
     * @param topicPartition
     * @param minOffset
     * @return List of remote segments starting from minOffset, sorted by baseOffset in ascending order.
     */
    List<RemoteLogSegmentInfo> listRemoteSegments(TopicPartition topicPartition, long minOffset);
}


Proposed Changes

High-level design


RemoteLogManager (RLM) is a new component that

  • receives callback events for leadership changes and stop/delete events of topic partitions on a broker.
  • delegates copying and reading of log segments, and deleting of topic partitions, to a pluggable storage manager (i.e. RemoteStorageManager) implementation.


RLM creates tasks for each leader or follower topic partition:

  • RLM Leader Task - It checks for rolled-over LogSegments (those whose last message offset is less than the last stable offset of the topic partition) and copies them, along with their remote offset/time indexes, to the remote tier. RLM creates an index file, called RemoteLogSegmentIndex, per topic-partition to track remote LogSegments. These indexes are described in detail in the Remote Log Indexes section. It also serves fetch requests for older data from the remote tier. Local log segments are not cleaned up until they have been copied successfully to remote storage, even if their retention time/size is reached.
  • RLM Follower Task - It keeps track of the segments and index files on the remote tier by looking them up in RemoteLogMetadataManager. The RLM follower task can also serve reads of old data from the remote tier.
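The eligibility rule used by the leader task can be sketched as follows. This is a minimal illustration, not RLM's implementation: `Segment` is a hypothetical stand-in for a local LogSegment, and skipping segments at or below the remote log end offset (so already-copied segments are not copied again) is an assumption added for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of how an RLM leader task might pick the local segments to copy.
final class LeaderCopySelection {

    private LeaderCopySelection() {}

    // Hypothetical stand-in for a local LogSegment: only its offset range matters here.
    static final class Segment {
        final long baseOffset;
        final long lastOffset;

        Segment(long baseOffset, long lastOffset) {
            this.baseOffset = baseOffset;
            this.lastOffset = lastOffset;
        }
    }

    /**
     * Returns segments whose last offset is below the last stable offset (LSO) and that are not
     * yet covered by the remote log (remoteLogEndOffset is -1 if nothing was copied yet).
     */
    static List<Segment> segmentsToCopy(List<Segment> localSegments, long lastStableOffset,
                                        long remoteLogEndOffset) {
        List<Segment> eligible = new ArrayList<>();
        for (Segment s : localSegments) {
            boolean rolledBelowLso = s.lastOffset < lastStableOffset;
            boolean notYetCopied = s.lastOffset > remoteLogEndOffset;
            if (rolledBelowLso && notYetCopied) {
                eligible.add(s);
            }
        }
        return eligible;
    }
}
```

For example, with segments [0, 99], [100, 199] and an active segment [200, 250], an LSO of 220 and a remote log end offset of 99, only [100, 199] is selected: the first segment is already remote and the last one still reaches past the LSO.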

Core Kafka changes

To satisfy the goal of keeping Kafka changes minimal when RLM is not configured, Kafka behavior remains unchanged for existing users.

  • Core Kafka starts RLM service if tiered storage is configured
  • When an offset index is not found, if RLM is configured, the read request is delegated to RLM to serve the data from the remote tier.

Local and Remote log offset constraints

Below are the leader topic partition's log offsets


  • Lx = Local log start offset
  • Ly = Last stable offset (LSO)
  • Lz = Local log end offset
  • Rx = Remote log start offset
  • Ry = Remote log end offset

These offsets satisfy: Lz >= Ly >= Lx and Ly >= Ry >= Rx
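The offset constraints for a leader partition can be expressed as a simple check, for example in a test or an assertion. A minimal sketch; `OffsetConstraints` is a hypothetical helper and its parameter names mirror the symbols Lx, Ly, Lz, Rx and Ry.

```java
// Hypothetical helper that checks a leader partition's offset constraints:
// Lz >= Ly >= Lx for the local log, and Ly >= Ry >= Rx for the remote log.
final class OffsetConstraints {

    private OffsetConstraints() {}

    static boolean hold(long lx, long ly, long lz, long rx, long ry) {
        boolean local = lz >= ly && ly >= lx;
        // The remote log never extends past the last stable offset.
        boolean remote = ly >= ry && ry >= rx;
        return local && remote;
    }

    public static void main(String[] args) {
        // Local log [100, 520], LSO 500, remote log [0, 300]: all constraints hold.
        System.out.println(hold(100, 500, 520, 0, 300));
        // A remote log end offset beyond the LSO violates Ly >= Ry.
        System.out.println(hold(100, 500, 520, 0, 600));
    }
}
```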

RemoteLogMetadataManager implemented with an internal topic

Metadata about a partition's remote log segments is stored in an internal topic called `__remote_log_metadata`. This topic can be created with a default partition count of 50.

In this design, RemoteLogMetadataManager (RLMM) is responsible for storing and fetching remote log metadata. It provides:

  • Storing remote log metadata for a partition based on offsets
  • Fetching remote log segment metadata for an offset
  • Registering a topic partition to build a cache of remote log metadata by reading from the remote log metadata topic

RemoteLogMetadataManager (RLMM) mainly consists of the following components:

  • Cache
  • Producer
  • Consumer
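The cache's main lookup, finding the remote segment that contains a given offset, can be sketched with a sorted map keyed by segment start offset. This is an in-memory illustration only (the design calls for a disk-backed store such as RocksDB on followers), and `RemoteLogMetadataCache` and `SegmentMetadata` are hypothetical, simplified types.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import java.util.UUID;

// In-memory sketch of an RLMM cache; all names here are hypothetical.
final class RemoteLogMetadataCache {

    // Simplified stand-in for RemoteLogSegmentMetadata: an id plus the offset range it covers.
    static final class SegmentMetadata {
        final UUID segmentId;
        final long startOffset;
        final long endOffset;

        SegmentMetadata(UUID segmentId, long startOffset, long endOffset) {
            this.segmentId = segmentId;
            this.startOffset = startOffset;
            this.endOffset = endOffset;
        }
    }

    // User topic partition ("topic-partition") -> segments indexed by start offset.
    private final Map<String, NavigableMap<Long, SegmentMetadata>> byPartition = new HashMap<>();

    void put(String topicPartition, SegmentMetadata metadata) {
        byPartition.computeIfAbsent(topicPartition, tp -> new TreeMap<>())
                   .put(metadata.startOffset, metadata);
    }

    /** Returns the segment whose [startOffset, endOffset] range contains the offset, if any. */
    Optional<SegmentMetadata> segmentFor(String topicPartition, long offset) {
        NavigableMap<Long, SegmentMetadata> segments = byPartition.get(topicPartition);
        if (segments == null) {
            return Optional.empty();
        }
        // Greatest start offset <= offset; it matches only if that segment also ends at or after offset.
        Map.Entry<Long, SegmentMetadata> candidate = segments.floorEntry(offset);
        if (candidate == null || candidate.getValue().endOffset < offset) {
            return Optional.empty();
        }
        return Optional.of(candidate.getValue());
    }
}
```

A floor lookup keeps the offset-to-segment query logarithmic in the number of segments per partition, which is the same access pattern a disk-backed sorted store would serve.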

The remote log metadata topic partition for a given user topic-partition is:

user-topic-partition.toString().hashCode() % no_of_remote_log_metadata_topic_partitions
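The mapping can be sketched as below. One detail the formula leaves open: `String.hashCode()` can be negative, and Java's `%` then returns a negative result, so this sketch uses `Math.floorMod` to keep the partition in range. That choice is an assumption (Kafka's own helper `Utils.toPositive` masks the sign bit instead, which yields different partitions for negative hashes), and `MetadataPartitioner` is a hypothetical name. The key follows the `topic-partition` form produced by `TopicPartition.toString()`.

```java
// Hypothetical helper mapping a user topic partition to a __remote_log_metadata partition.
final class MetadataPartitioner {

    private MetadataPartitioner() {}

    static int metadataPartition(String topic, int partition, int metadataTopicPartitions) {
        // Same "<topic>-<partition>" string that TopicPartition.toString() produces.
        String key = topic + "-" + partition;
        // hashCode() may be negative; floorMod keeps the result in [0, metadataTopicPartitions).
        return Math.floorMod(key.hashCode(), metadataTopicPartitions);
    }

    public static void main(String[] args) {
        // Using the default of 50 metadata topic partitions.
        for (int p = 0; p < 3; p++) {
            System.out.println("orders-" + p + " -> " + metadataPartition("orders", p, 50));
        }
    }
}
```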


For leader partition replicas, RemoteLogManager (RLM) copies the log segment and its indexes to the remote storage with a newly generated UUID (via the RemoteStorageManager#copyLogSegment API). RLM then calls RLMM to store the remote log metadata; RLMM writes it to the remote log metadata topic and updates its cache.
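The leader-side ordering (copy first, then publish metadata) can be sketched as follows. The two nested interfaces are hypothetical, simplified stand-ins for RemoteStorageManager and RemoteLogMetadataManager: they keep the method names `copyLogSegment` and `putRemoteLogSegmentData` but replace the Kafka types with strings, byte arrays and UUIDs, and a failed copy is modeled as an unchecked exception.

```java
import java.util.UUID;

// Sketch of the leader-side copy flow; all types here are simplified and hypothetical.
final class LeaderCopyFlow {

    private LeaderCopyFlow() {}

    // Simplified stand-in for RemoteStorageManager#copyLogSegment.
    interface StorageManager {
        void copyLogSegment(UUID segmentId, String topicPartition, byte[] segmentData);
    }

    // Simplified stand-in for RemoteLogMetadataManager#putRemoteLogSegmentData.
    interface MetadataManager {
        void putRemoteLogSegmentData(UUID segmentId, String topicPartition, long startOffset, long endOffset);
    }

    /** Copies one segment and records its metadata; returns the id used for this attempt. */
    static UUID copySegment(StorageManager rsm, MetadataManager rlmm, String topicPartition,
                            byte[] segmentData, long startOffset, long endOffset) {
        // A fresh id per attempt, so a retried copy never reuses the id of a failed one.
        UUID segmentId = UUID.randomUUID();
        // If the copy throws, no metadata is written, so no metadata entry references the partial upload.
        rsm.copyLogSegment(segmentId, topicPartition, segmentData);
        // Metadata is stored only after the copy succeeded.
        rlmm.putRemoteLogSegmentData(segmentId, topicPartition, startOffset, endOffset);
        return segmentId;
    }
}
```

Ordering the calls this way is what lets the remote store itself be eventually consistent: readers locate segments through the metadata, which only ever points at completed copies.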

For follower partition replicas, RLM fetches the remote log segment information for a given offset from RLMM. It fetches remote log index entries by using RemoteStorageManager.

RLMM registers the topic partitions for which the broker is either the leader or a follower.

For leader topic partitions, it follows the process as mentioned in the earlier section. 

For follower partitions, RLMM maintains the metadata cache by subscribing to the respective remote log metadata topic partitions. Whenever a topic partition is reassigned to a new broker and RLMM on that broker is not yet subscribed to the respective remote log metadata topic partition, it subscribes to that partition and adds all the entries to the cache. So, in the worst case, RLMM on a broker may be consuming from most of the remote log metadata topic partitions. This requires the cache to be backed by disk storage such as RocksDB, to avoid a high memory footprint on the broker. It also allows RLMM to commit the offsets of the metadata partitions it has already read. Committed offsets can be stored in a local file to avoid reading the messages again when a broker is restarted.
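The reassignment rule can be sketched as a set computation: map every user partition the broker now hosts to its metadata partition, and subscribe only to those not already consumed. A minimal sketch; `MetadataSubscriptions` is hypothetical, it applies the `hashCode() % n` mapping through `Math.floorMod` to avoid negative results (an assumption), and it leaves out the consumer and the disk-backed cache.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper computing which remote log metadata partitions RLMM must start consuming.
final class MetadataSubscriptions {

    private MetadataSubscriptions() {}

    static int metadataPartition(String userTopicPartition, int metadataTopicPartitions) {
        return Math.floorMod(userTopicPartition.hashCode(), metadataTopicPartitions);
    }

    /**
     * @param hostedUserPartitions    user partitions ("topic-partition") this broker leads or follows
     * @param subscribed              metadata partitions RLMM already consumes
     * @param metadataTopicPartitions partition count of the remote log metadata topic
     * @return metadata partitions that still need to be subscribed to
     */
    static Set<Integer> toSubscribe(Set<String> hostedUserPartitions, Set<Integer> subscribed,
                                    int metadataTopicPartitions) {
        Set<Integer> needed = new HashSet<>();
        for (String tp : hostedUserPartitions) {
            needed.add(metadataPartition(tp, metadataTopicPartitions));
        }
        // Already-consumed metadata partitions are served from the existing cache.
        needed.removeAll(subscribed);
        return needed;
    }
}
```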


Remote Log Indexes

...

If RLM is configured, ReplicaManager will call RLM to assign or remove topic-partitions, similar to how ReplicaFetcherManager works today.

If the broker changes its state from leader to follower for a topic-partition while RLM is copying a segment, RLM finishes the copy before it relinquishes copying for that topic-partition. This might leave duplicate messages.

ReplicaManager.readFromLocalLog works as it does today. Only when an OffsetOutOfRangeException occurs and RLM is configured is the read request delegated to RLM, which returns a LogReadResult.

Code Block
languagescala
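def readFromLocalLog(): Seq[(TopicPartition, LogReadResult)] = {
  // ... existing per-partition read logic is unchanged ...
  try {
    // read the requested data from the local log, as today
  } catch {
    // Only an OffsetOutOfRangeException with RLM configured is delegated to RLM, which serves
    // the data from the remote tier and returns a LogReadResult.
    // Assumption: remoteLogManager is an Option[RemoteLogManager] that is None when RLM is not configured.
    case _: OffsetOutOfRangeException if remoteLogManager.isDefined =>
      remoteLogManager.get.read(fetchMaxBytes, hardMaxBytesLimit, tp, fetchInfo, quota)
  }
}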

...