...

When unclean leader election is enabled for a partition, log segments that need to be truncated may already be in tiered storage. Segments offloaded to remote storage (which were eligible because their end offset was less than the LSO) may need to be truncated when leadership of a topic-partition moves to a previously lagging follower. The following diagram illustrates a basic scenario in which a remote segment would require truncation so that it exhibits the same replica lineage as the local segments.

todo: We will update this diagram with other scenarios in handling log truncation.

Local log segments are truncated in such a situation. We may not want to fetch and rewrite the remote segment itself, as that adds significant complexity in handling followers that have already started reading it. One way to address this is to update the respective remote log segment metadata with truncation offsets, as sketched below.
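
As an illustration of that approach, the metadata record for a remote segment could carry a truncation offset that readers honor when serving data. This is a minimal sketch with hypothetical names, not an API defined by this proposal.

// Hypothetical sketch: instead of rewriting remote data, record a truncation
// offset in the segment's metadata and have readers honor it. Class and field
// names here are illustrative only.
public final class RemoteLogSegmentMetadata {
    private final String remoteSegmentId;   // identifies the remote segment (e.g. via its RDI prefix)
    private final long startOffset;
    private final long endOffset;
    private final long truncationOffset;    // -1 means the segment is not truncated

    public RemoteLogSegmentMetadata(String remoteSegmentId, long startOffset,
                                    long endOffset, long truncationOffset) {
        this.remoteSegmentId = remoteSegmentId;
        this.startOffset = startOffset;
        this.endOffset = endOffset;
        this.truncationOffset = truncationOffset;
    }

    // Returns a copy of this metadata marking everything beyond the given offset as deleted.
    public RemoteLogSegmentMetadata truncatedTo(long offset) {
        return new RemoteLogSegmentMetadata(remoteSegmentId, startOffset, endOffset, offset);
    }

    // Readers (leaders and followers) must not serve records past this offset.
    public long effectiveEndOffset() {
        return truncationOffset >= 0 ? Math.min(truncationOffset, endOffset) : endOffset;
    }
}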

...

  1. Replace all local storage with remote storage - Instead of using local storage on Kafka brokers, only remote storage is used for storing log segments and offset index files. While this has the benefit of reducing local storage, it does not leverage the local disk for efficient reads of the latest data, as Kafka does today.

  2. Implement Kafka API on another store - This is an approach taken by some vendors, where the Kafka API is implemented on top of a different distributed, scalable storage system (for example, HDFS). Such an option does not leverage Kafka other than for API compliance, and it requires the much riskier option of replacing the entire Kafka cluster with another system.

  3. Client directly reads remote log segments from the remote storage - The log segments on the remote storage can be read directly by the client instead of being served by the Kafka broker. This reduces the Kafka broker changes and has the benefit of removing an extra hop. However, it bypasses Kafka security completely, and it increases the Kafka client library's complexity and footprint, and hence is not considered.
  4. Remote Log Indexes

    For each topic partition that has RLM configured, the RLM leader for that partition copies log segments whose last message offset is less than the partition's last stable offset to remote storage, as sketched below. The active segment file (the last segment file of each partition, to which new records are being appended) is never shipped to remote storage.
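
    A minimal sketch of this eligibility rule, assuming an illustrative LogSegment view and copy helper (none of these names are defined here):

    // Hypothetical sketch of the copy-eligibility rule described above: ship a
    // segment only if it is not the active segment and its last offset is below
    // the partition's last stable offset (LSO).
    import java.util.List;

    final class RemoteLogCopier {

        interface LogSegment {
            long lastOffset();
            boolean isActive();
        }

        void copyEligibleSegments(List<LogSegment> segments, long lastStableOffset) {
            for (LogSegment segment : segments) {
                if (segment.isActive()) {
                    continue;                      // the active segment is never shipped
                }
                if (segment.lastOffset() < lastStableOffset) {
                    copyToRemoteStorage(segment);  // e.g. upload to HDFS or S3
                }
            }
        }

        private void copyToRemoteStorage(LogSegment segment) {
            // Depends on the configured remote storage implementation.
        }
    }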

    After a segment file is copied to remote storage, RLM appends a set of index entries to three local index files: remoteLogIndex, remoteOffsetIndex, and remoteTimeIndex. These index files are rotated by RLM at a configurable time interval (or a configurable size).

    (active segment)

    {log.dirs}/{topic-partition}/0000002400013.index

    {log.dirs}/{topic-partition}/0000002400013.timeindex

    {log.dirs}/{topic-partition}/0000002400013.log


    (inactive segments)

    {log.dirs}/{topic-partition}/0000002000238.index

    {log.dirs}/{topic-partition}/0000002000238.timeindex

    {log.dirs}/{topic-partition}/0000002000238.log

    {log.dirs}/{topic-partition}/0000001600100.index

    {log.dirs}/{topic-partition}/0000001600100.timeindex

    {log.dirs}/{topic-partition}/0000001600100.log


    (remote segment)

    {log.dirs}/{topic-partition}/0000001000121.remoteOffsetIndex

    {log.dirs}/{topic-partition}/0000001000121.remoteTimeIndex

    {log.dirs}/{topic-partition}/0000001000121.remoteLogIndex


    (remote segments)

    {log.dirs}/{topic-partition}/0000000512002.remoteOffsetIndex

    {log.dirs}/{topic-partition}/0000000512002.remoteTimeIndex

    {log.dirs}/{topic-partition}/0000000512002.remoteLogIndex


    Each index entry of the remoteLogIndex file describes a sequence of records in the remote log segment file. The format of a remoteLogIndex entry is:

    magic: int16 (current magic value is 0)

    length: int16 (length of this entry)

    crc: int32 (checksum from firstOffset to the end of this entry)

    firstOffset: int64 (the Kafka offset of the 1st record)

    lastOffset: int64 (the Kafka offset of the last record)

    firstTimestamp: int64

    lastTimestamp: int64

    dataLength: int32 (length of the remote data)

    rdiLength: int16

    rdi: byte[] (Remote data identifier)


    RDI (Remote data identifier) is the "pointer" or "URI" of the remote data. The format of the RDI depends on the implementation; for example, an RDI can be an HDFS file path and offset, or an S3 key and offset. When reading remote records, RLM uses the RDI to retrieve the remote data.
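
    To make the entry layout concrete, here is an illustrative in-memory representation and serialization of a remoteLogIndex entry. The exact on-disk encoding, class name, and the assumption that "length" covers the crc and payload are part of this sketch only, not the format description above.

    import java.nio.ByteBuffer;
    import java.util.zip.CRC32;

    // Illustrative representation of a remoteLogIndex entry with the fields listed above.
    public final class RemoteLogIndexEntry {
        public static final short CURRENT_MAGIC = 0;

        public final long firstOffset;     // Kafka offset of the first record
        public final long lastOffset;      // Kafka offset of the last record
        public final long firstTimestamp;
        public final long lastTimestamp;
        public final int dataLength;       // length of the remote data
        public final byte[] rdi;           // remote data identifier (e.g. HDFS path + offset, or S3 key + offset)

        public RemoteLogIndexEntry(long firstOffset, long lastOffset, long firstTimestamp,
                                   long lastTimestamp, int dataLength, byte[] rdi) {
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
            this.firstTimestamp = firstTimestamp;
            this.lastTimestamp = lastTimestamp;
            this.dataLength = dataLength;
            this.rdi = rdi;
        }

        // Serializes the entry as: magic, length, crc, firstOffset, lastOffset,
        // firstTimestamp, lastTimestamp, dataLength, rdiLength, rdi.
        public ByteBuffer serialize() {
            short rdiLength = (short) rdi.length;
            int payloadSize = 4 * Long.BYTES + Integer.BYTES + Short.BYTES + rdi.length;
            short length = (short) (Integer.BYTES + payloadSize);   // assumed here: crc + payload

            ByteBuffer payload = ByteBuffer.allocate(payloadSize);
            payload.putLong(firstOffset).putLong(lastOffset)
                   .putLong(firstTimestamp).putLong(lastTimestamp)
                   .putInt(dataLength).putShort(rdiLength).put(rdi);

            CRC32 crc = new CRC32();
            crc.update(payload.array(), 0, payloadSize);   // checksum from firstOffset to the end of the entry

            ByteBuffer buffer = ByteBuffer.allocate(2 * Short.BYTES + Integer.BYTES + payloadSize);
            buffer.putShort(CURRENT_MAGIC).putShort(length).putInt((int) crc.getValue())
                  .put(payload.array(), 0, payloadSize);
            buffer.flip();
            return buffer;
        }
    }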

    Depending on the implementation, RLM may append one or more entries to the remoteLogIndex file for each remote segment file. More entries provide finer-grained indexing of the remote data at the cost of local disk space.

    The remoteLogIndex entries are shipped to remote storage along with the segment data. The followers retrieve those index entries from remote storage to build their own indexes.

    The remoteOffsetIndex and remoteTimeIndex files are similar to the existing .index (offset index) and .timeindex (timestamp index) files. The only difference is that they point to entries in the corresponding remoteLogIndex file instead of positions in a log segment file.
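
    To make the indirection concrete, a fetch for an offset that has been tiered out could be served roughly as follows. The helper interfaces below are hypothetical and only illustrate the lookup flow, reusing the RemoteLogIndexEntry sketch above.

    // Hypothetical sketch: remoteOffsetIndex maps a fetch offset to a position in
    // the remoteLogIndex file, whose entry carries the RDI used to read the actual
    // bytes from remote storage.
    public final class RemoteReadPath {

        interface RemoteOffsetIndex {
            long remoteLogIndexPosition(long fetchOffset);   // position of the covering remoteLogIndex entry
        }

        interface RemoteLogIndex {
            RemoteLogIndexEntry entryAt(long position);      // entry holding firstOffset, lastOffset, dataLength, rdi
        }

        interface RemoteStorage {
            byte[] read(byte[] rdi, int dataLength);         // e.g. a ranged read from HDFS or S3
        }

        byte[] readRemoteRecords(long fetchOffset,
                                 RemoteOffsetIndex offsetIndex,
                                 RemoteLogIndex logIndex,
                                 RemoteStorage storage) {
            // 1. The offset index points into the remoteLogIndex file, not into a log segment.
            long position = offsetIndex.remoteLogIndexPosition(fetchOffset);
            // 2. The remoteLogIndex entry identifies the remote data via its RDI.
            RemoteLogIndexEntry entry = logIndex.entryAt(position);
            // 3. RLM uses the RDI to retrieve the records from remote storage.
            return storage.read(entry.rdi, entry.dataLength);
        }
    }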

...