Status
Current state: "Under Discussion"
Discussion thread:
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
In Kafka, Producers have the option of either waiting for the message to be committed or not, depending on their preference for tradeoff between latency and durability. This preference is controlled by the acks setting that the producer uses.
Currently, the acks config accepts these values:
acks=0
- If set to zero then the producer will not wait for any acknowledgment from the server at all.
acks=1
- This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers.
acks=all
- This means the leader will wait for the full set of in-sync replicas to acknowledge the record.
For acks=all
case, the simple flow is like this:
It looks perfect. But there's a caveat here. Like the doc said, acks=all will "wait for the full set of in-sync replicas to acknowledge the record", so if there's only 1 replica in in-sync replicas, it will have the same effect as acks=1 (even though we have replication-factor set to 3).
To get the expected durability, users also need to set min.insync.replicas
(default is 1) config correctly. In the doc of min.insync.replicas config:
When min.insync.replicas
and acks
used together, it allows you to enforce greater durability guarantees. A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with acks of "all". This will ensure that the producer raises an exception if a majority of replicas do not receive a write.
In the typical scenario: replication factor = 3, min.insync.replicas=2, acks=all, we ensure at least 2 replicas acks, which means, we can tolerate 1 replica out-of-sync or down (ex: rolling update) and still make the write process successfully. This keeps high availability and durability in Kafka.
Let's step back and think again. In the case of replication factor = 3 and all 3 replicas are in-sync replica, if we can achieve high durability with 2 replicas ack, then, could we increase write throughput by just need 2 replicas ack, not 3 of them?
This is what we have now for acks=all
, we need to wait for the slowest follower acks (i.e. 500ms) before we can respond to the producer.
I'd like to propose a new config: acks=min.insync.replicas
(i.e. the number of replica acks is decided by the min.insync.replicas config in topic/broker) , we just need to wait for the fastest follower acks (i.e. 100ms) then respond to the producer.
This way, when finding a balance for tradeoff between latency and durability, we can have both!
Furthermore, with this config, we can have fine-grained acks setting for each topic. For example,
Topic A, we can tolerate data lose, but require high throughput, we set min.insync.replicas=1
Topic B, we don't want data lose, we set min.insync.replicas=2
So, with current acks=all config, the producer will wait for all 3 replicas acks (suppose in-sync replicas are 3) for both topics A and B. If we want to achieve different durability level, we need 2 producers to handle 2 topics.
But with the proposed acks=min.insync.replicas, we can achieve that with 1 producer, and define different durability level in topic min.insync.replicas
config.
Public Interfaces
A new acks value will be accepted:
acks=0
- If set to zero then the producer will not wait for any acknowledgment from the server at all.
acks=1
- This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers.
acks=all(-1)
- This means the leader will wait for the full set of in-sync replicas to acknowledge the record.
acks=min.insync.replicas(-2)
- This means the leader will wait for the min.insync.replicas number of replicas to acknowledge the record. The min.insync.replicas config can be defined in broker or topic (if overridden in topic level)
Proposed Changes
When the producer sets acks=min.insync.replicas
and send a record to the broker, the broker will do the same thing as acks=all
did, firstly check if current number of in-sync replicas is greater or equal to min.insync.replicas value, if not, throw NotEnoughReplicasException. If yes, we wait for the number of min.insync.replicas acknowledge, and then respond to the producers.
For the idempotent/transactional producer, currently, the producer requires acks
must be 'all'. But as mentioned above, acks=all doesn't guarantee high durability if the in-sync replicas only have 1. So, we can actually complete an idempotent/transactional send with only 1 replica up and all other replicas down, if min.insync.replicas (default is 1) is not set. This said, the number of acks (1 or 2 or 3 or ...) is not a key requirement for idempotent/transactional producer. Allowing acks=min.insync.replicas producer to send idempotent/transactional data is acceptable. In this KIP, I'll allow idempotent/transactional producer to set acks=all or acks=min.insync.replicas.
Compatibility, Deprecation, and Migration Plan
This KIP introduces a new acks value, it won't have backward compatibility issue for any existing applications.
If producer is upgraded and set to acks=min.insync.replicas(-2), and server is still staying in the old version, the server will respond InvalidRequiredAcksException to the producer.
If producer is in the old version, depending on the client validation, the acks=min.insync.replicas(-2) might work not might not work. In Java producer, it will throw ConfigException due to invalid value (-2).
Test Plan
Unit test + Integration test
Rejected Alternatives
- Allowing acks set to a random value.
→ This is rejected because the random value might break the broker min.insync.replicas contract. For example, if broker set min.insync.replicas to 2, and acks=3, this will let write failure when in-sync replicas only have 2, while it reaches min.insync.replicas requirement. So I think allowing acks=min.insync.replicas makes more sense.