Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Local log segments are truncated in such a situation. We may not want to fetch and update the remote segment which may add a lot of complexity in handling the followers which started reading that segment. One way to address this is to update the respective remote log segment metadata with truncation offsets.  

...

For each topic partition that has RLM configured, RLM leader for a topic partition copies log segments which have last message offset less than last stable offset of that topic partition to remote storage. The active segment file (the last segment file of each partition, to which the new records are appending) is never shipped to remote storage.

After a segment file is copied to remote storage, RLM will append a set of index entries to 3 local index files: remoteLogIndex, remoteOffsetIndex, remoteTimeIndex. These index files are rotated by RLM at a configurable time interval (or a configurable size).

(active segment)

{log.dirs}/{topic-partition}/0000002400013.index

{log.dirs}/{topic-partition}/0000002400013.timeindex

{log.dirs}/{topic-partition}/0000002400013.log

(inactive segments)

{log.dirs}/{topic-partition}/0000002000238.index

{log.dirs}/{topic-partition}/0000002000238.timeindex

{log.dirs}/{topic-partition}/0000002000238.log

{log.dirs}/{topic-partition}/0000001600100.index

{log.dirs}/{topic-partition}/0000001600100.timeindex

{log.dirs}/{topic-partition}/0000001600100.log

( remote segment)

{log.dirs}/{topic-partition}/0000001000121.remoteOffsetIndex

{log.dirs}/{topic-partition}/0000001000121.remoteTimeIndex

{log.dirs}/{topic-partition}/0000001000121.remoteLogIndex

( remote segments)

{log.dirs}/{topic-partition}/0000000512002.remoteOffsetIndex

{log.dirs}/{topic-partition}/0000000512002.remoteTimeIndex

{log.dirs}/{topic-partition}/0000000512002.remoteLogIndex

Performance Test Results

We have tested the performance of the initial implementation of this proposal.

The cluster configuration:

  1. 5 brokers
  2. 20 CPU cores, 256GB RAM (each broker)
  3. 2TB * 22 hard disks in RAID0 (each broker)
  4. Hardware RAID card with NV-memory write cache
  5. 20Gbps network
  6. snappy compression
  7. 6300 topic-partitions with 3 replicas
  8. remote storage uses HDFS

Each test case is tested under 2 types of workload (acks=all and acks=1)


Workload-1

(at-least-once, acks=all)

Workload-2

(acks=1)

Producers

10 producers

30MB / sec / broker (leader)

~62K messages / sec / broker (leader)

10 producers

55MB / sec / broker (leader)

~120K messages / sec / broker (leader)

In-sync Consumers

10 consumers

120MB / sec / broker

~250K messages / sec / broker

10 consumers

220MB / sec / broker

~480K messages / sec / broker

Test case 1 (Normal case):

Normal traffic as described above.



with tiered storagewithout tiered storage

Workload-1

(acks=all, low traffic)

Avg P99 produce latency25ms21ms
Avg P95 produce latency14ms13ms

Workload-2

(acks=1, high traffic)

Avg P99 produce latency9ms9ms
Avg P95 produce latency4ms4ms

We can see there is a little overhead when tiered storage is turned on. This is expected, as the brokers have to ship segments to remote storage, and sync the remote segment metadata between brokers. With at-least-once (acks=all) produce, the produce latency is slightly increased when tiered storage is turned on. With acks=1 produce, the produce latency is almost not changed when tiered storage is turned on.

Test case 2 (out-of-sync consumers catching up):

In addition to the normal traffic, 9 out-of-sync consumers consume 180MB/s per broker (or 900MB/s in total) old data.

With tiered storage, the old data is read from HDFS. Without tiered storage, the old data is read from local disk.



with tiered storagewithout tiered storage

Workload-1

(acks=all, low traffic)

Avg P99 produce latency42ms60ms
Avg P95 produce latency18ms30ms

Workload-2

(acks=1, high traffic)

Avg P99 produce latency10ms10ms
Avg P95 produce latency5ms4ms

Consuming old data has a significant performance impact to acks=all producers. Without tiered storage, the P99 produce latency is almost tripled. With tiered storage, the performance impact is relatively lower, because remote storage reading does not compete the local hard disk bandwidth with produce requests.

Consuming old data has little impact to acks=1 producers.

Test case 3 (rebuild broker):

Under the normal traffic, stop a broker, remove all the local data, and rebuild it without replication throttling. This case simulates replacing a broken broker server.



with tiered storagewithout tiered storage

Workload-1

(acks=all,

12TB data per broker)

Max avg P99 produce latency56ms490ms
Max avg P95 produce latency23ms290ms
Duration2min230ms

Workload-2

(acks=1,

34TB data per broker)

Max avg P99 produce latency12ms10ms
Max avg P95 produce latency6ms5ms
Duration4min520min

