Table of Contents |
---|
Status
Current state: Under Discussion
Discussion thread: TBD here
JIRA:
Jira | ||||||
---|---|---|---|---|---|---|
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Background
Let's clarify the terminology we use in the rest of this document to ensure we are all on the same page.
Definitions
Here is how we define the various delivery semantics.
At-most once: Acknowledged messages would appear in the log at most once. In other words some acknowledged messages may be lost.
At-least once: Every acknowledged message appears in the log at least once, ie. there may be duplicates introduced due to internal retries.
Exactly-once: Every acknowledged message appears in the log exactly once, without duplicates.
Failure modes
We distinguish between two kinds of failures:
Transient failures: Any failure from which a host can recover to full functionality after the cause has passed. For instance, power failures, network interrupts, buggy code, etc.
Hard failures: Failures from which there is no way for a host to recover. For instance, disk failure, data corruption due to a bug, etc.
The guarantees defined above only apply to transient failures. If data is lost or the log becomes un-readable, acknowledged messages may be lost regardless of the originally promised semantics.
Concurrent transient failures may also result in the violation of at-least once and exactly-once delivery semantics.
Replication and delivery guarantees
By he definitions above, if a topic is configured to have only a single replica, the best guarantee that Kafka can provide is at-most once delivery, regardless of the producer settings.
The core reason is that the flush to disk happens asynchronously. So it may happen that there is a power failure --or the kafka process receives a SIGKILL--
after acknowledging the message but before the flush happens. Thus acknowledged messages may be lost even due to a transient failure.
For the same reason, a concurrent transient failure may also result in acknowledged messages being lost. For instance, for a topic with replication-factor=3, if all three replicas suffer a simultaneous power outage after acknowledging the message but before flushing the data to disk, we would lose the acknowledged messages.
Public Interfaces
config | current default value | proposed default value |
---|---|---|
enable.idempotence | false | true |
acks | 1 | all |
max.in.flight.requests.per.connection | 5 | 2 |
retries | 0 | MAX_INT |
There will be no other publicly visible changes required.
The changes above would guarantee exactly once, in order delivery per partition for topics with replication-factor >= 2
, and assuming that the system doesn't suffer multiple hard failures or concurrent transient failures.
Proposed
...
Changes
Code changes
Currently, the idempotent producer requires max.in.flight.requests.per.connection=1 for correctness reasons. We will need to make client and broker changes to support max.in.flight.requests.per.connection > 1 with the idempotent producer. The details of the changes required are in this ticket.
- Kafka Exactly Once - Solving the problem of spurious OutOfOrderSequence errors
- Kafka Exactly Once - Dealing with older message formats when idempotence is enabled
Performance considerations
Next we will delineate the reasons for choosing the proposed values for the configurations in question and the performance impact of the same. A summary of the performance tests we ran to understand the impact of these changes can be found here: An analysis of the impact of max.in.flight.requests.per.connection and acks on Producer performance.
For the same value of max.in.flight.requests, and acks, we have observed that enabling idempotence alone does not significantly impact performance. Detailed results supporting this statement are available here.
Why change max.in.flight.requests.per.connection from 5 to 2?
...
Why change retries from 0 to MAX_INT?
The retries config was defaulted to 0 to ensure that internal producer retries don't introduce duplicates. With the idempotent producer introduced in KIP-98 - Exactly Once Delivery and Transactional Messaging, internal producer retries can no longer introduce duplicates. Hence, with the current proposal to enable the idempotent producer by default, we should let the producer retry as much as possible since there is no correctness penalty for doing so.
Why change acks from 1 to all?
With acks=1, we only have an at most once delivery guarantee. In particular, with acks=1, the broker could crash after acknowledging a message but before replicating it. This results in an acknowledged message being lost. In other words, if we want exactly once delivery, we need acks=all along with enable.idempotence=true.
...
Regardless, we believe strong durability guarantees out of the box are worth the cost of increased latency.
Compatibility, Deprecation, and Migration Plan
This KIP only proposes changes to the default settings of some client configurations. Hence no migration plans are required.
...
As for compatibility, it is possible that the change in defaults might cause unexpected changes in behavior for users who upgrade from older versions. We will call this out the release notes for the 1.0.0 release. Some of the differences are listed below.
Applications may receive the new OutOfOrderSequenceException
This exception indicates that previously acknowledged data was lost. With the new exactly once features we can detect this and report the error to the user. A properly configured producer and broker should only receive this exception for real data loss.
Producer performance profile may change
The acks=all setting would result worse througput and latency, as documented here.
Rejected Alternatives
This is a proposal to change the default semantics of the Kafka Producer. The specific values of the various variables are either necessary to achieve stronger semantics, or have been chosen through empirical observation. Hence there are no rejected alternatives at this point.