...

As we support compressed message sets, this can, at worst, lead to an inability for replicas to make progress. This happens when the offset for a compressed message set in one replica points to the midpoint of a compressed message set in another.

There is another known issue, which arises if unclean leader election is enabled and machines are lost. It is similar, in essence, to Scenario 2, so we will not cover it here.

So, in short: whilst it is permissible to lose messages in this case, it is not permissible to have divergent logs.

Solution

We can solve both of these issues by introducing the concept of a Leader Epoch. This allocates an identifier to a period of leadership, which is then added to each message by the leader. Each replica keeps a vector of [LeaderEpoch => StartOffset] to mark when leaders changed throughout the lineage of its log. This vector then replaces the High Watermark as the reference point when followers need to truncate data (and will be stored in a file for each replica). So instead of truncating to the High Watermark, the follower gets the appropriate LeaderEpoch from the leader’s vector of past LeaderEpochs and uses this to truncate only messages that do not exist in the leader’s log. In effect, the leader tells the follower what offset it needs to truncate to.
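
To make this concrete, below is a minimal sketch of the vector, modelled as a sorted map from epoch to start offset. The names (EpochVector, onNewLeaderEpoch, endOffsetFor) are illustrative only, not identifiers from the Kafka codebase:

import java.util.Map;
import java.util.TreeMap;

// Illustrative model of the per-replica [LeaderEpoch => StartOffset] vector.
class EpochVector {
    // Sorted map: leader epoch -> first offset written under that epoch.
    private final TreeMap<Integer, Long> epochs = new TreeMap<>();
    private long logEndOffset = 0L;

    // Record where a new period of leadership begins.
    void onNewLeaderEpoch(int epoch) {
        epochs.putIfAbsent(epoch, logEndOffset);
    }

    // Each message appended by the leader is stamped with the current epoch.
    void append() {
        logEndOffset++;
    }

    // The offset at which the requested epoch ends: the start offset of the
    // first larger epoch, or the log end offset if no larger epoch exists.
    long endOffsetFor(int requestedEpoch) {
        Map.Entry<Integer, Long> next = epochs.higherEntry(requestedEpoch);
        return next != null ? next.getValue() : logEndOffset;
    }
}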

We can walk through the implementation of this by addressing scenario 1:

In this solution the follower makes a request to the leader to determine if it has any divergent epochs in its log. It sends a LeaderEpochRequest to the leader for its current LeaderEpoch. In this case the leader returns the log end offset, although if the follower was lagging by more than one Leader Epoch, the leader would return the first offset in (Follower Leader Epoch + 1). That is to say, the LeaderEpoch response contains the offset at which the requested LeaderEpoch ends.

...

When the two brokers restart after a crash, broker B becomes leader. It accepts message m3 but with a new Leader Epoch, LG1. Now when broker A starts, and becomes a follower, it sends a LeaderEpoch request to the leader. This returns the first offset of LG1, which is offset 1. The follower knows that m2 is orphaned and truncates it. It then fetches from offset 1 and the logs are consistent.
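
As a rough sketch, the follower-side step might look as follows. Log, logEndOffset and truncateTo are assumed names for illustration, not the real Kafka API:

// Hypothetical follower-side truncation using the leader's epoch reply.
interface Log {
    long logEndOffset();
    void truncateTo(long offset); // discards messages at and beyond offset
}

class FollowerTruncation {
    // leaderReplyOffset is the offset returned by the leader for the
    // follower's latest LeaderEpoch, e.g. offset 1 in the walkthrough above.
    static void maybeTruncate(Log log, long leaderReplyOffset) {
        if (leaderReplyOffset < log.logEndOffset()) {
            // Messages from leaderReplyOffset onwards (m2 above) do not exist
            // in the leader's log, so the follower discards them.
            log.truncateTo(leaderReplyOffset);
        }
        // The follower then fetches from leaderReplyOffset onwards, and the
        // two logs converge.
    }
}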

NB: this proposal does not protect against log divergence for clusters that use the setting unclean.leader.election.enable=true (see Appendix (a)).

Proposed Changes

Leader Epoch

A leader has an existing concept of a LeaderEpoch. This is a 32-bit number, managed by the Controller, stored in the Partition State Info in ZooKeeper and passed to each new leader as part of the LeaderAndIsrRequest. We propose stamping every message with the LeaderEpoch on the leader that accepts the produce request. This LeaderEpoch number is then propagated through the replication protocol and used, in place of the High Watermark, as a reference point for message truncation. This process takes the following form:

...

  1. The offset returned in the response will be the start offset of the first Leader Epoch larger than last_leader_epoch_num or the Log End Offset if the leader's current epoch is equal to the partition_leader_epoch from the request.

  2. The response will only include offsets for those partitions, supplied in the request, for which the broker receiving the request is the leader.

Add LeaderEpoch field to MessageSet

LeaderEpoch is added to the MessageSets used in Fetch Responses returned as part of the internal replication protocol.

MessageSet

MessageSet => [Offset MessageSize Message]

 Offset => int64

 MessageSize => int32

 leader_epoch => int32 <--------[NEW]

Add Leader Epoch Sequence File per Partition

A file will be used, per replica (located inside the log directory), containing the leader epoch and its corresponding start offset. This will be a text file with the schema:

leader-epoch-sequence-file

Version int32

[leader_epoch int32 start_offset int64]
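
For illustration, a replica that saw LeaderEpoch 0 begin at offset 0 and LeaderEpoch 2 begin at offset 25 might hold a file like the one below (the version number and exact layout here are assumptions, not the final on-disk format):

0        <- Version
0 0      <- leader_epoch 0 began at start_offset 0
2 25     <- leader_epoch 2 began at start_offset 25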

Compatibility, Deprecation, and Migration Plan

For the message format changes, the upgrade path from KIP-32 can be reused. To upgrade from a previous message format version, users should:

  1. Upgrade the brokers once with the inter-broker protocol set to the previously deployed version.

  2. Upgrade the brokers again with an updated inter-broker protocol, but leaving the message format unchanged.

  3. Upgrade all or most clients.

  4. Restart the brokers, with the message format version set to the latest.
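
As a sketch of the settings these steps adjust, using the standard broker properties inter.broker.protocol.version and log.message.format.version (the placeholder values are illustrative):

# After step 2: inter-broker protocol upgraded, message format pinned
inter.broker.protocol.version=<new version>
log.message.format.version=<previous version>

# After step 4: clients upgraded, message format moved forward
log.message.format.version=<new version>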

The LeaderEpochRequest will only be sent if all brokers support it. Otherwise the existing logic for High Watermark truncation will be used.

Rejected Alternatives

As discussed in the Motivations section, Scenario 1 could be addressed by delaying truncation until fetch requests return from the leader. This could potentially be done by creating temporary segment file(s) containing the messages required for the follower to catch up. Then, once caught up, the original segment would be truncated to the High Watermark and the temporary segment(s) made active. This was rejected as it does not solve Scenario 2.

We considered whether the cleaning of the LeaderEpochSequenceFiles would be simpler if there were one such file per segment, so they would be effectively immutable once the segment is no longer active. This was rejected as there will typically be many log segments in the same Leader Epoch, meaning we could create a fair number of files we do not need.

Appendix (a): Possibility for Divergent Logs with Leader Epochs & Unclean Leader Election

There is still the possibility for logs to diverge, even with Leader Epochs, if min.insync.replicas=1 and unclean.leader.election.enable=true. Consider two brokers A and B, a single topic, a single partition, replication factor 2, and min.insync.replicas=1.

Intuitively, the issue can be seen as:

  • The first two writes create a divergent log at offset 0 on completely isolated brokers.
  • The second two writes “cover up” that first divergent write, so the LeaderEpoch request doesn’t see it.

Scenario:

  1. [LeaderEpoch0] Write a message to A (offset A:0). Stop broker A. Bring up broker B, which becomes leader.

  2. [LeaderEpoch1] Write a message to B (offset B:0). Stop broker B. Bring up broker A, which becomes leader.

  3. [LeaderEpoch2] Write a message to A (offset A:1). Stop broker A. Bring up broker B, which becomes leader.

  4. [LeaderEpoch3] Write a message to B (offset B:1).

  5. Bring up broker A. It sends an Epoch Request for Epoch 2 to broker B. B has only epochs 1 and 3, not 2, so it replies with the first offset of Epoch 3 (which is 1). So offset 0 is divergent.
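
To make the end state concrete, after step 4 the two brokers hold (epoch vectors shown as [epoch -> start offset]):

Broker A log:    offset 0 = message (epoch 0), offset 1 = message (epoch 2)
Broker A epochs: [0 -> 0, 2 -> 1]

Broker B log:    offset 0 = message (epoch 1), offset 1 = message (epoch 3)
Broker B epochs: [1 -> 0, 3 -> 1]

B answers A’s request for Epoch 2 with the start offset of its first larger epoch, Epoch 3, which is offset 1. Nothing below offset 1 is truncated, so the divergent messages at offset 0 survive on both brokers.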

 

The underlying problem here is that, whilst B can tell something is wrong, it can't tell where in the log the divergence started.

One solution is to detect the break by comparing the complete epoch lineage between brokers, then truncate either to (a) zero or (b) the point of divergence, and refetch. However, compacted topics make both of these options hard, as arbitrary epoch and offset information can be 'lost' from the log. This information could be retained and managed in the LeaderEpoch file instead, but the whole solution becomes quite complex. Hence it seems sensible to forgo this guarantee for the unclean leader election case, or at least push it to a subsequent KIP.
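
For completeness, here is a sketch of what detecting the point of divergence (option (b)) might look like, assuming complete epoch lineages could be exchanged, which, as noted above, compaction undermines. All names here are hypothetical:

import java.util.List;

// Hypothetical comparison of two complete epoch lineages, each a list of
// (epoch, startOffset) pairs in log order. Compaction can remove entries,
// which is why this approach was not pursued.
class DivergenceCheck {
    record EpochEntry(int epoch, long startOffset) {}

    // Returns the first offset at which the lineages disagree, or -1 if one
    // lineage is a prefix of the other (no divergence detected).
    static long firstDivergentOffset(List<EpochEntry> a, List<EpochEntry> b) {
        int n = Math.min(a.size(), b.size());
        for (int i = 0; i < n; i++) {
            EpochEntry ea = a.get(i), eb = b.get(i);
            if (ea.epoch() != eb.epoch() || ea.startOffset() != eb.startOffset()) {
                // Divergence began no later than the earlier of the two starts.
                return Math.min(ea.startOffset(), eb.startOffset());
            }
        }
        return -1;
    }
}

In the scenario above this returns offset 0: A’s lineage [(0, 0), (2, 1)] and B’s [(1, 0), (3, 1)] disagree at the very first entry.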