...
As we support compressed message sets, this can, at worst, leave replicas unable to make progress. This happens when the offset of a compressed message set in one replica points to the midpoint of a compressed message set in another.
There is another, similar known issue when unclean leader election is enabled and machines are lost. It is similar in essence to Scenario 2, so we will not cover it here.
In short, whilst it is permissible to lose messages in this case, it is not permissible to have divergent logs.
Solution
We can solve both of these issues by introducing the concept of a Leader Epoch. This allocates an identifier to a period of leadership, which the leader then adds to each message. Each replica keeps a vector of [LeaderEpoch => StartOffset] to mark when leaders changed throughout the lineage of its log, and stores it in a file for each replica. This vector then replaces the High Watermark when followers need to truncate data. Instead of truncating to the High Watermark, the follower uses the leader’s vector of past LeaderEpochs to find where its own LeaderEpoch ends in the leader’s log, and truncates only the messages that do not exist in the leader’s log. In effect, the leader tells the follower which offset it needs to truncate to.
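The [LeaderEpoch => StartOffset] vector can be pictured as an append-only list of pairs. The following is a minimal Python sketch, assuming an in-memory list kept in ascending order; the class `LeaderEpochVector`, its `assign` method and its `latest_epoch` property are hypothetical names for illustration, not part of the proposal.

```python
class LeaderEpochVector:
    """Hypothetical in-memory form of the [LeaderEpoch => StartOffset] vector."""

    def __init__(self):
        # (epoch, start_offset) pairs, ascending in both epoch and offset.
        self.entries = []

    @property
    def latest_epoch(self):
        return self.entries[-1][0] if self.entries else None

    def assign(self, epoch, start_offset):
        """Record that `epoch` begins at `start_offset`.

        Repeating the latest epoch is a no-op: it is the same leadership period.
        """
        if self.entries:
            last_epoch, last_start = self.entries[-1]
            if epoch == last_epoch:
                return
            if epoch < last_epoch or start_offset < last_start:
                raise ValueError("epochs and start offsets must not go backwards")
        self.entries.append((epoch, start_offset))
```

Only the first message of each new epoch adds an entry, so the vector stays small. Epoch numbers in a given replica's vector may have gaps, for example when a leader was elected but this replica holds no messages from that period.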
We can walk through the implementation of this by addressing scenario 1:
In this solution the follower makes a request to the leader to determine whether it has any divergent epochs in its log. It sends the leader a LeaderEpochRequest for its current LeaderEpoch. In this case the leader returns its log end offset, although if the follower were lagging by more than one Leader Epoch, the leader would return the first offset in (Follower Leader Epoch + 1). That is to say, the LeaderEpoch response contains the offset where the requested LeaderEpoch ends.
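The leader's side of this exchange is a lookup in its epoch vector. Below is a minimal Python sketch, assuming the vector is a list of `(epoch, start_offset)` pairs in ascending order; `end_offset_for_epoch` is a hypothetical helper. Returning the log end offset for an epoch newer than any the leader knows is an assumption, since the text does not cover that case.

```python
def end_offset_for_epoch(entries, requested_epoch, log_end_offset):
    """Leader side: the offset at which `requested_epoch` ends in this log.

    entries: (epoch, start_offset) pairs in ascending order (hypothetical
    in-memory form of the leader's epoch vector).
    """
    if entries and requested_epoch == entries[-1][0]:
        # The follower is on the leader's current epoch, which ends at the LEO.
        return log_end_offset
    for epoch, start_offset in entries:
        if epoch > requested_epoch:
            # The next epoch's start offset is where the requested one ended.
            return start_offset
    # Assumption: an epoch newer than any the leader knows falls back to the LEO.
    return log_end_offset
```

Searching for the first epoch *larger* than the requested one, rather than literally epoch + 1, keeps the lookup correct when the leader's vector has gaps in its epoch numbers.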
...
When the two brokers restart after a crash, broker B becomes leader. It accepts message m3, but with a new Leader Epoch, LG1. When broker A then starts and becomes a follower, it sends a LeaderEpoch request to the leader. This returns the first offset of LG1, which is offset 1. The follower now knows that m2 is orphaned and truncates it. It then fetches from offset 1, and the logs are consistent.
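The walkthrough can be replayed in a few lines. This is a minimal Python sketch, assuming m1 sits at offset 0 on both brokers, m2 at offset 1 on broker A, and that the epoch before LG1 is LG0 (written as 0); logs are plain lists of `(message, leader_epoch)` and `leader_end_offset` and `catch_up` are hypothetical helpers. Truncating to the smaller of the follower's log end offset and the leader's answer is also an assumption of this sketch.

```python
def leader_end_offset(leader_log, leader_epochs, requested_epoch):
    """Start of the first epoch after `requested_epoch`, else the leader's LEO."""
    for epoch, start_offset in leader_epochs:
        if epoch > requested_epoch:
            return start_offset
    return len(leader_log)


def catch_up(follower_log, leader_log, leader_epochs):
    """Truncate the follower where its latest epoch diverges, then fetch."""
    follower_epoch = follower_log[-1][1]  # assumes a non-empty follower log
    end = leader_end_offset(leader_log, leader_epochs, follower_epoch)
    truncate_to = min(len(follower_log), end)  # never beyond our own LEO
    del follower_log[truncate_to:]  # drop orphaned messages (m2 here)
    follower_log.extend(leader_log[truncate_to:])  # fetch from truncate_to
    return truncate_to


# Logs are lists of (message, leader_epoch); the list index is the offset.
broker_b = [("m1", 0), ("m3", 1)]  # new leader: wrote m3 in LG1
broker_a = [("m1", 0), ("m2", 0)]  # m2 was never replicated to B
b_epochs = [(0, 0), (1, 1)]  # B's [LeaderEpoch => StartOffset]

offset = catch_up(broker_a, broker_b, b_epochs)
print(offset, broker_a)  # 1 [('m1', 0), ('m3', 1)]
```

Note that, unlike High Watermark truncation, the follower never discards a message the leader also holds.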
Proposed Changes
NB: It should be noted that this proposal does not protect against log divergence for clusters that use the setting unclean.leader.election.enable=true.
Leader Epoch
A leader already has a concept of a LeaderEpoch. This is a 32-bit number, managed by the Controller, stored in the Partition State Info in ZooKeeper and passed to each new leader as part of the LeaderAndIsrRequest. We propose stamping every message with the LeaderEpoch of the leader that accepts the produce request. This LeaderEpoch number is then propagated through the replication protocol and used, in place of the High Watermark, as the reference point for message truncation. This process takes the following form:
...
The offset returned in the response will be the start offset of the first Leader Epoch larger than last_leader_epoch_num, or the Log End Offset if the leader's current epoch is equal to the partition_leader_epoch from the request.
The response will only include offsets for those partitions in the request for which the receiving broker is the leader.
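Putting these two rules together, a broker answers per partition and silently omits partitions it does not lead. The following is a minimal Python sketch, assuming the request is a mapping from partition id to the follower's epoch; `PartitionState` and `handle_leader_epoch_request` are hypothetical names, and falling back to the Log End Offset for an epoch newer than the leader's is an assumption.

```python
from bisect import bisect_right
from dataclasses import dataclass, field


@dataclass
class PartitionState:
    """Hypothetical per-partition view held by a broker."""
    is_leader: bool
    epochs: list = field(default_factory=list)  # [(epoch, start_offset)], ascending
    log_end_offset: int = 0


def handle_leader_epoch_request(partitions, request):
    """request: partition id -> requested epoch.

    Returns partition id -> end offset, only for partitions this broker leads.
    """
    response = {}
    for pid, requested_epoch in request.items():
        state = partitions.get(pid)
        if state is None or not state.is_leader:
            continue  # not the leader here: leave it out of the response
        epoch_numbers = [epoch for epoch, _ in state.epochs]
        if epoch_numbers and requested_epoch == epoch_numbers[-1]:
            response[pid] = state.log_end_offset  # current epoch ends at the LEO
            continue
        i = bisect_right(epoch_numbers, requested_epoch)  # first epoch > requested
        if i < len(epoch_numbers):
            response[pid] = state.epochs[i][1]
        else:
            response[pid] = state.log_end_offset  # assumption, see above
    return response
```

Using `bisect_right` finds the first epoch strictly larger than the requested one in logarithmic time, which matches the "first Leader Epoch larger than" rule.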
Add LeaderEpoch field to MessageSet
LeaderEpoch is added to the MessageSets used in Fetch Responses returned as part of the internal replication protocol.
[MessageSet schema table]
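Because every message carries the stamp, a follower can rebuild the same epoch vector simply by replicating. This is a minimal Python sketch, assuming one record per offset; `Record`, `Replica`, `append_as_leader` and `append_fetched` are hypothetical names and do not reflect the actual MessageSet layout.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Record:
    """Hypothetical message carrying the proposed LeaderEpoch field."""
    offset: int
    value: bytes
    leader_epoch: int


class Replica:
    def __init__(self):
        self.log = []  # list of Record; index == offset
        self.epochs = []  # [(epoch, start_offset)]

    def _note_epoch(self, epoch, offset):
        # Only a newer epoch adds an entry to the vector.
        if not self.epochs or epoch > self.epochs[-1][0]:
            self.epochs.append((epoch, offset))

    def append_as_leader(self, value, current_epoch):
        # Leader: stamp each produced message with its current LeaderEpoch.
        record = Record(len(self.log), value, current_epoch)
        self._note_epoch(current_epoch, record.offset)
        self.log.append(record)
        return record

    def append_fetched(self, records):
        # Follower: keep the leader's stamps and derive the same epoch vector.
        for record in records:
            if record.offset != len(self.log):
                raise ValueError("fetched records must continue from the log end offset")
            self._note_epoch(record.leader_epoch, record.offset)
            self.log.append(record)
```

The follower never assigns epochs itself; it only copies the leader's stamps, so both replicas agree on where each epoch starts.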
Add Leader Epoch Sequence File per Partition
Each replica will keep a file, located inside its log directory, containing each leader epoch and its corresponding start offset. This will be a text file with the schema:
[leader-epoch-sequence-file schema table]
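Persisting the vector could look like the following minimal Python sketch. It assumes one whitespace-separated `epoch start_offset` pair per line, which is an assumption rather than the file's defined schema, and writes via a temporary file plus `os.replace` so a crash never leaves a half-written file; `write_epoch_file` and `read_epoch_file` are hypothetical helpers.

```python
import os
import tempfile


def write_epoch_file(path, entries):
    """Persist [(epoch, start_offset)] as text, one 'epoch start_offset' per line.

    The line format is an assumption made for this sketch.
    """
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".epoch-")
    with os.fdopen(fd, "w") as f:
        for epoch, start_offset in entries:
            f.write(f"{epoch} {start_offset}\n")
        f.flush()
        os.fsync(f.fileno())  # make the contents durable before the swap
    os.replace(tmp_path, path)  # atomically swap in the new version


def read_epoch_file(path):
    """Load the vector back; a missing file means no epochs recorded yet."""
    if not os.path.exists(path):
        return []
    entries = []
    with open(path) as f:
        for line in f:
            if line.strip():
                epoch, start_offset = line.split()
                entries.append((int(epoch), int(start_offset)))
    return entries
```

Rewriting the whole file on each change is acceptable here because the vector only grows when leadership changes, so it stays short.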
Compatibility, Deprecation, and Migration Plan
For the message format changes, the upgrade path from KIP-32 can be reused. To upgrade from a previous message format version, users should:
Upgrade the brokers once with the inter-broker protocol set to the previously deployed version.
Upgrade the brokers again with an updated inter-broker protocol, but leave the message format unchanged.
Upgrade all or most clients.
Restart the brokers, with the message format version set to the latest.
The LeaderEpochRequest will only be sent if all brokers support it; otherwise, the existing High Watermark truncation logic will be used.
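The fallback amounts to a single decision on the follower. Here is a minimal Python sketch, assuming the "all brokers support it" check has already been reduced to a boolean; `truncation_offset` and its parameters are hypothetical, and taking the minimum with the follower's log end offset is an assumption of this sketch.

```python
def truncation_offset(all_brokers_support_epochs, high_watermark,
                      log_end_offset, leader_epoch_end_offset=None):
    """Pick the follower's truncation point (hypothetical helper).

    leader_epoch_end_offset is the leader's LeaderEpoch response, only
    available when every broker supports the LeaderEpochRequest.
    """
    if not all_brokers_support_epochs:
        # Mixed-version cluster: keep the existing High Watermark behaviour.
        return high_watermark
    if leader_epoch_end_offset is None:
        raise ValueError("epoch-based truncation needs the leader's response")
    # Epoch-based truncation: keep everything the leader also has.
    return min(log_end_offset, leader_epoch_end_offset)
```

Keeping both paths behind one switch lets a cluster run safely during a rolling upgrade, before every broker understands the new request.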
Rejected Alternatives
As discussed in the Motivations section, Scenario 1 could be addressed by delaying truncation until fetch requests return from the leader. This could potentially be done by creating one or more temporary segment files containing the messages the follower needs to catch up. Then, once caught up, the original segment is truncated to the High Watermark and the temporary segments are made active. This was rejected as it does not solve Scenario 2.
...
We considered whether cleaning the LeaderEpochSequenceFiles would be simpler with one LeaderEpochSequenceFile (LGSF) per segment, making each file effectively immutable once its segment is no longer active. This was rejected because a single Leader Epoch typically spans many log segments, so we would create a fair number of files we do not need.
Appendix (a): Possibility for Divergent Logs with Leader Epochs & Unclean Leader Election