...

JIRA: KAFKA-6880

Related JIRAs: KAFKA-7395, KAFKA-7440, KAFKA-7747

Release: The broker-side changes which improved fencing were released in 2.1.0. Client-side truncation detection and reset capability was released in 2.3.0.

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP aims to solve two related problems in the handling of fetch requests.

Consumer handling of unclean truncation: When unclean leader election is enabled, we may lose committed data. A consumer which is reading from the end of the log will typically see an out of range error, which will cause it to use its auto.offset.reset policy. To avoid losing data, users should use the "earliest" option, but that means consuming the log from the beginning.

It is also possible that prior to sending the next fetch, new data is written to the log so that the consumer's fetch offset becomes valid again. In this case, the consumer will just miss whatever data had been written between the truncation point and its fetch offset.

...

This KIP adds the leader epoch to the Fetch request. When a broker receives a fetch request, it will compare the requested epoch with its own. The fetch will only be permitted if the requested epoch matches the leader's epoch. If the requested epoch is older than the leader's, we will use a new FENCED_LEADER_EPOCH error code. If the epoch is newer than the leader's (for example in a KIP-232 scenario), we will use a new UNKNOWN_LEADER_EPOCH error code.
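
To make this concrete, here is a minimal sketch (not the actual broker code) of the validation described above. The names are illustrative; a requested epoch of -1 means the fetcher did not provide an epoch and validation is skipped.

Code Block
enum EpochValidation { OK, FENCED_LEADER_EPOCH, UNKNOWN_LEADER_EPOCH }

// Sketch only: leader-side validation of the epoch included in a fetch request.
static EpochValidation validateFetchEpoch(int requestedEpoch, int currentLeaderEpoch) {
    if (requestedEpoch < 0)
        return EpochValidation.OK;                    // no epoch provided; skip validation
    if (requestedEpoch < currentLeaderEpoch)
        return EpochValidation.FENCED_LEADER_EPOCH;   // fetcher has stale metadata
    if (requestedEpoch > currentLeaderEpoch)
        return EpochValidation.UNKNOWN_LEADER_EPOCH;  // fetcher is ahead of this leader (e.g. KIP-232 scenario)
    return EpochValidation.OK;                        // epochs match; the fetch is permitted
}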

...

The proposal in this KIP is to have the consumer behave more like a follower. The consumer will obtain the current leader epoch using the Metadata API. When fetching from a new leader, the consumer will first check for truncation using the OffsetForLeaderEpoch API. In order to enable this, we need to keep track of the last epoch that was consumed. If we do not have one (e.g. because the user has seeked to a particular offset or because the message format is older), then the consumer will skip this step. To support this tracking, we will extend the OffsetCommit API to include the leader epoch if one is available.

Leader changes are detected either through a metadata refresh or in response to a FENCED_LEADER_EPOCH error. It is also possible that the consumer sees an UNKNOWN_LEADER_EPOCH in a fetch response if its metadata has gotten ahead of the leader. 

This change in behavior has implications for the consumer's offset reset policy, which defines what the consumer should do if its fetch offset becomes out of range. With this KIP, the only case in which this is possible other than an out of range seek is if the consumer fetches from an offset earlier than the log start offset. By opting into an offset reset policy, the user allows for automatic adjustments to the fetch position, so we take advantage of this to reset the offset as precisely as possible when log truncation is detected. In some pathological cases (e.g. multiple consecutive unclean leader elections), we may not be able to find the exact offset, but we should be able to get close by finding the starting offset of the next largest epoch that the leader is aware of. We propose in this KIP to change the behavior for both the "earliest" and "latest" reset modes to do this automatically as long as the message format supports lookup by leader epoch. The consumer will log a message to indicate that the truncation was detected, but will reset the position automatically.

If a user is not using an auto reset option, we will raise a LogTruncationException from poll() when log truncation is detected. The exception will include the partitions that were truncated and the offset of divergence as found above, which gives applications the ability to revert changes to downstream systems if needed and rewind the fetch offset. Users must handle this exception and reset the position of the consumer if no auto reset policy is enabled.

For consumers, we propose some additional extensions:

  1. When the consumer needs to reset offsets, it uses the ListOffsets API to query the leader. To avoid querying stale leaders, we will add the current leader epoch to the request for fencing. Additionally, we will modify this API to return the corresponding epoch for any offsets looked up; the consumer will use this epoch in its first fetch request after resetting offsets.
  2. We will also provide the leader epoch in the offset commit and fetch APIs. This allows consumer groups to detect truncation across rebalances or restarts. Note that in cases like that found in KIP-232, it is possible for the leader epoch included in the committed offset to be ahead of the metadata that is known to the consumer. Consumers are expected to wait until the metadata has at least reached the epoch of the committed offset before checking for truncation.
  3. For users that store offsets in an external system, we will provide APIs which expose the leader epoch of each record, and we will provide an alternative seek API so that users can initialize the offset and leader epoch.

Finally, we want to improve fencing for clients as well to resolve metadata inconsistencies such as in KIP-232. To support this, we will expose the leader epoch in the Metadata API. The consumer will use the current epoch when checking for truncation and when fencing.
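
For illustration only (hypothetical names, not the actual client internals), the consumer can use the exposed epoch to ignore metadata that is older than what it has already seen:

Code Block
// Sketch only: apply a metadata update for a partition only if its leader epoch is
// at least as large as the last epoch seen for that partition.
static boolean shouldApplyMetadataUpdate(Optional<Integer> lastSeenEpoch, int epochInResponse) {
    return lastSeenEpoch.map(last -> epochInResponse >= last).orElse(true);
}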

Replica Handling

This KIP adds replica fencing for both fetching and truncation. When a replica becomes a follower, it will attempt to find the truncation offset using the OffsetsForLeaderEpoch API. The new epoch will be included in this request so that the follower can be sure it is truncating against the log of the leader for the correct epoch. Without this validation, it is possible to truncate using the log of a leader with stale log state, which can lead to log divergence.

...

We will introduce a new exception type, which will be raised from KafkaConsumer.poll(Duration) as described above. The exception extends OffsetOutOfRangeException for compatibility: if a user's offset reset policy is set to "none," they will still be able to catch OffsetOutOfRangeException, while new code can catch the more specific exception type and use the new seek() API defined below to resume consumption. This exception is raised by the consumer after using the OffsetForLeaderEpoch API to find the offset of divergence. This offset is included as a field in the exception. Typically users handling this exception will seek to this offset.

Code Block
/**
 * In the event of unclean leader election, the log will be truncated,
 * previously committed data will be lost, and new data will be written
 * over these offsets. When this happens, the consumer will detect the 
 * truncation and raise this exception (if no automatic reset policy 
 * has been defined) with the first offset to diverge from what the 
 * consumer read.
 */ 
class LogTruncationException extends OffsetOutOfRangeException {
  /**
   * Get the truncation offsets for the partitions which were truncated. 
   * This is the first offset which is known to diverge from what the consumer read.
   */	
  Map<TopicPartition, OffsetAndMetadata> truncationOffsets();
}
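
As an example of the intended usage, the following sketch shows a consumer with no auto reset policy handling the exception by rewinding to the divergence offset with the new seek() overload defined below. The process() and revertDownstreamChanges() helpers are hypothetical application code.

Code Block
try {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    process(records); // hypothetical application logic
} catch (LogTruncationException e) {
    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : e.truncationOffsets().entrySet()) {
        // Optionally undo any downstream effects past the divergence point,
        // then rewind to the first offset known to diverge.
        revertDownstreamChanges(entry.getKey(), entry.getValue().offset()); // hypothetical
        consumer.seek(entry.getKey(), entry.getValue());
    }
}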

We will also add new retriable exceptions for the UNKNOWN_LEADER_EPOCH and FENCED_LEADER_EPOCH error codes:

Code Block
class UnknownLeaderEpochException extends RetriableException {}

class FencedLeaderEpochException extends InvalidMetadataException {}

These exceptions will be located in the public `errors` package, and the consumer will internally retry when it receives them.

The leader epoch will be exposed in the ConsumerRecord and OffsetAndMetadata objects.

Code Block
class ConsumerRecord<K, V> {
  /** 
   * Get the leader epoch or empty if it is unknown. 
   */
  Optional<Integer> leaderEpoch();
}

class OffsetAndMetadata {
  /**
   * New constructor including optional leader epoch. Old constructors
   * will still be supported and will use Optional.empty() as the default
   * leader epoch.
   */
  OffsetAndMetadata(long offset, String metadata, Optional<Integer> leaderEpoch); 

  /** 
   * Get the leader epoch of the previously consumed record (if one is known).
   * Log truncation is detected if there exists a leader epoch which is larger
   * than this epoch and begins at an offset earlier than the committed offset.
   */
  Optional<Integer> leaderEpoch();
}
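
For example (a sketch, where lastRecord is assumed to be the last ConsumerRecord processed for the partition), a manual commit can now carry the epoch of the previously consumed record:

Code Block
// Sketch only: commit the next offset along with the leader epoch of the last consumed record.
OffsetAndMetadata committed =
    new OffsetAndMetadata(lastRecord.offset() + 1, "", lastRecord.leaderEpoch());
consumer.commitSync(Collections.singletonMap(
    new TopicPartition(lastRecord.topic(), lastRecord.partition()), committed));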

We will also have a new API to support seeking to an offset and leader epoch. This is required in order to support storage of offsets in an external store. When the consumer is initialized, the user will call seek() with the offset and leader epoch that was stored. The consumer will initialize its position using this API.

Like the other seek() overload, this method does not make any remote calls. If a leader epoch has been provided, then in the next call to poll(), the consumer will use the OffsetForLeaderEpoch API to check for truncation. If the log has been truncated since the time the offsets were stored, the next call to poll() will raise a LogTruncationException as described above.

Code Block
/**
 * Seek to an offset and initialize the leader epoch (if present).
 */ 
void seek(TopicPartition partition, OffsetAndMetadata offset);

To make the external storage use case simpler, we will provide a helper to ConsumerRecords to get the next offsets. This simplifies commit logic for consumers which only commit offsets after consuming full batches.

Code Block
class ConsumerRecords<K, V> {
  /**
   * Get the next offsets that the consumer will consume.
   * This can be passed directly to a commitSync(), for example,
   * after the full batch has been consumed. For finer-grained,
   * offset tracking, you should use the offset information from
   * the individual ConsumerRecord instances.
   */  
  Map<TopicPartition, OffsetAndMetadata> nextOffsets();
}
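
Putting the external-storage pieces together, here is a sketch of committing offsets (with leader epochs) to an external system after each batch and restoring them on restart with the new seek() overload. The offsetStore object and the process() helper are hypothetical application code.

Code Block
// After consuming a full batch, persist the next offsets (including leader epochs).
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
process(records); // hypothetical application logic
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : records.nextOffsets().entrySet())
    offsetStore.save(entry.getKey(), entry.getValue()); // hypothetical external store

// On restart, restore the position and leader epoch from the external store. The next
// poll() will check for truncation and raise LogTruncationException if it is detected.
for (TopicPartition partition : consumer.assignment()) {
    OffsetAndMetadata stored = offsetStore.load(partition); // hypothetical external store
    if (stored != null)
        consumer.seek(partition, stored);
}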


Finally, we may as well protect the offsets found through the offsetsForTimes() API. An unclean leader election may invalidate the results of a lookup by time, so it would be unsafe to use seek() without considering the epoch information for the returned offsets. To support this, we will add the leader epoch to the OffsetAndTimestamp object which is returned from offsetsForTimes().

Code Block
class OffsetAndTimestamp {
  /**
   * Get the leader epoch corresponding to the offset that was 
   * found (if one exists). This can be provided to seek() to 
   * ensure that the log hasn't been truncated prior to fetching.
   */
  Optional<Integer> leaderEpoch();
}
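
For example (a sketch, with partition and targetTimestamp assumed to be defined by the application), the epoch returned from offsetsForTimes() can be carried into seek() so that the following fetch is validated against truncation:

Code Block
// Sketch only: look up an offset by timestamp and seek to it along with its leader epoch.
Map<TopicPartition, OffsetAndTimestamp> found =
    consumer.offsetsForTimes(Collections.singletonMap(partition, targetTimestamp));
OffsetAndTimestamp result = found.get(partition);
if (result != null)
    consumer.seek(partition, new OffsetAndMetadata(result.offset(), "", result.leaderEpoch()));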

Protocol Changes

Fetch

We will bump the fetch request version in order to include the current leader epoch. For older versions, we will skip epoch validation as before.

The new schema is given below:

Code Block
linenumberstrue
FetchRequest => MaxWaitTime ReplicaId MinBytes IsolationLevel FetchSessionId FetchSessionEpoch [Topics] [RemovedTopics]
  MaxWaitTime => INT32
  ReplicaId => INT32
  MinBytes => INT32
  IsolationLevel => INT8
  FetchSessionId => INT32
  FetchSessionEpoch => INT32
  Topics => TopicName Partitions
  	TopicName => STRING
  	Partitions => [Partition CurrentLeaderEpoch FetchOffset StartOffset MaxBytes]
  		Partition => INT32
  		CurrentLeaderEpoch => INT32   // New
  		FetchOffset => INT64
  		StartOffset => INT64
  		MaxBytes => INT32
  RemovedTopics => RemovedTopicName [RemovedPartition]
  	RemovedTopicName => STRING
  	RemovedPartition => INT32

The response schema will not change, but we will have two new error codes as mentioned above:

  1. FENCED_LEADER_EPOCH: The replica has a lower epoch than the leader. This is retriable.
  2. UNKNOWN_LEADER_EPOCH: This is a retriable error code which indicates that the epoch in the fetch request was larger than any known by the broker.

Previously, we would return the NOT_LEADER_FOR_PARTITION error code if a follower received an unexpected Fetch request. In this case, the requested epoch is different from what the follower has, so we can now return one of the error codes above, which contain more specific information.

OffsetsForLeaderEpoch

The changes to the OffsetsForLeaderEpoch request API are similar.

Code Block
linenumberstrue
OffsetsForLeaderEpochRequest => [Topic]
  Topic => TopicName [Partition] 
    TopicName => STRING
    Partition => PartitionId CurrentLeaderEpoch LeaderEpoch
      PartitionId => INT32
      CurrentLeaderEpoch => INT32 // New
      LeaderEpoch => INT32

If the current leader epoch does not match that of the leader, then we will send either FENCED_LEADER_EPOCH or UNKNOWN_LEADER_EPOCH as we do for the Fetch API.

Since this API is no longer exclusively an inter-broker API, we will also add the throttle time to the response schema.

Code Block
linenumberstrue
OffsetsForLeaderEpochResponse => ThrottleTimeMs [TopicMetadata]
  ThrottleTimeMs => INT32 // New
  TopicMetadata => TopicName PartitionMetadata
    TopicName => STRING
    PartitionMetadata => [ErrorCode PartitionId LeaderEpoch EndOffset]
      ErrorCode => INT16
      PartitionId => INT32
      LeaderEpoch => INT32
      EndOffset => INT64
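
The returned LeaderEpoch and EndOffset are what a fetcher uses to detect divergence: the leader answers with the end offset of the largest epoch less than or equal to the requested one. Roughly (a sketch with hypothetical names, not the actual client or replica code), truncation is detected as follows:

Code Block
// Sketch only: interpreting an OffsetsForLeaderEpoch response to detect log divergence.
static boolean truncationDetected(int lastConsumedEpoch, long nextFetchOffset,
                                  int epochInResponse, long endOffsetInResponse) {
    // Either the leader does not fully know our epoch, or its log for that epoch ends
    // before our position; some of what we consumed is no longer in the leader's log.
    return epochInResponse < lastConsumedEpoch || endOffsetInResponse < nextFetchOffset;
}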


Metadata

The metadata response will be extended to include the leader epoch. This enables stronger fencing for consumers. We can enable similar protection in producers in the future, but that is out of the scope of this KIP.

Code Block
linenumberstrue
MetadataResponse => ThrottleTimeMs Brokers ClusterId ControllerId [TopicMetadata]
  ThrottleTimeMs => INT32
  Brokers => [MetadataBroker]
  ClusterId => NULLABLE_STRING
  ControllerId => INT32
  
TopicMetadata => ErrorCode TopicName IsInternal [PartitionMetadata]
  ErrorCode => INT16
  TopicName => STRING
  IsInternal => BOOLEAN
  
PartitionMetadata => ErrorCode PartitionId Leader LeaderEpoch Replicas ISR OfflineReplicas
  ErrorCode => INT16
  PartitionId => INT32
  Leader => INT32
  LeaderEpoch => INT32 // New
  Replicas => [INT32]
  ISR => [INT32]
  OfflineReplicas => [INT32]

There are no changes to the Metadata request schema.

OffsetCommit

The new OffsetCommit request schema is provided below. A field for the leader epoch has been added. Note that this is the epoch of the previously fetched record. The response schema matches the previous version.

Code Block
linenumberstrue
OffsetCommitRequest => GroupId Generation MemberId RetentionTime [TopicName [Partition Offset LeaderEpoch Metadata]]
  GroupId => STRING
  Generation => INT32
  MemberId => STRING
  RetentionTime => INT64
  TopicName => STRING
  Partition => INT32
  Offset => INT64
  LeaderEpoch => INT32  // New
  Metadata => STRING

TxnOffsetCommit

Similarly, we need to add the leader epoch to the TxnOffsetCommit API, which is used by transactional producers.

Code Block
linenumberstrue
TxnOffsetCommitRequest => TransactionalId GroupId ProducerId ProducerEpoch Topics
  TransactionalId => STRING
  GroupId => STRING
  ProducerId => INT64
  ProducerEpoch => INT16
  Topics => [TopicName [Partition Offset LeaderEpoch Metadata]]  
    TopicName => STRING
    Partition => INT32
    Offset => INT64
    LeaderEpoch => INT32  // New
    Metadata => STRING


OffsetFetch

The OffsetFetch response schema will be similarly modified. The request schema will remain the same.

Code Block
linenumberstrue
OffsetFetchResponse => ThrottleTimeMs Topics ErrorCode
  ThrottleTimeMs => INT32
  Topics => [TopicName [Partition Offset LeaderEpoch Metadata ErrorCode]]
  	TopicName => STRING
  	Partition => INT32
  	Offset => INT64
  	LeaderEpoch => INT32  // New
  	Metadata => STRING
    ErrorCode => INT16
  ErrorCode => INT16

ListOffsets

We need two changes to the ListOffsets API. First, we add the current leader epoch to the request schema in order to protect clients from querying stale leaders.

Code Block
linenumberstrue
ListOffsetRequest => ReplicaId [TopicName [Partition CurrentLeaderEpoch Timestamp]]
 ReplicaId => INT32
 TopicName => STRING
 Partition => INT32
 CurrentLeaderEpoch => INT32 // New
 Timestamp => INT64

Second, we add the leader epoch to the response that corresponds with the returned offset. The client can use this to reconcile the log prior to fetching from a new leader.

Code Block
linenumberstrue
ListOffsetResponse => ThrottleTimeMs Topics
  ThrottleTimeMs => INT32
  Topics => [TopicName [Partition ErrorCode Timestamp Offset LeaderEpoch]]
    TopicName => STRING
    Partition => INT32
    ErrorCode => INT16
    Timestamp => INT64    
    Offset => INT64
    LeaderEpoch => INT32  // New

As with the Fetch and OffsetForLeaderEpoch APIs, the response will support the FENCED_LEADER_EPOCH and UNKNOWN_LEADER_EPOCH error codes.

Offset Schema Changes

This KIP introduces a new schema version for committed offsets in the internal __consumer_offsets topic. This is not a public API, but we mention it for compatibility implications. The version will be bumped to 3. The new schema is given below and corresponds with the changes to the OffsetCommit APIs:

Code Block
Offset Commit Value Schema (Version: 3) =>
  Offset => INT64
  LeaderEpoch => INT32 // New
  Metadata => STRING
  CommitTimestamp => INT64

...