Comment: Change "generation" -> "epoch"

...

Definition of Terms

  • Leader Epoch: A 32-bit, monotonically increasing number representing a continuous period of leadership for a single partition. This is marked on all messages.
  • Leader Epoch Start Offset: The first offset in a new Leader Epoch.
  • Leader Epoch Sequence File: A journal of leadership changes, mapping each Leader Epoch to the starting offset for that epoch (the Leader Epoch Start Offset).
  • Leader Epoch Request: A request made by a follower to the leader to retrieve the appropriate Leader Epoch Start Offset. This is the Leader Epoch Start Offset of the subsequent epoch, or the Log End Offset if there is no subsequent epoch. The follower uses this offset to truncate its log.

...

We can solve both of these issues by introducing the concept of a Leader Epoch. This allocates an identifier to a period of leadership, which is then added to each message by the leader. Each replica keeps a vector of [LeaderEpoch => StartOffset] to mark when leaders changed throughout the lineage of its log. This vector then replaces the high watermark when followers need to truncate data (and will be stored in a file for each replica). So instead of a follower truncating to the High Watermark, the follower gets the appropriate LeaderEpoch from the leader’s vector of past LeaderEpochs and uses this to truncate only messages that do not exist in the leader’s log. So the leader effectively tells the follower what offset it needs to truncate to.
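As a rough illustration of the [LeaderEpoch => StartOffset] vector, the sketch below shows how a leader might look up the offset at which a requested epoch ends. The function name, the list-of-tuples representation, and the sample offsets are assumptions made for this example only, not part of the proposal.

```python
from bisect import bisect_right


def end_offset_for_epoch(epoch_starts, requested_epoch, log_end_offset):
    """Return the offset at which `requested_epoch` ends on the leader.

    epoch_starts is a hypothetical in-memory form of the vector:
    a list of (leader_epoch, start_offset) tuples sorted by epoch.
    """
    epochs = [epoch for epoch, _ in epoch_starts]
    # Index of the first entry whose epoch is larger than the requested one.
    i = bisect_right(epochs, requested_epoch)
    if i < len(epoch_starts):
        return epoch_starts[i][1]  # start offset of the subsequent epoch
    return log_end_offset  # no subsequent epoch: use the Log End Offset


# Illustrative values only.
leader_epochs = [(0, 0), (1, 5), (3, 9)]
print(end_offset_for_epoch(leader_epochs, 0, 12))  # 5
print(end_offset_for_epoch(leader_epochs, 3, 12))  # 12
```

A follower whose latest epoch is 0 would truncate to offset 5 here, regardless of where its own high watermark sits.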

...

In this solution the follower makes a request to the leader to determine if it has any divergent epochs in its log. It sends a LeaderEpochRequest to the leader for its current LeaderEpoch. In this case the leader returns the log end offset, although if the follower was lagging by more than one Leader Epoch, the leader would return the first offset in (Follower Leader Epoch + 1). That is to say, the LeaderEpoch response contains the offset where the requested LeaderEpoch ends.

In this case the LeaderEpochResponse returns offset 2. Note this is different to the high watermark which, on the follower, is offset 0. Thus the follower does not truncate any messages and hence message m2 is not lost.

...

When the two brokers restart after a crash, broker B becomes leader. It accepts message m3 but with a new Leader Epoch, LG1. Now when broker A starts, and becomes a follower, it sends a LeaderEpoch request to the leader. This returns the first offset of LG1, which is offset 1. The follower knows that m2 is orphaned and truncates it. It then fetches from offset 1 and the logs are consistent.
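The scenario above can be replayed in a few lines. This is a minimal sketch under stated assumptions: logs are modelled as Python lists of (message, epoch) pairs, m1 sits at offset 0 and m2 at offset 1 on broker A, and the epochs are numbered 0 and 1. The helper names are hypothetical.

```python
def epoch_start_offsets(log):
    """Derive [(epoch, start_offset)] from a log of (message, epoch) pairs."""
    starts = []
    for offset, (_, epoch) in enumerate(log):
        if not starts or starts[-1][0] != epoch:
            starts.append((epoch, offset))
    return starts


def leader_reply(leader_log, follower_epoch):
    """Offset where `follower_epoch` ends on the leader."""
    for epoch, start in epoch_start_offsets(leader_log):
        if epoch > follower_epoch:
            return start
    return len(leader_log)  # Log End Offset


broker_a = [("m1", 0), ("m2", 0)]  # follower, still holding the orphaned m2
broker_b = [("m1", 0), ("m3", 1)]  # leader, m3 written under the new epoch

# Broker A asks about its latest epoch and truncates to the returned offset.
truncate_to = leader_reply(broker_b, follower_epoch=broker_a[-1][1])
# Truncate, then fetch from the truncation point onwards.
broker_a = broker_a[:truncate_to] + broker_b[truncate_to:]
```

After the last line, `truncate_to` is 1 and both brokers hold m1 followed by m3.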

Proposed Changes

Leader Epoch

A leader has an existing concept of a LeaderEpoch. This is a 32-bit number, managed by the Controller, stored in the Partition State Info in Zookeeper and passed to each new leader as part of the LeaderAndIsrRequest.

We propose stamping every message with the LeaderEpoch on the leader that accepts the produce request. This LeaderEpoch number is then propagated through the replication protocol, and used to replace the High Watermark, as a reference point for message truncation. This process takes the following form:

Steps:

1. Evolve the message format so that every message set carries a 4-byte Leader Epoch number.

2. In every log directory, we create a new Leader Epoch Sequence file, where we store the sequence of Leader Epochs and the Start Offset of messages produced in each epoch. This sequence is also cached in memory on every replica.

3. When a replica becomes a leader, it first adds the new Leader Epoch and the log end offset of the replica to the end of the Leader Epoch Sequence file and flushes it *. Each new message set produced to the leader is tagged with the new Leader Epoch.

4. When a replica becomes a follower, it does the following steps:

4.1 Recover all Leader Epochs from the Leader Epoch Sequence file, if needed.

4.2 Send a new LeaderEpoch request for the partition to the leader. The request includes the latest Leader Epoch in the follower's Leader Epoch Sequence.

4.3 The leader responds with the LastOffset for that LeaderEpoch. LastOffset will be the start offset of the first Leader Epoch larger than the Leader Epoch passed in the request or the Log End Offset if the leader's current epoch is equal to the one requested.

4.4 If the follower has any LeaderEpoch with a start offset larger than the LastOffset returned from the leader, it resets its last Leader Epoch Sequence to be the leader’s LastOffset and flushes the Leader Epoch Sequence File.

4.5 The follower truncates its local log to the leader’s LastOffset.

...

4.7.1 During fetching, if the follower sees the LeaderEpoch of a message set larger than its latest LeaderEpoch, it adds the new LeaderEpoch and the starting offset to its LeaderEpochSequence and flushes the file to disk.

...

For backward compatibility, in step 4.3, if the leader can't find the LastOffset (e.g., the leader hasn't started tracking Leader Epoch yet), the follower will fall back to the current approach by truncating the log to its high watermark.
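The steps above can be sketched with a small in-memory model. It covers step 3 (becoming leader), step 4.5 with the high watermark fallback, and step 4.7.1 (recording a new epoch while fetching). The `Replica` class and its method names are hypothetical, flushing to disk is left out, and the adjustment of the follower's sequence in step 4.4 is deliberately not modelled.

```python
class Replica:
    """Toy model of one partition replica (illustrative assumption only)."""

    def __init__(self):
        self.log = []            # list of (leader_epoch, payload); index == offset
        self.epoch_seq = []      # list of (leader_epoch, start_offset)
        self.high_watermark = 0
        self.current_epoch = None

    def become_leader(self, new_epoch):
        # Step 3: record the new epoch together with the current log end offset.
        self.epoch_seq.append((new_epoch, len(self.log)))
        self.current_epoch = new_epoch

    def produce(self, payload):
        # Step 3: every message is tagged with the leader's current epoch.
        self.log.append((self.current_epoch, payload))

    def truncate_as_follower(self, last_offset):
        # Step 4.5, falling back to the high watermark when the leader
        # could not supply a LastOffset (modelled here as None).
        target = self.high_watermark if last_offset is None else last_offset
        del self.log[target:]
        return target

    def append_fetched(self, messages):
        # Step 4.7.1: note the start offset of any epoch not seen before.
        for epoch, payload in messages:
            if not self.epoch_seq or epoch > self.epoch_seq[-1][0]:
                self.epoch_seq.append((epoch, len(self.log)))
            self.log.append((epoch, payload))


leader = Replica()
leader.become_leader(0)
leader.produce("m1")
leader.produce("m2")
leader.become_leader(1)
leader.produce("m3")

follower = Replica()
follower.append_fetched(leader.log)
```

In this run the follower ends up with the same epoch sequence as the leader, because it derives each entry from the epoch stamped on the fetched messages.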

...

It is possible that a leader might crash and rejoin the cluster without a round of leader election, meaning that the LeaderEpoch would not change. This can happen if a hard shutdown occurs, where all replicas for a partition crash. If the first machine to start back up was previously the leader, no election will occur and the LeaderEpoch would not be incremented. However, we require that such a condition trigger a new Leader Epoch.


To protect against this eventuality, if the controller receives a Broker Registration it will increment the Leader Epoch and propagate that value to the broker via the Leader and ISR Request (in the normal way). This mechanism must also be sensitive to the bug KAFKA-1120.

Maintaining the LeaderEpochSequenceFile

The Leader Epoch information will be held in files, per replica, on each broker. The files represent a commit log of LeaderEpoch changes for each replica and are cached in memory on each broker. When segments are deleted or compacted, the appropriate entries are removed from the cache and the file. For deleted segments, all LeaderEpoch entries for offsets less than or equal to the segment log end offset are removed from the LeaderEpochSequenceFile. For compacted topics the process is slightly more involved. The LogCleaner creates a LeaderEpochSequence for every segment it cleans, which represents the earliest offset for each LeaderEpoch. Before the old segment(s) are replaced with the cleaned one(s), the LeaderEpochSequenceFile is regenerated and the cache updated.

...

After an unclean shutdown the Leader Epoch Sequence File could be ahead of the log (as the log is flushed asynchronously by default). Thus, if an unclean shutdown is detected on broker start (regardless of whether it's a leader or follower), all entries whose offset is greater than the log end offset are removed from the Leader Epoch Sequence File.
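The recovery rule amounts to a filter over the sequence. A minimal sketch, assuming the sequence is held as a list of (leader_epoch, start_offset) tuples; the function name is hypothetical.

```python
def recover_epoch_sequence(epoch_seq, log_end_offset):
    """Drop entries that point past the end of the recovered log.

    Only entries whose start offset is greater than the log end offset
    are removed, as described above; an entry equal to it is kept.
    """
    return [(epoch, start) for epoch, start in epoch_seq
            if start <= log_end_offset]


# Illustrative values: the file claims epoch 2 started at offset 9,
# but only offsets below 7 survived the crash.
recovered = recover_epoch_sequence([(0, 0), (1, 5), (2, 9)], log_end_offset=7)
```

Here `recovered` keeps the entries for epochs 0 and 1 and drops the entry for epoch 2.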

Public Interfaces

Add API for LeaderEpochRequest/Response

Leader Epoch Request V0

LeaderEpochRequest (Version: 0) => [partition_leader_epoch]
 partition_leader_epoch => partition_id, leader_epoch

   partition_id => INT32

   leader_epoch => INT32

Leader Epoch Response V0

LeaderEpochResponse (Version: 0) => [epoch_and_offset]
 epoch_and_offset => error_id, partition_id, leader_epoch, start_offset

   error_id => INT16

   leader_epoch => INT32

   partition_id => INT32

   start_offset => INT64

Error Codes:

-1 -> Unknown Error

0 -> No Error

1 -> Not leader for partition

2 -> No epoch information for partition

...

  1. The offset returned in the response will be the start offset of the first Leader Epoch larger than last_leader_epoch_num or the Log End Offset if the leader's current epoch is equal to the partition_leader_epoch from the request.

  2. The response will only include offsets for partition IDs, supplied in the request, which are leaders on the broker the request was sent to.
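To make the schemas above concrete, here is a hedged sketch of how the request and response bodies might be laid out in bytes. It assumes big-endian integers and an INT32 element count in front of each array, following the usual Kafka wire conventions, and it takes the field order from the `=>` lines. Request and response headers are omitted, and the function names are hypothetical.

```python
import struct

# error_id INT16, partition_id INT32, leader_epoch INT32, start_offset INT64
_RESPONSE_ENTRY = struct.Struct(">hiiq")


def encode_request(partitions):
    """Encode [(partition_id, leader_epoch), ...] as a V0 request body."""
    body = struct.pack(">i", len(partitions))  # assumed INT32 array count
    for partition_id, leader_epoch in partitions:
        body += struct.pack(">ii", partition_id, leader_epoch)
    return body


def decode_response(data):
    """Decode a V0 response body into a list of
    (error_id, partition_id, leader_epoch, start_offset) tuples."""
    (count,) = struct.unpack_from(">i", data, 0)
    pos = 4
    entries = []
    for _ in range(count):
        entries.append(_RESPONSE_ENTRY.unpack_from(data, pos))
        pos += _RESPONSE_ENTRY.size
    return entries


request = encode_request([(0, 3)])
# A hand-built response: no error, partition 0, epoch 3, start offset 42.
response = decode_response(struct.pack(">ihiiq", 1, 0, 0, 3, 42))
```

A follower would check `error_id` against the error codes listed above before using `start_offset` to truncate.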

Add LeaderEpoch field to MessageSet

LeaderEpoch is added to MessageSets used in Fetch Responses returned as part of the internal replication protocol.

MessageSet

MessageSet => [Offset MessageSize Message]

 Offset => int64

 MessageSize => int32

 leader_epoch => int32 <--------[NEW]

Add Leader Epoch Sequence File per Partition

A file will be used, per replica (located inside the log directory), containing the leader epoch and its corresponding start offset. This will be a text file with the schema:

leader-epoch-sequence-file

Version int32

[leader_epoch int32 start_offset int64]
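The schema above does not pin down the exact text layout. The sketch below assumes one plausible arrangement: the version on the first line, followed by one whitespace-separated `leader_epoch start_offset` pair per line. Both function names and the layout are assumptions for illustration.

```python
def dump_epoch_file(version, entries):
    """Render the file contents as text (assumed one entry per line)."""
    lines = [str(version)]
    lines += [f"{epoch} {start}" for epoch, start in entries]
    return "\n".join(lines) + "\n"


def parse_epoch_file(text):
    """Parse text produced by dump_epoch_file into (version, entries)."""
    lines = [line for line in text.splitlines() if line.strip()]
    version = int(lines[0])
    entries = []
    for line in lines[1:]:
        epoch, start = line.split()
        entries.append((int(epoch), int(start)))
    return version, entries


text = dump_epoch_file(0, [(0, 0), (1, 5)])
version, entries = parse_epoch_file(text)
```

Keeping the file as plain text makes it easy to inspect by hand, at the cost of a parse step on broker start.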

...

  1. Upgrade the brokers once with the inter-broker protocol set to the previous deployed version.

  2. Upgrade the brokers again with an updated inter-broker protocol, but leaving the message format unchanged.

  3. Upgrade all or most clients.

  4. Restart the brokers, with the message format version set to the latest.

The LeaderEpochRequest will only be sent if all brokers support it. Otherwise the existing logic for High Watermark truncation will be used.

...

We considered whether the cleaning of the LeaderEpochSequenceFiles would be simpler if there was one LeaderEpochSequenceFile per segment, so they were effectively immutable once the segment is no longer active. This was rejected as there will be many log segments in the same Leader Epoch, typically meaning we could create a fair number of files we do not need.

...