...

To get the expected durability, users also need to set the min.insync.replicas config (default is 1) correctly. The documentation for the min.insync.replicas config states:

...

But with the proposed acks=min.insync.replicas, we can achieve that with a single producer and define a different durability level per topic through the topic-level min.insync.replicas config.

Public Interfaces

A new acks value will be accepted:

  • acks=0
    • If set to zero then the producer will not wait for any acknowledgment from the server at all.
  • acks=1
    • This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers.
  • acks=all(-1)
    • This means the leader will wait for the full set of in-sync replicas to acknowledge the record.
  • acks=min.insync.replicas(-2)
    • This means the leader will wait for min.insync.replicas replicas to acknowledge the record. The min.insync.replicas config can be defined at the broker level or overridden at the topic level.
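
The accepted values above could be mapped to their numeric forms roughly as follows. This is a minimal sketch rather than the actual Kafka client code: the class name AcksValues and the method toNumeric are hypothetical, and only the numeric codes (0, 1, -1, -2) come from the list above.

```java
import java.util.Locale;

/** Hypothetical helper: maps an acks config string to the numeric value listed in this KIP. */
public final class AcksValues {
    private AcksValues() {}

    public static short toNumeric(String acks) {
        switch (acks.trim().toLowerCase(Locale.ROOT)) {
            case "0":
                return 0;   // do not wait for any acknowledgment
            case "1":
                return 1;   // leader-only acknowledgment
            case "all":
            case "-1":
                return -1;  // full set of in-sync replicas
            case "min.insync.replicas":
            case "-2":
                return -2;  // proposed: min.insync.replicas replicas
            default:
                throw new IllegalArgumentException("Invalid acks value: " + acks);
        }
    }
}
```

Accepting both the symbolic and the numeric spelling mirrors how acks=all(-1) is written in the list; whether the real parser would do so for -2 is an assumption.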

Proposed Changes

When the producer sets acks=min.insync.replicas and sends a record to the broker, the broker does the same thing it does for acks=all: it first checks whether the current number of in-sync replicas is greater than or equal to the min.insync.replicas value. If not, it throws NotEnoughReplicasException. If so, it waits for min.insync.replicas replicas to acknowledge the record, and then responds to the producer.
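
The broker-side decision described above can be sketched as follows. This is an illustration under stated assumptions, not the broker implementation: RequiredAcksSketch and requiredAcks are hypothetical names, the nested exception is a stand-in for Kafka's NotEnoughReplicasException, and counting the leader among the acknowledging replicas is an assumption.

```java
/** Hypothetical sketch of how many replica acknowledgments the leader waits for. */
public final class RequiredAcksSketch {
    private RequiredAcksSketch() {}

    /** Stand-in for Kafka's NotEnoughReplicasException (illustration only). */
    public static final class NotEnoughReplicasException extends RuntimeException {
        public NotEnoughReplicasException(String message) {
            super(message);
        }
    }

    /**
     * @param acks              -1 for acks=all, -2 for the proposed acks=min.insync.replicas
     * @param isrSize           current number of in-sync replicas
     * @param minInsyncReplicas effective min.insync.replicas for the topic
     * @return number of replicas that must acknowledge before the leader responds
     */
    public static int requiredAcks(short acks, int isrSize, int minInsyncReplicas) {
        if (acks != -1 && acks != -2) {
            throw new IllegalArgumentException("Sketch only covers acks=-1 and acks=-2, got " + acks);
        }
        // Same precondition for both values: the ISR must be large enough.
        if (isrSize < minInsyncReplicas) {
            throw new NotEnoughReplicasException(
                "In-sync replicas: " + isrSize + ", required: " + minInsyncReplicas);
        }
        // acks=all waits for the whole ISR; the proposed value waits only for min.insync.replicas.
        return acks == -1 ? isrSize : minInsyncReplicas;
    }
}
```

For example, with 3 in-sync replicas and min.insync.replicas=2, acks=all waits for 3 acknowledgments while acks=min.insync.replicas waits for 2.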


For the idempotent/transactional producer, the producer currently requires acks to be 'all'. But as mentioned above, acks=all doesn't guarantee high durability if there is only 1 in-sync replica. So an idempotent/transactional send can actually complete with only 1 replica up and all other replicas down if min.insync.replicas is left at its default of 1. In other words, the number of acks (1, 2, 3, ...) is not a key requirement for the idempotent/transactional producer, so allowing a producer with acks=min.insync.replicas to send idempotent/transactional data is acceptable. In this KIP, I'll allow the idempotent/transactional producer to set acks=all or acks=min.insync.replicas.
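
The relaxed validation for idempotent/transactional producers might look like the sketch below. The class and method names are hypothetical, and IllegalArgumentException stands in for whatever configuration error the real client would raise.

```java
/** Hypothetical sketch of the acks check for idempotent/transactional producers. */
public final class IdempotentAcksCheck {
    private IdempotentAcksCheck() {}

    /** Returns true if the numeric acks value is allowed together with idempotence. */
    public static boolean isAllowed(short acks) {
        // -1 is acks=all (allowed today); -2 is the proposed acks=min.insync.replicas.
        return acks == -1 || acks == -2;
    }

    /** Rejects any other acks value when idempotence is enabled. */
    public static void validate(boolean idempotenceEnabled, short acks) {
        if (idempotenceEnabled && !isAllowed(acks)) {
            throw new IllegalArgumentException(
                "Idempotent/transactional producer requires acks=all or acks=min.insync.replicas, got " + acks);
        }
    }
}
```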

Compatibility, Deprecation, and Migration Plan

This KIP introduces a new acks value, so it won't cause backward compatibility issues for any existing applications.

If the producer is upgraded and set to acks=min.insync.replicas(-2) while the server is still on the old version, the server will respond to the producer with InvalidRequiredAcksException.

If the producer is on the old version, then depending on the client-side validation, acks=min.insync.replicas(-2) might or might not work. In the Java producer, it will throw ConfigException because -2 is an invalid value.
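
The mixed-version case above, where an upgraded producer talks to an old server, can be illustrated with a sketch of the old server's acks check. The class and method names are hypothetical, the set of known values is taken from the acks values this KIP lists as already existing, and the returned strings are plain labels used for illustration.

```java
import java.util.Set;

/** Hypothetical sketch of how an old server would treat the new acks value. */
public final class OldBrokerAcksCheck {
    private OldBrokerAcksCheck() {}

    // Acks values an old server understands: all(-1), 0, and 1.
    private static final Set<Short> KNOWN_ACKS = Set.of((short) -1, (short) 0, (short) 1);

    /** Returns an error label for unknown acks values, or "NONE" if the value is accepted. */
    public static String validate(short requiredAcks) {
        return KNOWN_ACKS.contains(requiredAcks) ? "NONE" : "INVALID_REQUIRED_ACKS";
    }
}
```

Under this sketch, a request carrying -2 is rejected outright rather than silently downgraded, which matches the InvalidRequiredAcksException behavior described above.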

Test Plan

Unit test + Integration test

Rejected Alternatives

  1. Allowing acks to be set to an arbitrary value.
    → This is rejected because an arbitrary value might break the broker's min.insync.replicas contract. For example, if the broker sets min.insync.replicas to 2 and the producer sets acks=3, writes will fail when there are only 2 in-sync replicas, even though the min.insync.replicas requirement is met. So I think allowing acks=min.insync.replicas makes more sense.
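
The example in the rejected alternative can be worked through in a few lines. This is only an illustration of the arithmetic: the class and method names are hypothetical, and the check simply asks whether enough in-sync replicas exist to supply the requested number of acknowledgments.

```java
/** Hypothetical worked example for the rejected "arbitrary acks" alternative. */
public final class ArbitraryAcksExample {
    private ArbitraryAcksExample() {}

    /** A write can only be acknowledged if the ISR has at least requiredAcks members. */
    public static boolean canSatisfy(int requiredAcks, int isrSize) {
        return isrSize >= requiredAcks;
    }

    public static void main(String[] args) {
        int minInsyncReplicas = 2;
        int isrSize = 2; // min.insync.replicas is met

        // Arbitrary acks=3: the write fails although the contract is satisfied.
        System.out.println(canSatisfy(3, isrSize)); // prints false

        // acks=min.insync.replicas: the write succeeds.
        System.out.println(canSatisfy(minInsyncReplicas, isrSize)); // prints true
    }
}
```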