With tiered storage, the rebuilding broker only needs to fetch the latest data that has not been shipped to remote storage. Without tiered storage, the rebuilt broker has to fetch all the data that has not expired from the other brokers. With the same log retention time, tiered storage reduced the rebuilding time by more than 100 times.

Without tiered storage, the rebuilding broker has to read a large amount of data from the local hard disks of the leaders. This competes page cache and local disk bandwidth with the normal traffic, and dramatically increases the acks=all produce latency.


Alternatives considered

Following alternatives were considered:

  1. Replace all local storage with remote storage - Instead of using local storage on Kafka brokers, only remote storage is used for storing log segments and offset index files. While this has the benefits related to reducing the local storage, it has the problem of not leveraging the local disk for efficient latest reads as done in Kafka today.

  2. Implement Kafka API on another store - This is an approach that is taken by some vendors where Kafka API is implemented on a different distributed, scalable storage (example HDFS). Such an option does not leverage Kafka other than API compliance and requires the much riskier option of replacing the entire Kafka cluster with another system.

  3. Client directly reads remote log segments from the remote storage - The log segments on the remote storage can be directly read by the client instead of serving it from Kafka broker. This reduces Kafka broker changes and has benefits of removing an extra hop. However, this bypasses Kafka security completely, increases Kafka client library complexity and footprint and hence is not considered.
  4. Remote Log Indexes
    Anchor
    rdi-format
    rdi-format

    For each topic partition that has RLM configured, RLM leader for a topic partition copies log segments which have last message offset less than last stable offset of that topic partition to remote storage. The active segment file (the last segment file of each partition, to which the new records are appending) is never shipped to remote storage.

    After a segment file is copied to remote storage, RLM will append a set of index entries to 3 local index files: remoteLogIndex, remoteOffsetIndex, remoteTimeIndex. These index files are rotated by RLM at a configurable time interval (or a configurable size).

    (active segment)

    {log.dirs}/{topic-partition}/0000002400013.index

    {log.dirs}/{topic-partition}/0000002400013.timeindex

    {log.dirs}/{topic-partition}/0000002400013.log


    (inactive segments)

    {log.dirs}/{topic-partition}/0000002000238.index

    {log.dirs}/{topic-partition}/0000002000238.timeindex

    {log.dirs}/{topic-partition}/0000002000238.log

    {log.dirs}/{topic-partition}/0000001600100.index

    {log.dirs}/{topic-partition}/0000001600100.timeindex

    {log.dirs}/{topic-partition}/0000001600100.log


    ( remote segment)

    {log.dirs}/{topic-partition}/0000001000121.remoteOffsetIndex

    {log.dirs}/{topic-partition}/0000001000121.remoteTimeIndex

    {log.dirs}/{topic-partition}/0000001000121.remoteLogIndex


    ( remote segments)

    {log.dirs}/{topic-partition}/0000000512002.remoteOffsetIndex

    {log.dirs}/{topic-partition}/0000000512002.remoteTimeIndex

    {log.dirs}/{topic-partition}/0000000512002.remoteLogIndex


    Each index entry of the remoteLogIndex file contains the information of a sequence of records in the remote log segment file. The format of a remoteLogIndex entry:

    magic: int16 (current magic value is 0)

    length: int16 (length of this entry)

    crc: int32 (checksum from firstOffset to the end of this entry)

    firstOffset: int64 (the Kafka offset of the 1st record)

    lastOffset: int64 (the Kafka offset of the last record)

    firstTimestamp: int64

    lastTimestamp: int64

    dataLength: int32 (length of the remote data)

    rdiLength: int16

    rdi: byte[] (Remote data identifier)


    RDI (Remote data identifier) is the "pointer" or "URI" of the remote data. The format of RDI depends on the implementation. For example, RDI can be HDFS file path and offset, or S3 key and offset. When reading the remote records, RLM will use RDI to retrieve the remote data.

    Depends on the implementation, RLM may append 1 or more entries to the remoteLogIndex file for each remote segment file. More entries will provide fine-grained indexing of the remote data with the cost of local disk space.

    The RemoteLogIndex entries are shipped to remote storage along with the segment data. The followers will retrieve those index entries from remote storage to build their own indices.

    Remoteoffsetindex file and remoteTimestampIndex file are similar with the existing .index file (offset index) and .timeindex file (timestamp index). The only difference is that they point to the index in the corresponding remoteLogIndex file instead of a log segment file.

    Image Added

Each index entry of the remoteLogIndex file contains the information of a sequence of records in the remote log segment file. The format of a remoteLogIndex entry:

...

magic: int16 (current magic value is 0)

length: int16 (length of this entry)

crc: int32 (checksum from firstOffset to the end of this entry)

