Status
Current state: Under Discussion
Discussion thread: TBD
JIRA: https://issues.apache.org/jira/browse/KAFKA-5793 and https://issues.apache.org/jira/browse/KAFKA-5794
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
There are currently two situations where the behavior of the producer with idempotence enabled is less than satisfactory:
- Currently the OutOfOrderSequence exception may be raised spuriously, for instance, if the producer state was removed on the server due to segments which are older than the retention limit being deleted. We would like the OutOfOrderSequence exception to unequivocally indicate data loss, and hence need to detect and handle these false positives.
- There is no graceful way to enable idempotence on the producer while some topics are still on an older (pre-0.11.0) message format. This makes it impossible to turn idempotence on by default, because the upgrade steps would simply not work. Hence we would like to introduce a 'safe' mode for idempotence where it will only be enabled if the underlying topic has the requisite message format.
Background
The two problems described above are detailed in the following pages, along with proposed solutions.
- Kafka Exactly Once - Solving the problem of spurious OutOfOrderSequence errors
- Kafka Exactly Once - Dealing with older message formats when idempotence is enabled
Public Interfaces
KafkaProducer
With the changes in Kafka Exactly Once - Solving the problem of spurious OutOfOrderSequence errors, the Future returned by `Producer.send()` may fail with a new exception, the DuplicateSequenceException, to indicate that the last attempt to send the batch failed because a previous attempt actually succeeded. This will only happen if clients enable idempotence and have more than 5 in-flight requests.
public interface Producer<K,V> extends Closeable {
    /**
     * The Future may fail with a DuplicateSequenceException if the client tried to
     * resend the message even though a previous attempt actually succeeded.
     * This may happen if the user configures a very high number of in-flight
     * requests (greater than 5), and if there are connection issues between the
     * client and broker.
     */
    public Future<RecordMetadata> send(ProducerRecord<K,V> record);
}
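A caller that waits on the returned Future could tell this case apart from a real failure roughly as sketched below. This is an illustration only: the nested `DuplicateSequenceException` is a local stand-in so the example compiles without the Kafka client jar, and the `Outcome` enum and `await` helper are hypothetical names, not part of the proposal. Treating the duplicate as a successful delivery is an assumption drawn from the description above (a previous attempt succeeded).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Sketch only: stand-in types, not the Kafka client classes.
class DuplicateSendHandling {

    /** Hypothetical stand-in for the proposed DuplicateSequenceException. */
    static class DuplicateSequenceException extends RuntimeException {
        DuplicateSequenceException(String message) { super(message); }
    }

    enum Outcome { DELIVERED, DELIVERED_BY_EARLIER_ATTEMPT, FAILED }

    /** Waits on a send future and classifies the result. */
    static Outcome await(Future<?> sendResult) {
        try {
            sendResult.get();
            return Outcome.DELIVERED;
        } catch (ExecutionException e) {
            // Future.get() wraps the send error; the real cause is underneath.
            if (e.getCause() instanceof DuplicateSequenceException) {
                // Assumption: an earlier attempt already wrote the batch,
                // so this is not treated as data loss.
                return Outcome.DELIVERED_BY_EARLIER_ATTEMPT;
            }
            return Outcome.FAILED;
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report the send as not confirmed.
            Thread.currentThread().interrupt();
            return Outcome.FAILED;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Object> duplicate = new CompletableFuture<>();
        duplicate.completeExceptionally(new DuplicateSequenceException("already appended"));
        System.out.println(await(duplicate));
    }
}
```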
TopicMetadata
We add a 'MessageFormatVersion' field to the TopicMetadata returned in the MetadataResponse. This is used to selectively enable idempotence in requested mode when the partition actually supports it. See Kafka Exactly Once - Dealing with older message formats when idempotence is enabled for a description of precisely how this will be used.
// TopicMetadataV3
TopicMetadata => TopicErrorCode Topic IsInternal MessageFormatVersion [PartitionMetadata]
  TopicErrorCode => int16
  Topic => String
  IsInternal => Boolean
  MessageFormatVersion => int8
  PartitionMetadata => PartitionMetadataV2
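The way a producer might combine the configured mode with the reported MessageFormatVersion can be sketched as follows. The class, enum, and method names are hypothetical. The sketch assumes that format version 2 (the format introduced in 0.11.0) is the minimum that supports idempotence, and that the strict mode fails rather than downgrading; the linked design page is the authority on the actual behavior.

```java
// Sketch only: hypothetical names, not the producer's real internals.
class IdempotenceResolution {

    enum Mode { REQUESTED, REQUIRED, OFF }

    // Assumption: magic value 2 is the message format introduced in 0.11.0.
    static final byte MIN_IDEMPOTENT_FORMAT = 2;

    /** Returns true if idempotence should be used for a topic with this format. */
    static boolean useIdempotence(Mode mode, byte messageFormatVersion) {
        boolean supported = messageFormatVersion >= MIN_IDEMPOTENT_FORMAT;
        switch (mode) {
            case OFF:
                return false;
            case REQUESTED:
                // Safe mode: enable idempotence only where the topic supports it.
                return supported;
            case REQUIRED:
                // Assumption: the strict mode raises an error instead of downgrading.
                if (!supported) {
                    throw new IllegalStateException(
                        "message format " + messageFormatVersion + " does not support idempotence");
                }
                return true;
            default:
                throw new AssertionError("unknown mode " + mode);
        }
    }

    public static void main(String[] args) {
        System.out.println(useIdempotence(Mode.REQUESTED, (byte) 1));
        System.out.println(useIdempotence(Mode.REQUESTED, (byte) 2));
    }
}
```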
ProduceResponse
We add a logStartOffset field to the produce response to help the producer identify when producer state has been lost due to retention time elapsing. See Kafka Exactly Once - Solving the problem of spurious OutOfOrderSequence errors for a precise description of how this will be used.
// ProduceResponse v4
ProduceResponse => [TopicName [Partition ErrorCode Offset Timestamp logStartOffset]] ThrottleTime
  TopicName => string
  Partition => int32
  ErrorCode => int16
  Offset => int64
  Timestamp => int64
  ThrottleTime => int32
  logStartOffset => int64
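One plausible use of logStartOffset on the client is sketched below: if the log now starts after the last offset this producer had acknowledged, the broker has deleted everything the producer wrote and may have dropped its state along with it, so a sequence error is likely a false positive. The comparison rule, the class, and the method name are assumptions made for illustration; the linked design page defines the real check.

```java
// Sketch only: a hypothetical client-side check, not the proposed implementation.
class SequenceErrorCheck {

    /**
     * Returns true when a sequence error is likely caused by retention
     * removing the producer's state rather than by lost data.
     *
     * @param lastAckedOffset last offset acknowledged for this partition, or -1 if none
     * @param logStartOffset  logStartOffset carried in the produce response
     */
    static boolean producerStateLikelyExpired(long lastAckedOffset, long logStartOffset) {
        if (lastAckedOffset < 0) {
            // Nothing was acknowledged yet, so there is no basis for comparison.
            return false;
        }
        // Everything this producer wrote has been deleted from the log.
        return logStartOffset > lastAckedOffset;
    }

    public static void main(String[] args) {
        System.out.println(producerStateLikelyExpired(100L, 250L));
        System.out.println(producerStateLikelyExpired(100L, 40L));
    }
}
```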
Producer config changes
We introduce new values for the `enable.idempotence` configuration: requested, required, and off.
Compatibility, Deprecation, and Migration Plan
For the Produce Request/Response updates, we follow the existing conventions for maintaining backward compatibility. New producers will continue to talk with old brokers using the old versions of the protocol.
The legacy values for `enable.idempotence` will be interpreted as follows by the new producer: true will mean required, and false will mean off.
As part of these changes, we will deprecate the true and false options for `enable.idempotence` by logging a warning if these are used.
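The mapping of legacy values and the deprecation warning could be handled at config-parsing time along these lines. The class and method names are hypothetical, and writing the warning to standard error stands in for whatever logger the producer actually uses.

```java
import java.util.Locale;

// Sketch only: hypothetical parser for the enable.idempotence value.
class IdempotenceConfig {

    enum Mode { REQUESTED, REQUIRED, OFF }

    /** Parses a configured value, accepting the legacy true/false spellings. */
    static Mode parse(String value) {
        String v = value.trim().toLowerCase(Locale.ROOT);
        switch (v) {
            case "requested":
                return Mode.REQUESTED;
            case "required":
                return Mode.REQUIRED;
            case "off":
                return Mode.OFF;
            case "true":
                warnDeprecated(v, "required");
                return Mode.REQUIRED;
            case "false":
                warnDeprecated(v, "off");
                return Mode.OFF;
            default:
                throw new IllegalArgumentException("invalid enable.idempotence value: " + value);
        }
    }

    /** Stand-in for the producer's logger. */
    private static void warnDeprecated(String legacy, String replacement) {
        System.err.println("enable.idempotence=" + legacy
            + " is deprecated; use '" + replacement + "' instead");
    }

    public static void main(String[] args) {
        System.out.println(parse("true"));
        System.out.println(parse("requested"));
    }
}
```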
Rejected Alternatives
This KIP contains changes to fix existing problems or clarify existing behavior. As such, there are not too many options for making these improvements within the existing solutions.