...

This solution allows scaling storage independently of memory and CPUs in a Kafka cluster, enabling Kafka to be a long-term storage solution. It also reduces the amount of data stored locally on Kafka brokers and hence the amount of data that needs to be copied during recovery and rebalancing. Log segments that are available in the remote tier need not be restored on the broker; they can be restored lazily or served directly from the remote tier. With this, increasing the retention period no longer requires scaling the Kafka cluster storage or adding new nodes. At the same time, the overall data retention can still be much longer, eliminating the need for separate data pipelines that copy the data from Kafka to external stores, as is done today in many deployments.

...

Goals

Extend Kafka's storage beyond the local storage available on the Kafka cluster by retaining the older data in an external store, such as HDFS or S3, with minimal impact on the internals of Kafka. Kafka behavior and operational complexity must not change for existing users who do not have the tiered storage feature configured.

...

Code Block
languagejava
titleRemoteStorageManager
/**
 * RemoteStorageManager provides the lifecycle of remote log segments, which includes copy, fetch, and delete operations.
 *
 * {@link RemoteLogMetadataManager} is responsible for storing and fetching metadata about the remote log segments in a
 * strongly consistent manner.
 *
 * Each upload or copy of a segment is assigned a {@link RemoteLogSegmentId}, which is universally unique even for the
 * same topic partition and offsets. Once the copy or upload is successful, a {@link RemoteLogSegmentMetadata} is
 * created with the RemoteLogSegmentId and other log segment information and stored in {@link RemoteLogMetadataManager}.
 * This allows RemoteStorageManager to store segments even in an eventually consistent manner, as the metadata is
 * already stored in a consistent store.
 *
 * All these APIs are still experimental.
 */
@InterfaceStability.Unstable
public interface RemoteStorageManager extends Configurable, Closeable {

    /**
     * Copies the given LogSegmentData for the given RemoteLogSegmentId and returns any contextual
     * information about this copy operation. This can include the path to the object in the store, etc.
     *
     * Invokers of this API should always send a unique id as part of {@link RemoteLogSegmentId#id()}, even when
     * retrying this method for the same log segment data.
     *
     * @param remoteLogSegmentId universally unique id of the remote log segment to be copied.
     * @param logSegmentData     log segment data (segment file and indexes) to be copied.
     * @return contextual information about the copy operation, e.g. the path of the object in the remote store.
     * @throws IOException if the copy operation fails.
     */
    RemoteLogSegmentContext copyLogSegment(RemoteLogSegmentId remoteLogSegmentId, LogSegmentData logSegmentData) throws IOException;

    /**
     * Returns the remote log segment data file/object as an InputStream for the given RemoteLogSegmentMetadata,
     * starting from the given startPosition. If endPosition is given, the stream ends at that position; otherwise it
     * streams until the end of the remote log segment data file/object.
     *
     * @param remoteLogSegmentMetadata metadata of the remote log segment to be fetched.
     * @param startPosition            start position within the segment file/object.
     * @param endPosition              optional end position within the segment file/object.
     * @return an InputStream of the requested range of the remote log segment data.
     * @throws IOException if the fetch operation fails.
     */
    InputStream fetchLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata, Long startPosition, Optional<Long> endPosition) throws IOException;

    /**
     * Deletes the remote log segment for the given remoteLogSegmentId. Returns true if the deletion is successful.
     * The broker pushes an event to the __delete_failed_remote_log_segments topic for failed segment deletions so that
     * users can do the cleanup later.
     *
     * @param remoteLogSegmentId universally unique id of the remote log segment to be deleted.
     * @return true if the deletion is successful.
     * @throws IOException if the delete operation fails.
     */
    boolean deleteLogSegment(RemoteLogSegmentId remoteLogSegmentId) throws IOException;
}


/**
 * This represents a universally unique id associated with a topic partition's log segment. It is regenerated for
 * every attempt of copying a specific log segment in {@link RemoteStorageManager#copyLogSegment(RemoteLogSegmentId, LogSegmentData)}.
 */
public class RemoteLogSegmentId {
    private final TopicPartition topicPartition;
    private final UUID id;

    public RemoteLogSegmentId(TopicPartition topicPartition, UUID id) {
        this.topicPartition = requireNonNull(topicPartition);
        this.id = requireNonNull(id);
    }

    public TopicPartition topicPartition() {
        return topicPartition;
    }

    public UUID id() {
        return id;
    }
...
}


public class LogSegmentData {

    private FileRecords logSegment;
    private File offsetIndex;
    private File timeIndex;
    //todo add other required indexes like txnIndex
...
}
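
As an illustration of the fetch API above, the sketch below shows how a broker-side reader might stream a byte range of a remote segment into a local buffer. The RemoteSegmentReader class and its buffering strategy are assumptions for illustration only and are not part of the proposed interfaces.

Code Block
languagejava
titleFetchLogSegmentData usage sketch (illustrative)
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Optional;

// Hypothetical reader-side helper; not part of the proposed interfaces.
public class RemoteSegmentReader {

    private final RemoteStorageManager remoteStorageManager;

    public RemoteSegmentReader(RemoteStorageManager remoteStorageManager) {
        this.remoteStorageManager = remoteStorageManager;
    }

    // Reads the bytes between startPosition and endPosition of the given remote segment into memory.
    public byte[] readRange(RemoteLogSegmentMetadata metadata, long startPosition, long endPosition) throws IOException {
        try (InputStream in = remoteStorageManager.fetchLogSegmentData(metadata, startPosition, Optional.of(endPosition))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[8192];
            int read;
            // Drain the stream returned by the remote storage implementation into a local buffer.
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            return out.toByteArray();
        }
    }
}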


RemoteStorageManager_Old


Code Block
languagejava
titleRemoteStorageManager_Old
/**
 * RemoteStorageManager is an interface that allows plugging in different remote storage implementations to copy and
 * retrieve the log segments.
 *
 * All these APIs are still experimental.
 */
@InterfaceStability.Unstable
trait RemoteStorageManager extends Configurable with AutoCloseable {

  /**
   * Returns the earliest log offset, if one exists, for the given topic partition in the remote storage. Returns -1
   * if there are no segments in the remote storage.
   *
   * @param tp topic partition for which the earliest remote log offset is requested.
   * @return the earliest log offset in the remote storage, or -1 if there are no segments.
   */
  @throws(classOf[IOException])
  def earliestLogOffset(tp: TopicPartition): Long

  /**
   * Copies the LogSegment provided by [[RemoteLogManager]] for the given topic partition with the given leader epoch.
   * Returns the RDIs of the remote data. This method is invoked by the leader of the topic partition.
   *
   * //todo LogSegment is not public; this will be changed to an interface which provides the base and end offset of
   * the segment, the log, and the offset/time indexes.
   *
   * @param topicPartition topic partition of the segment to be copied.
   * @param logSegment     log segment to be copied.
   * @param leaderEpoch    leader epoch of the topic partition for the copied segment.
   * @return list of remote log index entries for the copied segment.
   */
  @throws(classOf[IOException])
  def copyLogSegment(topicPartition: TopicPartition, logSegment: LogSegment,
                     leaderEpoch: Int): util.List[RemoteLogIndexEntry]

  /**
   * Lists the remote log segment files of the given topicPartition.
   * The RemoteLogManager of a follower uses this method to find out the remote data for the given topic partition.
   *
   * @param topicPartition topic partition whose remote segments are to be listed.
   * @return List of remote segments, sorted by baseOffset in ascending order.
   */
  @throws(classOf[IOException])
  def listRemoteSegments(topicPartition: TopicPartition): util.List[RemoteLogSegmentInfo] = {
    listRemoteSegments(topicPartition, 0)
  }

  /**
   * Lists the remote log segment files of the specified topicPartition with offset >= minOffset.
   * The RLM of a follower uses this method to find out the remote data.
   *
   * @param topicPartition topic partition whose remote segments are to be listed.
   * @param minOffset      the minimum offset for a segment to be returned.
   * @return List of remote segments that contain offsets >= minOffset, sorted by baseOffset in ascending order.
   */
  @throws(classOf[IOException])
  def listRemoteSegments(topicPartition: TopicPartition, minOffset: Long): util.List[RemoteLogSegmentInfo]

  /**
   * Returns a list of RemoteLogIndexEntry for the given RemoteLogSegmentInfo. This is used by a follower to store
   * remote log indexes locally.
   *
   * @param remoteLogSegment remote log segment for which the index entries are requested.
   * @return list of remote log index entries for the given segment.
   */
  @throws(classOf[IOException])
  def getRemoteLogIndexEntries(remoteLogSegment: RemoteLogSegmentInfo): util.List[RemoteLogIndexEntry]

  /**
   * Deletes the remote LogSegment file and indexes for the given remoteLogSegmentInfo.
   *
   * @param remoteLogSegmentInfo remote log segment to be deleted.
   * @return true if the deletion is successful.
   */
  @throws(classOf[IOException])
  def deleteLogSegment(remoteLogSegmentInfo: RemoteLogSegmentInfo): Boolean

  /**
   * Deletes all the log segments for the given topic partition. This can be done by renaming the existing locations
   * and deleting them later in an asynchronous manner.
   *
   * @param topicPartition topic partition whose remote log segments are to be deleted.
   * @return true if the deletion is successful.
   */
  @throws(classOf[IOException])
  def deleteTopicPartition(topicPartition: TopicPartition): Boolean

  /**
   * Removes the log segments which are older than the given cleanUpTillMs. Returns the log start offset of the
   * earliest remaining remote log segment if one exists, or -1 if there are no log segments in the remote storage.
   *
   * @param topicPartition topic partition whose old remote log segments are to be removed.
   * @param cleanUpTillMs  timestamp in milliseconds; segments older than this are removed.
   * @return the log start offset of the earliest remaining remote log segment, or -1 if none remain.
   */
  @throws(classOf[IOException])
  def cleanupLogUntil(topicPartition: TopicPartition, cleanUpTillMs: Long): Long

  /**
   * Reads up to maxBytes of data from remote storage, starting from the first batch whose offset is greater than or
   * equal to the startOffset. If minOneMessage is true, at least one batch is read even if the first batch's size is
   * larger than maxBytes.
   *
   * @param remoteLogIndexEntry the first remoteLogIndexEntry such that remoteLogIndexEntry.lastOffset >= startOffset
   * @param maxBytes            maximum bytes to fetch for the given entry
   * @param startOffset         initial offset to be read from the given rdi in remoteLogIndexEntry
   * @param minOneMessage       if true, read at least one record even if its size is more than maxBytes
   * @return the records read from remote storage.
   */
  @throws(classOf[IOException])
  def read(remoteLogIndexEntry: RemoteLogIndexEntry, maxBytes: Int, startOffset: Long, minOneMessage: Boolean): Records


  /**
   * Searches forward for the first message that meets the following requirements:
   * - The message's timestamp is greater than or equal to the targetTimestamp.
   * - The message's offset is greater than or equal to the startingOffset.
   *
   * @param remoteLogIndexEntry remote log index entry within which to search.
   * @param targetTimestamp     the timestamp to search for.
   * @param startingOffset      the starting offset to search from.
   * @return the timestamp and offset of the message found, or null if no message is found.
   */
  @throws(classOf[IOException])
  def findOffsetByTimestamp(remoteLogIndexEntry: RemoteLogIndexEntry,
                            targetTimestamp: Long,
                            startingOffset: Long): TimestampAndOffset

  /**
   * Release any system resources used by this instance.
   */
  def close(): Unit
}

...

Proposed Changes

High-level design


RemoteLogManager (RLM) is a new component which

  • receives callback events for leadership changes and stop/delete events of topic partitions on a broker.
  • delegates copy, read, and delete of topic partition segments to a pluggable storage manager (viz. RemoteStorageManager) implementation and maintains the respective remote log segment metadata through RemoteLogMetadataManager.

The earlier approach consisted of pulling the remote log segment metadata from the remote log storage APIs, as described in the earlier (RemoteStorageManager_Old) section. This approach worked fine for storages like HDFS. One of the problems of relying on the remote storage to maintain metadata is that tiered storage needs it to be strongly consistent, with an impact not only on the metadata itself (e.g. LIST in S3) but also on the segment data (e.g. GET after a DELETE in S3). In addition to consistency and availability, the cost (and to a lesser extent performance) of maintaining metadata in remote storage needs to be factored in. This is true in the case of S3, where LIST APIs incur huge costs.

So, we decoupled the remote storage from the remote log metadata store, which have different consistency and availability requirements. We introduced remote log storage (as RemoteStorageManager) and remote log metadata storage (as RemoteLogMetadataManager). We separated RemoteStorageManager and RemoteLogMetadataManager after going through the challenges faced with eventually consistent stores like S3. You can see the discussion details in the doc located here.
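
To make this decoupling concrete, the sketch below outlines how a leader-side component could coordinate the two plugins for a single segment: the segment data goes through RemoteStorageManager, and the resulting metadata is recorded through RemoteLogMetadataManager. The SegmentArchiver class, the putRemoteLogSegmentMetadata method, and the RemoteLogSegmentMetadata constructor shown here are assumptions for illustration only.

Code Block
languagejava
titleRSM/RLMM coordination sketch (illustrative)
import java.io.IOException;
import java.util.UUID;
import org.apache.kafka.common.TopicPartition;

// Hypothetical leader-side flow; the RemoteLogMetadataManager method name and the
// RemoteLogSegmentMetadata constructor below are illustrative assumptions.
public class SegmentArchiver {

    private final RemoteStorageManager remoteStorageManager;         // segment data store (e.g. S3, HDFS)
    private final RemoteLogMetadataManager remoteLogMetadataManager; // strongly consistent metadata store

    public SegmentArchiver(RemoteStorageManager rsm, RemoteLogMetadataManager rlmm) {
        this.remoteStorageManager = rsm;
        this.remoteLogMetadataManager = rlmm;
    }

    public void archive(TopicPartition topicPartition, LogSegmentData segmentData) throws IOException {
        // A new universally unique id is generated for every copy attempt, even for the same segment data.
        RemoteLogSegmentId segmentId = new RemoteLogSegmentId(topicPartition, UUID.randomUUID());

        // 1. Copy the segment data to remote storage; the returned context captures where it was stored.
        RemoteLogSegmentContext context = remoteStorageManager.copyLogSegment(segmentId, segmentData);

        // 2. Record the segment metadata in the consistent metadata store so that brokers can locate the
        //    segment without issuing LIST calls against the remote storage.
        RemoteLogSegmentMetadata metadata = new RemoteLogSegmentMetadata(segmentId, context);
        remoteLogMetadataManager.putRemoteLogSegmentMetadata(metadata);
    }
}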

RLM creates tasks for each leader or follower topic partition:

...

RLM maintains a bounded cache (possibly LRU) of the index files of remote log segments to avoid multiple index fetches from the remote storage. These indexes can be used in the same way as local segment indexes are used. 
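
A minimal sketch of such a bounded LRU cache is shown below. The RemoteIndexCache class, its key/value types, and its eviction behavior are assumptions for illustration only, not part of this proposal.

Code Block
languagejava
titleBounded LRU index cache sketch (illustrative)
import java.io.File;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical bounded LRU cache mapping remote segment ids to locally cached index files.
public class RemoteIndexCache {

    private final Map<RemoteLogSegmentId, File> cache;

    public RemoteIndexCache(final int maxEntries) {
        // accessOrder=true makes the iteration order least-recently-used, enabling LRU eviction below.
        this.cache = new LinkedHashMap<RemoteLogSegmentId, File>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<RemoteLogSegmentId, File> eldest) {
                boolean evict = size() > maxEntries;
                if (evict) {
                    // The locally cached index file can be removed once evicted from the cache.
                    eldest.getValue().delete();
                }
                return evict;
            }
        };
    }

    public synchronized File getOrFetch(RemoteLogSegmentId segmentId,
                                        Function<RemoteLogSegmentId, File> remoteFetcher) {
        // Fetch the index from remote storage only on a cache miss; hits refresh the LRU order.
        return cache.computeIfAbsent(segmentId, remoteFetcher);
    }
}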

Local and Remote log offset constraints

...