firstOffset: int64 (the Kafka offset of the 1st record)

lastOffset: int64 (the Kafka offset of the last record)

firstTimestamp: int64

lastTimestamp: int64

dataLength: int32 (length of the remote data)

rdiLength: int16

rdi: byte[] (Remote data identifier)

RDI (Remote data identifier) is the "pointer" or "URI" of the remote data. The format of RDI depends on the implementation. For example, RDI can be HDFS file path and offset, or S3 key and offset. When reading the remote records, RLM will use RDI to retrieve the remote data.

Depends on the implementation, RLM may append 1 or more entries to the remoteLogIndex file for each remote segment file. More entries will provide fine-grained indexing of the remote data with the cost of local disk space.

The RemoteLogIndex entries are shipped to remote storage along with the segment data. The followers will retrieve those index entries from remote storage to build their own indices.

Remoteoffsetindex file and remoteTimestampIndex file are similar with the existing .index file (offset index) and .timeindex file (timestamp index). The only difference is that they point to the index in the corresponding remoteLogIndex file instead of a log segment file.

Image Removed

Performance Test Results

We have tested the performance of the initial implementation of this proposal.

The cluster configuration:

  1. 5 brokers
  2. 20 CPU cores, 256GB RAM (each broker)
  3. 2TB * 22 hard disks in RAID0 (each broker)
  4. Hardware RAID card with NV-memory write cache
  5. 20Gbps network
  6. snappy compression
  7. 6300 topic-partitions with 3 replicas
  8. remote storage uses HDFS

Each test case is tested under 2 types of workload (acks=all and acks=1)

...

Workload-1

(at-least-once, acks=all)

...

Workload-2

(acks=1)

...

10 producers

30MB / sec / broker (leader)

~62K messages / sec / broker (leader)

...

10 producers

55MB / sec / broker (leader)

~120K messages / sec / broker (leader)

...

10 consumers

120MB / sec / broker

~250K messages / sec / broker

...

10 consumers

220MB / sec / broker

~480K messages / sec / broker

Test case 1 (Normal case):

Normal traffic as described above.

...

Workload-1

(acks=all, low traffic)

...

Workload-2

(acks=1, high traffic)

...

We can see there is a little overhead when tiered storage is turned on. This is expected, as the brokers have to ship segments to remote storage, and sync the remote segment metadata between brokers. With at-least-once (acks=all) produce, the produce latency is slightly increased when tiered storage is turned on. With acks=1 produce, the produce latency is almost not changed when tiered storage is turned on.

Test case 2 (out-of-sync consumers catching up):

In addition to the normal traffic, 9 out-of-sync consumers consume 180MB/s per broker (or 900MB/s in total) old data.

With tiered storage, the old data is read from HDFS. Without tiered storage, the old data is read from local disk.

...

Workload-1

(acks=all, low traffic)

...

Workload-2

(acks=1, high traffic)

...

Consuming old data has a significant performance impact to acks=all producers. Without tiered storage, the P99 produce latency is almost tripled. With tiered storage, the performance impact is relatively lower, because remote storage reading does not compete the local hard disk bandwidth with produce requests.

Consuming old data has little impact to acks=1 producers.

Test case 3 (rebuild broker):

Under the normal traffic, stop a broker, remove all the local data, and rebuild it without replication throttling. This case simulates replacing a broken broker server.

...

Workload-1

(acks=all,

12TB data per broker)

...

Workload-2

(acks=1,

34TB data per broker)

...

With tiered storage, the rebuilding broker only needs to fetch the latest data that has not been shipped to remote storage. Without tiered storage, the rebuilt broker has to fetch all the data that has not expired from the other brokers. With the same log retention time, tiered storage reduced the rebuilding time by more than 100 times.

Without tiered storage, the rebuilding broker has to read a large amount of data from the local hard disks of the leaders. This competes page cache and local disk bandwidth with the normal traffic, and dramatically increases the acks=all produce latency.

Alternatives considered

Following alternatives were considered:

  1. Replace all local storage with remote storage - Instead of using local storage on Kafka brokers, only remote storage is used for storing log segments and offset index files. While this has the benefits related to reducing the local storage, it has the problem of not leveraging the local disk for efficient latest reads as done in Kafka today.
    Implement Kafka API on another store - This is an approach that is taken by some vendors where Kafka API is implemented on a different distributed, scalable storage (example HDFS). Such an option does not leverage Kafka other than API compliance and requires the much riskier option of replacing the entire Kafka cluster with another system.
    Client directly reads remote log segments from the remote storage - The log segments on the remote storage can be directly read by the client instead of serving it from Kafka broker. This reduces Kafka broker changes and has benefits of removing an extra hop. However, this bypasses Kafka security completely, increases Kafka client library complexity and footprint and hence is not considered.