...

In KIP-320, we introduced fencing support to the Fetch API, which gives consumers the ability to reason about the fetch state relative to the current leader and detect stale metadata. This KIP proposes a similar extension to the Produce API. We will add the leader epoch to the produce request and the partition leader will verify that it matches its own epoch before appending to the local log.
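
To make the check concrete, here is a minimal sketch of the leader-side validation. The class and method names are illustrative rather than the actual broker internals; only the epoch comparison itself is what this KIP specifies.

Code Block
// A sketch of the epoch check the partition leader performs before
// appending to the local log. FENCED_LEADER_EPOCH and UNKNOWN_LEADER_EPOCH
// are the error codes introduced in KIP-320.
enum EpochCheckResult { NONE, FENCED_LEADER_EPOCH, UNKNOWN_LEADER_EPOCH }

final class ProduceEpochValidator {
    static EpochCheckResult check(int requestEpoch, int leaderEpoch) {
        if (requestEpoch < leaderEpoch)
            return EpochCheckResult.FENCED_LEADER_EPOCH;   // producer metadata is behind the leader
        if (requestEpoch > leaderEpoch)
            return EpochCheckResult.UNKNOWN_LEADER_EPOCH;  // leader metadata is behind the producer
        return EpochCheckResult.NONE;                      // epochs match: append to the local log
    }
}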

In addition to improving the debuggability of the producer and making its behavior consistent with the consumer in terms of its metadata usage, there is a definite benefit to ensuring metadata consistency between the producer and the leader: it reduces the likelihood of writes being accepted by a stale leader.

In Kafka, partition leaders will continue to accept writes from producers as long as they believe they are still the leader. For various reasons, a leader may continue to accept writes long after another leader has been elected. Our protection from such stale leaders is twofold:

  1. As long as the stale leader is not the only member of the ISR, it will eventually be forced to shrink the ISR after accepting a local write. However, it will be unable to do so because it does not have the latest zkVersion for the leader/ISR state in ZooKeeper (see the sketch after this list). Hence writes with acks=all are doomed to fail on stale leaders.
  2. If the stale leader is the only member of the ISR, then as long as unclean leader election is not enabled, no other replica can become the leader. Only partition leaders are allowed to expand the ISR. Hence no messages which are committed by the stale leader can be lost.
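
The zkVersion check in case 1 maps to ZooKeeper's conditional update: a write that passes a stale version is rejected with BadVersionException. Below is a minimal sketch, assuming an illustrative znode path and payload; the real broker's leader/ISR update path is more involved.

Code Block
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

final class IsrShrinkSketch {
    // Attempts the conditional update backing an ISR shrink. Returns false
    // when this broker's cached zkVersion is stale, i.e. another leader has
    // already been elected and updated the leader/ISR state.
    static boolean tryShrinkIsr(ZooKeeper zk, String leaderAndIsrPath,
                                byte[] newLeaderAndIsr, int cachedZkVersion)
            throws KeeperException, InterruptedException {
        try {
            zk.setData(leaderAndIsrPath, newLeaderAndIsr, cachedZkVersion);
            return true;
        } catch (KeeperException.BadVersionException e) {
            return false; // stale zkVersion: the acks=all write cannot be committed
        }
    }
}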

The problem in case 1 is that writes with acks=0 and acks=1 will continue to be accepted and written to the log even after the leader fails to shrink the ISR. Eventually, when the leader receives the correct metadata, it will truncate these messages. The problem in case 2 is that unclean leader election may be enabled, which will cause the messages which were "committed" locally to also be truncated.

Verifying the leader epoch does not fix these problems because the producer may be similarly stuck with stale metadata, but it reduces the likelihood that this will be a prolonged state. Once the producer has updated metadata, writes on the stale leader will no longer be possible. There is no substantial benefit in terms of the achievable semantics, but it does make it less likely that a write will be accepted by a stale leader, which can be problematic when there are consecutive leader elections. Additionally, it is advantageous for the producer to know whether its metadata is ahead of or behind the leader's in order to avoid unnecessary metadata refreshes.

Public Interfaces

We will bump the version of the Produce request API to include the leader epoch. For old versions, we will skip the epoch validation and retain the current behavior.

Code Block
ProduceRequest => TransactionalId RequiredAcks Timeout [TopicData]
  TransactionalId => NULLABLE_STRING
  RequiredAcks => INT16
  Timeout => INT32
  TopicData => [TopicName [PartitionData]]
    PartitionData => Partition CurrentLeaderEpoch MessageSetSize MessageSet
      Partition => INT32
      CurrentLeaderEpoch => INT32 // New
      MessageSetSize => INT32
      MessageSet => BYTES

The Produce response schema will match the previous version.

As in KIP-320, we will use the error codes FENCED_LEADER_EPOCH and UNKNOWN_LEADER_EPOCH when the leader epoch does not match.
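
Below is a hedged sketch of how a producer could use the distinction; the handler is illustrative, not the actual client code. FENCED_LEADER_EPOCH implies the producer's metadata is behind the leader's and a refresh is warranted, while UNKNOWN_LEADER_EPOCH implies the broker is behind, so a refresh can be skipped.

Code Block
import org.apache.kafka.common.protocol.Errors;

final class ProduceErrorSketch {
    // Illustrative reaction to the epoch errors; the real producer's retry
    // and metadata machinery is more involved.
    static void onProduceError(Errors error) {
        switch (error) {
            case FENCED_LEADER_EPOCH:
                // Our metadata epoch is older than the leader's:
                // refresh metadata, then retry the produce request.
                break;
            case UNKNOWN_LEADER_EPOCH:
                // Our metadata epoch is newer than this broker's: retry
                // after a backoff without an unnecessary metadata refresh.
                break;
            default:
                // Fall through to the normal error handling path.
                break;
        }
    }
}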

...