Status

Current state: "pending"

Discussion thread:

JIRA:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Note: This KIP requires a leader election change, which will be proposed in a separate KIP.


In Kafka, Producers have the option of either waiting for the message to be committed or not, depending on their preference for tradeoff between latency and durability. This preference is controlled by the acks setting that the producer uses.

...

Let's step back and think again. With replication factor = 3, min.insync.replicas=2, and acks=all, we can tolerate 1 replica not acking and still keep the durability guarantee.


With replication factor = 3 and all 3 replicas in sync, if 2 replica acks are enough for high durability, could we increase write throughput by waiting for only 2 acks instead of all 3?

...

This is what we have now for acks=all: we need to wait for the slowest follower's ack (i.e. 500ms) before we can respond to the producer.


I'd like to propose a new config value: acks=min.insync.replicas (i.e. the number of replica acks is decided by the min.insync.replicas config on the topic/broker). With it, we just need to wait for the fastest follower's ack (i.e. 100ms) and can then respond to the producer. The slower follower will catch up later.
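To make the latency comparison concrete, here is a minimal sketch of when the leader could respond under each setting. It is not Kafka code: the class and method names are hypothetical, the leader's own write is assumed to count as one ack with zero added latency, and the 100ms/500ms figures are the ones from the example above.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Kafka: models when the leader can respond.
final class AckLatencySketch {

    /**
     * @param followerAckMs ack latency of each in-sync follower, in milliseconds
     * @param requiredAcks  total acks needed, counting the leader's own write
     * @return time until enough followers have acked (leader write assumed instant)
     */
    static long responseLatencyMs(long[] followerAckMs, int requiredAcks) {
        int followersNeeded = requiredAcks - 1; // the leader itself is one ack
        if (followersNeeded <= 0) {
            return 0;
        }
        if (followersNeeded > followerAckMs.length) {
            throw new IllegalArgumentException("not enough in-sync followers");
        }
        long[] sorted = followerAckMs.clone();
        Arrays.sort(sorted);
        // Wait for the k-th fastest follower, where k = followersNeeded.
        return sorted[followersNeeded - 1];
    }

    public static void main(String[] args) {
        long[] followers = {100, 500}; // fast and slow follower from the example
        System.out.println("acks=all: " + responseLatencyMs(followers, 3) + " ms");
        System.out.println("acks=min.insync.replicas (=2): "
                + responseLatencyMs(followers, 2) + " ms");
    }
}
```

Under these assumptions, acks=all waits 500 ms for the slowest follower, while acks=min.insync.replicas with min.insync.replicas=2 waits only 100 ms for the fastest one.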

...

This way, when balancing the tradeoff between latency and durability, we can have both!


Note that with acks=min.insync.replicas, the slow follower may fall out of sync more easily than with acks=all. Take the same example above: with acks=all, the slower follower only needs to fetch 1 batch of records per fetch request (assuming 1 batch is sent each time). With acks=min.insync.replicas, it needs to fetch 5x more batches, which might make it even slower. If that happens, please either find the cause of the slowness or use acks=all.


Furthermore, with this config, we can have a fine-grained acks setting for each topic. For example:

Topic A: we can tolerate data loss but require high throughput, so we set min.insync.replicas=1

Topic B: we don't want data loss, so we set min.insync.replicas=2

...

  • acks=0
    • If set to zero then the producer will not wait for any acknowledgment from the server at all.
  • acks=1
    • This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers.
  • acks=all(-1)
    • This means the leader will wait for the full set of in-sync replicas to acknowledge the record.
  • acks=min.insync.replicas(-2)
    • This means the leader will wait for min.insync.replicas replicas to acknowledge the record. The min.insync.replicas config can be defined at the broker level or overridden at the topic level.

Note: The default value for acks is still all (-1), as before.
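As an illustration of how a producer might select the new value, here is a small sketch that uses only java.util.Properties so it runs without the Kafka client library. The class and method names are hypothetical, and the bootstrap address is a placeholder. The "min.insync.replicas" value and its -2 alias are the ones proposed in this KIP, so a client without this change would not be expected to accept them.

```java
import java.util.Locale;
import java.util.Properties;

// Hypothetical sketch: maps the acks strings listed above to their numeric form.
final class ProposedAcksConfigSketch {

    static int parseAcks(String acks) {
        String value = acks.trim().toLowerCase(Locale.ROOT);
        switch (value) {
            case "all":
                return -1;
            case "min.insync.replicas":
                return -2; // new value proposed in this KIP
            default:
                int numeric = Integer.parseInt(value);
                if (numeric < -2 || numeric > 1) {
                    throw new IllegalArgumentException("Unsupported acks value: " + acks);
                }
                return numeric;
        }
    }

    static Properties producerProps(String acks) {
        parseAcks(acks); // fail fast on an unsupported value
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("acks", acks);
        return props;
    }
}
```

For example, producerProps("min.insync.replicas") would yield a configuration whose acks setting is the proposed value, while producerProps("all") keeps today's default behavior.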

Proposed Changes

When the producer sets acks=min.insync.replicas and sends a record to the broker, the broker will do the same thing it does for acks=all: first, it checks whether the current number of in-sync replicas is greater than or equal to the min.insync.replicas value. If not, it throws NotEnoughReplicasException. If so, it waits for min.insync.replicas acknowledgements and then responds to the producer.
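The check described above can be sketched as follows. This is only an illustration under stated assumptions, not the broker implementation: the class, constants, and method are hypothetical, and the nested exception is a local stand-in for Kafka's NotEnoughReplicasException so the example compiles on its own.

```java
// Hypothetical sketch of the broker-side decision, not actual broker code.
final class MinIsrAckSketch {

    // Local stand-in for Kafka's NotEnoughReplicasException.
    static final class NotEnoughReplicasException extends RuntimeException {
        NotEnoughReplicasException(String message) {
            super(message);
        }
    }

    static final int ACKS_ALL = -1;
    static final int ACKS_MIN_ISR = -2; // value proposed in this KIP

    /**
     * Returns how many replicas (leader included) must have the record
     * before the leader responds to the producer.
     */
    static int requiredAcks(int acks, int isrSize, int minInsyncReplicas) {
        if (acks != ACKS_ALL && acks != ACKS_MIN_ISR) {
            throw new IllegalArgumentException("only acks=-1 and acks=-2 are modeled here");
        }
        // Same precondition for both settings: enough replicas must be in sync.
        if (isrSize < minInsyncReplicas) {
            throw new NotEnoughReplicasException(
                    "ISR size " + isrSize + " is below min.insync.replicas " + minInsyncReplicas);
        }
        // acks=all waits for the whole ISR; the proposed value waits for min.insync.replicas.
        return acks == ACKS_ALL ? isrSize : minInsyncReplicas;
    }
}
```

With 3 in-sync replicas and min.insync.replicas=2, this sketch requires 3 acks for acks=all and 2 for acks=min.insync.replicas. With only 1 in-sync replica, both settings fail the precondition.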

...

For the idempotent/transactional producer, the producer currently requires acks to be 'all'. But as mentioned above, acks=all doesn't guarantee high durability if there is only 1 in-sync replica. So we can actually complete an idempotent/transactional send with only 1 replica up and all other replicas down, if min.insync.replicas (default is 1) is not set. That said, the number of acks (1 or 2 or 3 or ... all) is not a key requirement for the idempotent/transactional producer. Allowing an acks=min.insync.replicas producer to send idempotent/transactional data won't break any contract. We will allow the idempotent/transactional producer to set acks=all or acks=min.insync.replicas.
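A minimal sketch of the relaxed validation might look like the following. The class and method are hypothetical and do not mirror the producer's actual config-validation code. IllegalArgumentException is used here only as a stand-in for whatever configuration error the client would raise.

```java
// Hypothetical sketch of the relaxed acks check for idempotent/transactional producers.
final class IdempotenceAcksCheckSketch {

    static final int ACKS_ALL = -1;
    static final int ACKS_MIN_ISR = -2; // value proposed in this KIP

    static void validate(boolean idempotenceEnabled, int acks) {
        // Today only acks=all passes; the proposal also admits acks=min.insync.replicas.
        if (idempotenceEnabled && acks != ACKS_ALL && acks != ACKS_MIN_ISR) {
            throw new IllegalArgumentException(
                    "idempotence requires acks=all or acks=min.insync.replicas, got " + acks);
        }
    }
}
```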

Compatibility, Deprecation, and Migration Plan

This KIP introduces a new acks value and does not change the default value, so it won't cause backward compatibility issues for existing applications.

...