...

It is very important that the log remain consistent across all of the replicas after log compaction. The current implementation of log compaction in Kafka can lead to divergent logs because it is possible for slow replicas to miss the tombstone records: tombstones are deleted after a timeout and are not guaranteed to have been replicated to all of the replicas before they are deleted. It should be possible to extend log compaction and the fetch protocol as explained in the Rejected Alternatives section "Just fix the tombstone GC issue with log compaction". We believe that this conservative approach is equally, if not more, complicated than the snapshots explained in this proposal, and more importantly it doesn't address the other items in the motivation section above.

...

Code Block
Kafka Replicated Log:

      LBO  --               high-watermark --      LEO --
            V                               V           V
        ---------------------------------------------------
offset: | x + 1 | ... | y | y + 1 |  ...  |   |  ...  |   |
epoch:  | b     | ... | c | d     |  ...  |   |  ...  |   |
        ---------------------------------------------------

Kafka Snapshot Files:

<topic_name>-<partition_index>/checkpoints/x-a.checkpoint
<topic_name>-<partition_index>/checkpoints/y-c.checkpoint

Note: The extension checkpoint will be used since Kafka already has files with the snapshot extension.

...

The Kafka Controller is responsible for the format of the snapshot. The snapshot will be stored in the checkpoints directory in the topic partition directory and will have a name of <SnapshotOffset>-<SnapshotOffsetEpoch>.checkpoint. For example, for the topic __cluster_metadata, partition 0, snapshot offset 5120793 and snapshot epoch 2, the full filename will be __cluster_metadata-0/checkpoints/00000000000005120793-00000000000000000002.checkpoint.
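The naming scheme above can be sketched as follows. `snapshot_path` is a hypothetical helper, and the 20-digit zero-padding is inferred from the example filename, not normative:

```python
def snapshot_path(topic: str, partition: int, offset: int, epoch: int) -> str:
    """Build the checkpoint file path for a snapshot at (offset, epoch).

    Both numbers are zero-padded to 20 digits, matching the example
    filename in this proposal (an assumption, not normative).
    """
    return f"{topic}-{partition}/checkpoints/{offset:020d}-{epoch:020d}.checkpoint"

# Example from the text: topic __cluster_metadata, partition 0,
# snapshot offset 5120793, snapshot epoch 2.
print(snapshot_path("__cluster_metadata", 0, 5120793, 2))
```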

For details on the content of this snapshot file and how it manages compatibility with future changes see KIP-631 and the section on Record Format Versions.

Snapshot Format

The disk format for snapshot files will be the same as version 2 of the log format. For reference, this is the version 2 log format:

Code Block
RecordBatch => BatchHeader [Record]

BatchHeader
   BaseOffset => Int64
   Length => Int32
   PartitionLeaderEpoch => Int32
   Magic => Int8
   CRC => Uint32
   Attributes => Int16
   LastOffsetDelta => Int32 // also serves as LastSequenceDelta
   FirstTimestamp => Int64
   MaxTimestamp => Int64
   ProducerId => Int64
   ProducerEpoch => Int16
   BaseSequence => Int32

Record =>
   Length => Varint
   Attributes => Int8
   TimestampDelta => Varlong
   OffsetDelta => Varint
   Key => Bytes
   Value => Bytes
   Headers => [HeaderKey HeaderValue]
     HeaderKey => String
     HeaderValue => Bytes

Using version 2 of the log format will allow the Kafka Controller to compress records and identify corrupted records in the snapshot. Even though snapshots use the log format for storing this state, there is no requirement:

  1. To use valid BaseOffset and OffsetDelta in the BatchHeader and Record respectively.
  2. For the records in the snapshot to match the records in the replicated log.

When to Snapshot

If the Kafka Controller generates a snapshot too frequently, it can negatively affect the performance of the disk. If it doesn't generate a snapshot often enough, it can increase the amount of time it takes to load its state into memory and increase the amount of space taken up by the replicated log. The Kafka Controller will have a new configuration option controller.snapshot.minimum.records. If the number of records between the latest snapshot and the high-watermark is greater than controller.snapshot.minimum.records, then the Kafka Controller will generate a new snapshot.
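A minimal sketch of this trigger condition, assuming hypothetical accessors for the latest snapshot offset and the high-watermark:

```python
def should_snapshot(latest_snapshot_offset: int,
                    high_watermark: int,
                    minimum_records: int) -> bool:
    """Return True when a new snapshot should be generated.

    Mirrors controller.snapshot.minimum.records: snapshot only when the
    number of records between the latest snapshot and the high-watermark
    exceeds the configured minimum.
    """
    return (high_watermark - latest_snapshot_offset) > minimum_records

# With minimum_records = 1000: 500 new records -> no snapshot yet,
# 2000 new records -> take a snapshot.
print(should_snapshot(5120793, 5121293, 1000))  # False
print(should_snapshot(5120793, 5122793, 1000))  # True
```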

...

After loading the snapshot, the Kafka Controller will apply the operations in the replicated log with an offset greater than the snapshot's offset and less than the high-watermark.
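The replay step above can be sketched as follows; the `log` mapping and record shape are assumptions for illustration, not the actual Kafka data structures:

```python
def replay_after_snapshot(snapshot_offset: int, high_watermark: int, log: dict):
    """Collect the log records to apply on top of a loaded snapshot.

    `log` maps offset -> record. Per the text above, only offsets
    strictly greater than the snapshot's offset and strictly less than
    the high-watermark are applied.
    """
    applied = []
    for offset in range(snapshot_offset + 1, high_watermark):
        if offset in log:
            applied.append(log[offset])  # state machine would apply it here
    return applied

# Snapshot at offset 5, high-watermark 8: offsets 6 and 7 are replayed.
print(replay_after_snapshot(5, 8, {6: "a", 7: "b", 8: "c"}))  # ['a', 'b']
```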

Changes to Leader Election

...