...

Status

Current state: Draft

Discussion thread: [todo]

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Definition of Terms

  • Leader Generation: A 32-bit, monotonically increasing number representing a continuous period of leadership for a single partition. This is marked on all messages.
  • Leader Generation Start Offset: The first offset in a new Leader Generation.
  • Leader Generation Sequence File: A journal of Leader Generation changes, mapping each Leader Generation to the starting offset for that generation (the Leader Generation Start Offset).
  • Leader Generation Request: A request made by a follower to the leader to retrieve the appropriate Leader Generation Start Offset. This is the Leader Generation Start Offset of the subsequent generation, or the Log End Offset if there is no subsequent generation. The follower uses this offset to truncate its log.

Motivation

There are a few known areas where replication in Kafka can either create an unexpected lineage in the log or, worse, cause runtime outages due to replica divergence. This KIP proposes a change to the replication protocol to ensure such cases cannot occur. First, let us describe two scenarios in which the current replication protocol can lose data or diverge:

Scenario 1: High Watermark Truncation followed by Immediate Leader Election

The replication protocol in Kafka has two phases. Initially the follower fetches messages, so it might fetch message m2. On the next round of RPC it will confirm receipt of message m2 and, assuming other replicas have confirmed successfully, the leader will progress the High Watermark. This is then passed back to the followers in the responses to their fetch requests. So the leader controls the progression rate of the High Watermark, which is propagated back to followers in subsequent rounds of RPC.

...

So the essence of this problem is that the follower takes an extra round of RPC to update its High Watermark. This gap leaves open the possibility that a fast leader change results in data loss, as a committed message can be truncated by the follower. There are a couple of simple solutions to this. One is to wait for the followers to move their High Watermark before updating it on the leader. This is not ideal as it adds an extra round of RPC latency to the protocol. Another would be to not truncate on the follower until the fetch request returns from the leader. This should work, but it doesn’t address the second problem discussed below.

Scenario 2: Replica Divergence on Restart after Multiple Hard Failures

Imagine again we have two brokers, but this time we have a power outage which affects both of them. It is acceptable, in such a case, that we could lose data if n replicas are lost (Kafka guarantees durability for n-1 replicas). Unfortunately there is an opportunity for the logs, on different machines, to diverge and even, in the worst case, for replication to become stuck.

...

So in short, whilst it is permissible to lose messages in this case, it is not permissible to have divergent logs.

Solution

We can solve both of these issues by introducing the concept of a Leader Generation. This allocates an identifier to a period of leadership, which is then added to each message by the leader. Each replica keeps a vector of [LeaderGeneration => StartOffset] to mark when leaders changed throughout the lineage of its log. This vector then replaces the High Watermark when followers need to truncate data (and will be stored in a file for each replica). So instead of truncating to the High Watermark, the follower looks up the appropriate LeaderGeneration in the leader’s vector of past LeaderGenerations and uses it to truncate only messages that do not exist in the leader’s log. The leader effectively tells the follower what offset it needs to truncate to.

...

When the two brokers restart after a crash, broker B becomes leader. It accepts message m3 but with a new leader generation, LG1. Now when broker A starts, and becomes a follower, it sends a LeaderGeneration request to the leader. This returns the first offset of LG1, which is offset 1. The follower knows that m2 is orphaned and truncates it. It then fetches from offset 1 and the logs are consistent.
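The truncation step in this scenario can be sketched as follows. This is a minimal illustration rather than the proposed implementation: logs are modelled as in-memory lists of (message, leader generation) pairs indexed by offset, the message at offset 0 (m1) is assumed from the narrative, and the function names `generation_start_offsets`, `end_offset_for` and `truncate_follower` are hypothetical.

```python
from typing import List, Tuple

# A log is modelled as a list of (payload, leader_generation); the list index is the offset.
Log = List[Tuple[str, int]]


def generation_start_offsets(log: Log) -> List[Tuple[int, int]]:
    """Derive the [LeaderGeneration => StartOffset] vector from a log."""
    vector: List[Tuple[int, int]] = []
    for offset, (_, generation) in enumerate(log):
        if not vector or vector[-1][0] != generation:
            vector.append((generation, offset))
    return vector


def end_offset_for(leader_log: Log, follower_generation: int) -> int:
    """Leader side: start offset of the first generation after the follower's,
    or the leader's Log End Offset if there is no later generation."""
    for generation, start_offset in generation_start_offsets(leader_log):
        if generation > follower_generation:
            return start_offset
    return len(leader_log)


def truncate_follower(follower_log: Log, leader_log: Log) -> Log:
    """Follower side: ask about its latest generation and truncate to the answer."""
    if not follower_log:
        return follower_log
    latest_generation = follower_log[-1][1]
    truncate_to = end_offset_for(leader_log, latest_generation)
    return follower_log[:truncate_to]


# Broker A holds m1 and m2 (both LG0); broker B, now leader, holds m1 (LG0) and m3 (LG1).
broker_a: Log = [("m1", 0), ("m2", 0)]
broker_b: Log = [("m1", 0), ("m3", 1)]

broker_a = truncate_follower(broker_a, broker_b)  # m2 is orphaned and dropped
broker_a += broker_b[len(broker_a):]              # fetch from offset 1 onwards
```

After these two steps both logs hold m1 at offset 0 and m3 at offset 1, which matches the outcome described above.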

Proposed Changes

Leader Generation

A leader has the concept of a LeaderEpoch. This is a 32-bit number, managed by the Controller, stored in the Partition State Info in Zookeeper and passed to each new leader as part of the LeaderAndIsrRequest. We use the LeaderEpoch to define the LeaderGeneration for each partition.

...

*Flushing requires an I/O and could delay the process of becoming leader. We could potentially optimize this by not flushing the LGS file here and moving the flush to the background thread. Basically, when flushing a log segment, we would require the corresponding LGS file to be flushed first if at least one of the starting offsets in the LGS is in this segment. If the broker has a hard crash, we will rebuild the potentially missing part of the LGS from the last flush point.

Additional Concerns

Extending LeaderEpoch to include Returning Leaders

It is possible that a leader might crash and rejoin the cluster without a round of leader election, meaning that the LeaderEpoch would not change. This can happen after a hard shutdown in which all replicas for a partition crash. If the first machine to start back up was previously the leader, no election will occur and the LeaderEpoch will not be incremented. However, we require that such a condition trigger a new Leader Generation.

...

To protect against this eventuality the controller will maintain a cached mapping of [broker -> Zookeeper CZXID] (CZXID is a unique and monotonic 64-bit number) for the broker’s registration in Zookeeper (/brokers/ids/[brokerId]). If the controller receives a Broker Registration where the CZXID has changed it will increment the Leader Epoch and propagate that value to the broker via the Leader and ISR Request (in the normal way), then update the cached CZXID for that broker.
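The registration check described above might look roughly like the sketch below. It rests on several assumptions not stated in this proposal: the class `ControllerSketch` and its fields are hypothetical, the epoch is bumped only for partitions currently led by the re-registering broker, and a broker seen for the first time triggers no bump.

```python
from typing import Dict, List


class ControllerSketch:
    """Hypothetical stand-in for the controller's broker-registration handling."""

    def __init__(self) -> None:
        self.cached_czxid: Dict[int, int] = {}  # broker id -> CZXID of its registration znode
        self.leader_epoch: Dict[str, int] = {}  # partition -> current LeaderEpoch
        self.leader_of: Dict[str, int] = {}     # partition -> leader broker id

    def on_broker_registration(self, broker_id: int, czxid: int) -> List[str]:
        """Return the partitions whose LeaderEpoch was incremented."""
        previous = self.cached_czxid.get(broker_id)
        bumped: List[str] = []
        if previous is not None and previous != czxid:
            # The znode was recreated, so the broker restarted: force a new generation.
            for partition, leader in self.leader_of.items():
                if leader == broker_id:
                    self.leader_epoch[partition] += 1
                    bumped.append(partition)  # would go out in a LeaderAndIsrRequest
        self.cached_czxid[broker_id] = czxid
        return bumped


controller = ControllerSketch()
controller.leader_of = {"topic-0": 1, "topic-1": 2}
controller.leader_epoch = {"topic-0": 5, "topic-1": 3}

first = controller.on_broker_registration(1, czxid=100)   # first sighting, nothing bumped
bumped = controller.on_broker_registration(1, czxid=250)  # re-registered after a crash
```

Only the partition led by broker 1 receives a new epoch here; partitions led by other brokers are left untouched.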

Maintaining the LeaderGenerationSequenceFile

The leader generation information will be held in files, per replica, on each broker. The files represent a commit log of LeaderGeneration changes for each replica and are cached in memory on each broker. When segments are deleted or compacted, the appropriate entries are removed from the cache and file. For deleted segments, all LeaderGeneration entries for offsets less than or equal to the segment log end offset are removed from the LeaderGenerationSequenceFile. For compacted topics the process is slightly more involved. The LogCleaner creates a LeaderGenerationSequence for every segment it cleans, which represents the earliest offset for each LeaderGeneration. Before the old segment(s) are replaced with the cleaned one(s) the LeaderGenerationSequenceFile is regenerated and the cache updated.
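For the compaction case, the per-segment sequence could be computed along these lines. This is only a sketch: the input is assumed to be the (offset, leader generation) pairs of the messages that survive cleaning, and the function name `sequence_for_cleaned_segment` is hypothetical.

```python
from typing import Dict, Iterable, List, Tuple


def sequence_for_cleaned_segment(
    surviving: Iterable[Tuple[int, int]]
) -> List[Tuple[int, int]]:
    """Map each leader generation to the earliest surviving offset that carries it.

    `surviving` yields (offset, leader_generation) pairs for retained messages.
    The result is a list of (leader_generation, earliest_offset), ordered by generation.
    """
    earliest: Dict[int, int] = {}
    for offset, generation in surviving:
        earliest[generation] = min(offset, earliest.get(generation, offset))
    return sorted(earliest.items())


# Offsets 0-2 (generation 0) and every message of generation 2 were compacted away.
cleaned = [(3, 0), (7, 1), (8, 1), (12, 3)]
sequence = sequence_for_cleaned_segment(cleaned)
```

Generations with no surviving messages simply drop out of the regenerated sequence, and the start offset of a generation moves forward to its first retained message.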

Unclean Shutdown

After an unclean shutdown the Leader Generation Sequence File could be ahead of the log (as the log is flushed asynchronously by default). Thus, if an unclean shutdown is detected on broker start (regardless of whether the broker is a leader or follower), all entries whose offset is greater than the log end offset are removed from the Leader Generation Sequence File.
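A minimal sketch of this recovery step, assuming the sequence is held as a list of (leader generation, start offset) pairs; the function name `trim_after_unclean_shutdown` is hypothetical.

```python
from typing import List, Tuple


def trim_after_unclean_shutdown(
    entries: List[Tuple[int, int]], log_end_offset: int
) -> List[Tuple[int, int]]:
    """Drop every (leader_generation, start_offset) entry that points past the log."""
    return [(gen, start) for gen, start in entries if start <= log_end_offset]


# The file recorded generation 2 starting at offset 95, but only 90 messages reached disk.
recovered = trim_after_unclean_shutdown([(0, 0), (1, 40), (2, 95)], log_end_offset=90)
```

An entry whose start offset equals the log end offset is kept, following the "greater than" wording above.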

Public Interfaces

Add API for LeaderGenerationRequest/Response

Leader Generation Request V0

LeaderGenerationRequest (Version: 0) => [partition_leader_generation] 
 partition_leader_generation => partition_id, leader_generation

   partition_id => INT32

   leader_generation => INT32

...

  1. The offset returned in the response will be the start offset of the first leader generation larger than the leader_generation supplied in the request, or the Log End Offset if the leader's current generation is equal to the leader_generation from the request.

  2. The response will only include offsets for partition IDs, supplied in the request, which are leaders on the broker the request was sent to.
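The two rules above could be applied on the leader roughly as shown below. This is a sketch under stated assumptions: the request and response are reduced to plain dictionaries keyed by partition ID, each generation sequence is sorted in ascending order, and the function name `handle_leader_generation_request` is hypothetical.

```python
from typing import Dict, List, Tuple

# Per partition led by this broker: (sorted [(leader_generation, start_offset)], log end offset)
PartitionState = Tuple[List[Tuple[int, int]], int]


def handle_leader_generation_request(
    requested: Dict[int, int],
    led_partitions: Dict[int, PartitionState],
) -> Dict[int, int]:
    """Map partition_id -> offset for each requested partition this broker leads."""
    response: Dict[int, int] = {}
    for partition_id, generation in requested.items():
        if partition_id not in led_partitions:
            continue  # rule 2: partitions led elsewhere are omitted
        sequence, log_end_offset = led_partitions[partition_id]
        # Rule 1: first later generation's start offset, else the Log End Offset.
        response[partition_id] = next(
            (start for gen, start in sequence if gen > generation),
            log_end_offset,
        )
    return response


led = {0: ([(0, 0), (1, 1)], 2)}  # this broker leads partition 0 only
behind = handle_leader_generation_request({0: 0, 7: 3}, led)
current = handle_leader_generation_request({0: 1}, led)
```

In the first call the follower is one generation behind and receives offset 1, while partition 7 is left out of the response; in the second the follower is on the current generation and receives the Log End Offset.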

Add LeaderGeneration field to MessageSet

LeaderGeneration is added to the MessageSets used in Fetch Responses returned as part of the internal replication protocol.

MessageSet

MessageSet => [Offset MessageSize Message]

 Offset => int64

 MessageSize => int32

 leader_generation => int32 <--------[NEW]

Add Leader Generation Sequence File per Partition

A file will be used, per replica (located inside the log directory), containing each leader generation and its corresponding start offset. This will be a text file with the schema:

leader-generation-sequence-file

Version int32

[leader_generation int32 start_offset int64]
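Reading and writing such a file might look like the sketch below. The exact text layout is not specified here, so the sketch assumes the version on the first line followed by one space-separated "leader_generation start_offset" pair per line; the version value 0 and the function names `write_sequence` and `read_sequence` are likewise assumptions.

```python
import io
from typing import Iterable, List, TextIO, Tuple

FORMAT_VERSION = 0  # assumed value; the schema only says the version is an int32


def write_sequence(out: TextIO, entries: Iterable[Tuple[int, int]]) -> None:
    """Write the version line, then one 'leader_generation start_offset' line per entry."""
    out.write(f"{FORMAT_VERSION}\n")
    for generation, start_offset in entries:
        out.write(f"{generation} {start_offset}\n")


def read_sequence(src: TextIO) -> List[Tuple[int, int]]:
    """Parse a file produced by write_sequence, rejecting unknown versions."""
    lines = [line.strip() for line in src if line.strip()]
    if not lines:
        raise ValueError("empty leader generation sequence file")
    version = int(lines[0])
    if version != FORMAT_VERSION:
        raise ValueError(f"unsupported version {version}")
    entries: List[Tuple[int, int]] = []
    for line in lines[1:]:
        generation, start_offset = line.split()
        entries.append((int(generation), int(start_offset)))
    return entries


buffer = io.StringIO()
write_sequence(buffer, [(0, 0), (1, 1)])
buffer.seek(0)
round_trip = read_sequence(buffer)
```

A text format keeps the file easy to inspect by hand, at the cost of parsing on every load; since the contents are cached in memory, that cost is paid only at start-up.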

Compatibility, Deprecation, and Migration Plan

For the message format changes, the upgrade path from KIP-32 can be reused. To upgrade from a previous message format version, users should:

...

The LeaderGenerationRequest will only be sent if all brokers support it. Otherwise the existing logic for High Watermark truncation will be used.

Rejected Alternatives

As discussed in the Motivation section, Scenario 1 could be addressed by delaying truncation until fetch requests return from the leader. This could potentially be done by creating temporary segment file(s) containing the messages required for the follower to catch up. Then, once caught up, the original segment is truncated to the High Watermark and the temporary segment(s) made active. This was rejected because it does not solve Scenario 2.

...