Status

Current state: Under Discussion

Discussion thread: TBD

JIRA: KAFKA-2334

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

ListOffsetsRequest returns the latest offset for each partition.  These offsets should advance monotonically as new messages are added to the partition.  However, sometimes, right after a partition leadership change, the message offsets returned by ListOffsetsRequest can actually go backwards.

Because it happens very rarely, applications that use Kafka are usually not prepared for non-monotonic offset behavior.  Applications, such as connectors for Spark Streaming, may crash or misbehave.  To avoid these issues, we should fix this corner case so that offsets advance monotonically even after a leader election.

Proposed Changes

After a successful partition leadership election, a former follower is now the leader.  However, the high water mark on the former follower may be behind the high water mark on the old, failed leader.  This is the cause of the non-monotonic behavior immediately after the election.

What we would like to do is wait until the new leader's high water mark has caught up with the messages already in its log.  To implement this, during the transition from follower to leader, the broker can record the current logEnd for the partition.  Then, it can refuse to answer ListOffsetsRequest for that partition until the high water mark has caught up with this value.
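The gating logic described above can be sketched roughly as follows.  This is only an illustration, not the broker's actual code: the class PartitionOffsetGate and all of its method names are hypothetical, and it assumes that the offset reported to clients is the high water mark.

```java
import java.util.OptionalLong;

// Hypothetical sketch of per-partition state; not actual Kafka broker code.
public class PartitionOffsetGate {
    // Current high water mark of the partition on this broker.
    private long highWatermark;
    // Log end offset recorded at the follower-to-leader transition.
    // -1 means no election has been recorded, so nothing is gated.
    private long logEndAtElection = -1L;

    public PartitionOffsetGate(long initialHighWatermark) {
        this.highWatermark = initialHighWatermark;
    }

    // Called during the transition from follower to leader.
    public synchronized void becomeLeader(long currentLogEnd) {
        this.logEndAtElection = currentLogEnd;
    }

    // Called whenever the high water mark advances.
    public synchronized void updateHighWatermark(long newHighWatermark) {
        this.highWatermark = Math.max(this.highWatermark, newHighWatermark);
    }

    // Returns the offset to report, or empty while the high water mark
    // is still behind the log end recorded at election time.
    public synchronized OptionalLong latestOffset() {
        if (highWatermark < logEndAtElection) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(highWatermark);
    }
}
```

In this sketch, an empty result is the signal for the request handler to return a retriable error for that partition instead of an offset.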

The period when the offset is unavailable should be brief.  During this period, the broker should simply return a retriable exception when it is asked for the offset of the partition.  For current versions of ListOffsetsRequest, this exception can be NotLeaderForPartitionException.  For new versions of ListOffsetsRequest, we can return a new, more precise exception.  The main advantage of creating a new exception is that the client knows it can avoid re-fetching metadata.  A second advantage is that the more precise error message may help with debugging on the client side.
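One way to picture the version-dependent error choice is the sketch below.  The names OffsetErrorSelector, OffsetError, and FIRST_VERSION_WITH_NEW_ERROR are made up for illustration, and the version number is a placeholder rather than a value decided by this proposal.

```java
// Hypothetical sketch of choosing an error by request version; not actual Kafka code.
public class OffsetErrorSelector {
    enum OffsetError { NONE, NOT_LEADER_FOR_PARTITION, OFFSET_NOT_AVAILABLE }

    // Placeholder: the first ListOffsetsRequest version that understands the new error.
    static final short FIRST_VERSION_WITH_NEW_ERROR = 1;

    // Picks the error to return for one partition in a ListOffsetsRequest.
    static OffsetError errorFor(short requestVersion, boolean offsetAvailable) {
        if (offsetAvailable) {
            return OffsetError.NONE;
        }
        // Older clients only know the existing retriable error.
        return requestVersion >= FIRST_VERSION_WITH_NEW_ERROR
                ? OffsetError.OFFSET_NOT_AVAILABLE
                : OffsetError.NOT_LEADER_FOR_PARTITION;
    }

    // Client-side view: only the older error suggests the leader may have moved,
    // so only that one should trigger a metadata refresh before retrying.
    static boolean shouldRefreshMetadata(OffsetError error) {
        return error == OffsetError.NOT_LEADER_FOR_PARTITION;
    }
}
```

Both errors are retriable in this sketch; the difference is only whether the client refreshes metadata before the retry.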

Public Interfaces

There will be a new version of the ListOffsetsResponse API.  It will be the same as the existing one, except that it can return a new exception, OffsetNotAvailableForPartitionException, for an individual partition.  This new exception will be a subclass of RetriableException.
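A minimal sketch of what the exception and a client-side retry might look like is shown below.  To keep the example self-contained, RetriableException here is a local stand-in for Kafka's own class, and the wrapper class OffsetExceptionsSketch and the helper fetchWithRetry are hypothetical names.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch; RetriableException is a local stand-in, not Kafka's class.
public class OffsetExceptionsSketch {
    static class RetriableException extends RuntimeException {
        RetriableException(String message) {
            super(message);
        }
    }

    // The proposed exception: retriable, and specific to offset lookups.
    static class OffsetNotAvailableForPartitionException extends RetriableException {
        OffsetNotAvailableForPartitionException(String message) {
            super(message);
        }
    }

    // Retries the lookup while it fails with a retriable exception.
    // A real client would also back off between attempts.
    static long fetchWithRetry(LongSupplier lookup, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RetriableException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.getAsLong();
            } catch (RetriableException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }
}
```

Because the new exception extends the retriable base class, existing retry loops that catch RetriableException would handle it without modification.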

Rejected Alternatives

Rather than returning a retriable exception, the broker could simply put the ListOffsetsRequest into a purgatory structure until the offset was available.  This avoids the need for the client to poll the server.  We would create a new version of the ListOffsetsRequest RPC which adds a maximum timeout field.

However, adding a new purgatory structure would increase the complexity of the code substantially.  Since this is a corner case which only happens for a few seconds after a leader election, the extra performance does not seem worth it.  It is also awkward to put ListOffsetsRequest into a purgatory, because each request could ask about multiple partitions.  The results for some partitions might be blocked because other partitions were not ready to return their offsets.  Finally, because an RPC revision is needed, this approach would solve the problem for new clients, but not for older ones.  The approach above avoids these problems: it avoids adding a new purgatory structure, allows us to give back results immediately for the partitions where we know those results, and solves the problem for older clients as well as newer ones.
