Status

Current state: "pending"

Discussion thread:

JIRA:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Note: This KIP requires a leader election change, which will be proposed in a separate KIP.


In Kafka, producers can choose whether or not to wait for a message to be committed, depending on their preferred tradeoff between latency and durability. This preference is controlled by the acks setting the producer uses.

Currently, the acks config accepts these values (a minimal producer example follows the list):

  • acks=0
    • If set to zero then the producer will not wait for any acknowledgment from the server at all.
  • acks=1
    • This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers.
  • acks=all
    • This means the leader will wait for the full set of in-sync replicas to acknowledge the record.
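For reference, a minimal sketch (in Java; the broker address and topic name are placeholders) of how a producer sets this config today:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AcksExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // "0", "1", or "all" (-1); "all" waits for the full in-sync replica set.
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        }
    }
}
```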


For the acks=all case, the flow is simple: the producer sends a record to the leader, the leader waits until every in-sync replica has fetched and acknowledged the record, and only then responds to the producer.


It looks perfect, but there's a caveat. As the doc says, acks=all will "wait for the full set of in-sync replicas to acknowledge the record", so if there is only 1 replica in the in-sync replica set, it has the same effect as acks=1 (even though the replication factor is set to 3).

To get the expected durability, users also need to set the min.insync.replicas config (default: 1) correctly. From the documentation of min.insync.replicas:

When used together, min.insync.replicas and acks allow you to enforce greater durability guarantees. A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with acks of "all". This will ensure that the producer raises an exception if a majority of replicas do not receive a write.


In the typical scenario (replication factor = 3, min.insync.replicas=2, acks=all), we ensure at least 2 replicas ack each write, which means we can tolerate 1 replica being out of sync or down (e.g. during a rolling update) and still complete writes successfully. This preserves both high availability and durability in Kafka.
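As a concrete sketch of this setup (using the Java AdminClient; the broker address and topic name are placeholders), the typical scenario can be created like this:

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateDurableTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            // Replication factor 3, min.insync.replicas=2: producing with acks=all
            // succeeds as long as at least 2 replicas (leader included) are in sync.
            NewTopic topic = new NewTopic("orders", 3, (short) 3)
                    .configs(Map.of("min.insync.replicas", "2"));
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```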

Let's step back and think again. With replication factor = 3, min.insync.replicas=2, and acks=all, we can allow 1 replica not to ack and still keep the durability guarantee.


In the case of replication factor = 3 where all 3 replicas are in sync, if we can achieve high durability with 2 acks, could we reduce produce latency (and thus increase write throughput) by requiring only 2 acks instead of all 3?


This is what we have now with acks=all: we need to wait for the slowest follower's ack (e.g. 500 ms) before we can respond to the producer.


I'd like to propose a new config value: acks=min.insync.replicas (i.e. the number of required acks is determined by the min.insync.replicas config at the topic/broker level). With it, we only need to wait for the fastest follower's ack (e.g. 100 ms) before responding to the producer. The slower follower will catch up later.


This way, when balancing the tradeoff between latency and durability, we can have both!


Note that in the acks=min.insync.replicas case, a slow follower can fall out of sync more easily than with acks=all. Take the same example above: with acks=all, the slower follower only needs to fetch 1 batch of records per fetch request (assuming the producer sends 1 batch at a time). With acks=min.insync.replicas, the producer is paced by the fastest follower, so the slower follower may need to fetch 5x more batches per request, which might slow it down further. When that happens, either find the cause of the slowness or fall back to acks=all.


Furthermore, with this config, we can have a fine-grained acks setting per topic. For example:

  • Topic A: we can tolerate data loss but require high throughput, so we set min.insync.replicas=1.
  • Topic B: we don't want data loss, so we set min.insync.replicas=2.

With the current acks=all config, the producer will wait for all 3 replicas to ack (assuming 3 in-sync replicas) for both topic A and topic B. To achieve different durability levels today, we would need 2 producers handling the 2 topics.

But with the proposed acks=min.insync.replicas, we can achieve this with 1 producer and define the durability level per topic via the min.insync.replicas topic config, as in the sketch below.
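A sketch of what this could look like (hypothetical, since the acks=min.insync.replicas value only exists if this KIP is accepted; broker address and topic names are placeholders). Assume topic-a was created with min.insync.replicas=1 and topic-b with min.insync.replicas=2:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PerTopicDurabilityExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // Proposed value (-2): the required ack count follows each topic's
        // min.insync.replicas config instead of the full ISR.
        props.put(ProducerConfig.ACKS_CONFIG, "min.insync.replicas");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // topic-a (min.insync.replicas=1): leader ack only -> lowest latency.
            producer.send(new ProducerRecord<>("topic-a", "k", "high-throughput data"));
            // topic-b (min.insync.replicas=2): leader + 1 follower ack -> durable.
            producer.send(new ProducerRecord<>("topic-b", "k", "no-data-loss data"));
        }
    }
}
```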

Public Interfaces

A new acks value will be accepted:

  • acks=0
    • If set to zero then the producer will not wait for any acknowledgment from the server at all.
  • acks=1
    • This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers.
  • acks=all (-1)
    • This means the leader will wait for the full set of in-sync replicas to acknowledge the record.
  • acks=min.insync.replicas (-2)
    • This means the leader will wait for min.insync.replicas replicas (leader included) to acknowledge the record. The min.insync.replicas config is defined at the broker level and can be overridden at the topic level.

Note: The default value for acks  is still all (-1) as before.

Proposed Changes

When the producer sets acks=min.insync.replicas and sends a record to the broker, the broker does the same first check as for acks=all: if the current number of in-sync replicas is less than the min.insync.replicas value, it throws NotEnoughReplicasException. Otherwise, it waits for min.insync.replicas acknowledgements (leader included) and then responds to the producer, as in the sketch below.
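A simplified sketch of that decision (illustrative Java only; the real logic lives in the broker's Scala code, and the class and method names here are hypothetical):

```java
import org.apache.kafka.common.errors.NotEnoughReplicasException;

final class AckPolicySketch {
    /**
     * How many replica acks (leader included) a produce request must collect
     * before the broker responds. Illustrative sketch of the proposed behavior.
     */
    static int requiredAcks(short acks, int minInsyncReplicas, int currentIsrSize) {
        if (acks == -1 || acks == -2) {
            // Same precondition as acks=all today.
            if (currentIsrSize < minInsyncReplicas) {
                throw new NotEnoughReplicasException("ISR size " + currentIsrSize
                        + " is below min.insync.replicas " + minInsyncReplicas);
            }
            // acks=all (-1): wait for the whole current ISR.
            // acks=min.insync.replicas (-2, proposed): wait for just enough replicas.
            return acks == -1 ? currentIsrSize : minInsyncReplicas;
        }
        return acks; // acks=0 or acks=1
    }
}
```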


For the idempotent/transactional producer, acks is currently required to be 'all'. But as mentioned above, acks=all doesn't guarantee high durability when the in-sync replica set has shrunk to 1: if min.insync.replicas is left at its default of 1, an idempotent/transactional send can complete with only 1 replica up and all the other replicas down. This shows that the exact number of acks (1, 2, 3, or all) is not a key requirement for the idempotent/transactional producer, so allowing a producer with acks=min.insync.replicas to send idempotent/transactional data won't break any contract. In this KIP, we will allow the idempotent/transactional producer to set acks=all or acks=min.insync.replicas, as in the sketch below.
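A sketch of the relaxed client-side check (illustrative only; the actual validation lives in the Java producer's config handling, and the class and method names here are hypothetical):

```java
import org.apache.kafka.common.config.ConfigException;

final class IdempotenceAcksCheck {
    // Today this check accepts only -1 (all); the proposal also accepts -2.
    static void validateAcksForIdempotence(short acks) {
        if (acks != -1 && acks != -2) {
            throw new ConfigException("Must set acks to 'all' or 'min.insync.replicas' "
                    + "in order to use the idempotent producer.");
        }
    }
}
```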

Compatibility, Deprecation, and Migration Plan

This KIP introduces a new acks value and does not change the default, so it has no backward compatibility impact on existing applications.

If the producer is upgraded and set to acks=min.insync.replicas (-2) while the server is still on an old version, the server will respond with InvalidRequiredAcksException.

If the producer is on an old version, acks=min.insync.replicas (-2) may or may not be accepted, depending on the client's validation. The Java producer will throw a ConfigException because -2 is an invalid value.

Test Plan

Unit test + Integration test

Rejected Alternatives

  1. Allowing acks to be set to an arbitrary number.
    → This is rejected because the arbitrary value might exceed the number of replicas; for example, if the replication factor is 3, setting acks=4 doesn't make sense. It might also break the broker's min.insync.replicas contract: if the broker sets min.insync.replicas to 2 and the producer sets acks=3, writes fail whenever the in-sync replica set has only 2 members, even though that satisfies the min.insync.replicas requirement. So allowing acks=min.insync.replicas makes more sense.