Status

Current state: Under Discussion

Discussion thread: here 

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The producer should enable the strongest message delivery guarantee by default.

  1. Exactly-once delivery. KIP-98 introduced exactly-once delivery, which guarantees that every message is persisted exactly once, without data loss or duplication. However, to give the community time to upgrade, the producer still defaults to at-least-once delivery and sets `enable.idempotence=false`.
  2. N-1 failure tolerance. When brokers flush records to disk asynchronously, Kafka can tolerate at most N-1 concurrent broker failures (that is, N-1 brokers shutting down before their in-memory messages are flushed to disk). However, for performance reasons, the producer defaults to `acks=1`, under which data can be lost if the partition leader shuts down.

Enabling these guarantees won't significantly impact performance, as stated here:

An analysis of the impact of max.in.flight.requests.per.connection and acks on Producer performance

Public Interfaces

1. Producer config defaults will change in release version 3.0:

| Producer config name | From | To |
| --- | --- | --- |
| enable.idempotence | false | true |
| acks | 1 | all |

2. The producer ACL required for enabling idempotence will change from `IDEMPOTENT_WRITE` to `WRITE` in release version 2.8.

A. In the `ProduceRequest` handler, we remove the `IDEMPOTENT_WRITE` access check, since the handler already checks `WRITE` access on topics.

B. In the `InitProducerIdRequest` handler, the producer is authorized if it has `WRITE` access on ANY topic.

C. `kafka-acls` will show a deprecation warning if users try to grant `IDEMPOTENT_WRITE` access.

3. `org.apache.kafka.server.authorizer.Authorizer` will have a new interface for checking if the caller is authorized to perform the given ACL operation on at least one resource satisfying the filter. It will have a default implementation assuming `allow.everyone.if.no.acl.found=false`. 

clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
    /**
     * Check if the caller is authorized to perform the given ACL operation on at least one
     * resource of the given type.
     *
     * @param requestContext Request context including request type, security protocol, and listener name
     * @param op             The ACL operation to check
     * @param resourceType   The resource type
     * @return               Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the
     *                       given ACL operation on at least one resource of the given type. 
     *                       Return {@link AuthorizationResult#DENIED} otherwise.
     */
    default AuthorizationResult authorizeAny(AuthorizableRequestContext requestContext,
                                             AclOperation op,
                                             ResourceType resourceType) {
        // Match every resource pattern of the requested type, whatever its name or pattern type.
        ResourcePatternFilter resourceFilter = new ResourcePatternFilter(resourceType, null, PatternType.ANY);
        AclBindingFilter aclFilter = new AclBindingFilter(
            resourceFilter, new AccessControlEntryFilter(
                requestContext.principal().toString(),
                requestContext.clientAddress().getHostAddress(),
                op,
                AclPermissionType.ANY));
        for (AclBinding binding : acls(aclFilter)) {
            if (binding.entry().permissionType() != AclPermissionType.ALLOW)
                continue;
            List<Action> action = Collections.singletonList(new Action(
                op, binding.pattern(), 1, false, false));
            if (authorize(requestContext, action).get(0) == AuthorizationResult.ALLOWED) {
                return AuthorizationResult.ALLOWED;
            }
        }
        return AuthorizationResult.DENIED;
    }

Proposed Changes

AclAuthorizer and SimpleAclAuthorizer

AclAuthorizer and SimpleAclAuthorizer will override the new interface `org.apache.kafka.server.authorizer.Authorizer#authorizeAny` to 

  1. improve the performance
  2. implement the `allow.everyone.if.no.acl.found` logic
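The interaction between "authorized on at least one resource" and `allow.everyone.if.no.acl.found` can be illustrated with a small in-memory model. This is only a sketch under simplifying assumptions, not the `AclAuthorizer` implementation: the `AuthorizeAnySketch` class and its `Acl` type are hypothetical, ACLs cover literal topic names or the `*` wildcard only, and hosts and prefixed patterns are ignored. It also assumes that, when the flag is on, a resource without ACLs is open to everyone, so only a wildcard DENY can rule out every resource.

```java
import java.util.ArrayList;
import java.util.List;

public class AuthorizeAnySketch {
    public enum Permission { ALLOW, DENY }
    public enum Result { ALLOWED, DENIED }

    static final String WILDCARD = "*";

    // Hypothetical, simplified ACL entry: literal topic name or "*" only.
    public static final class Acl {
        final String principal;
        final String operation;
        final String topic;
        final Permission permission;

        public Acl(String principal, String operation, String topic, Permission permission) {
            this.principal = principal;
            this.operation = operation;
            this.topic = topic;
            this.permission = permission;
        }

        boolean matches(String p, String op, Permission perm) {
            return principal.equals(p) && operation.equals(op) && permission == perm;
        }
    }

    private final List<Acl> acls = new ArrayList<>();
    private final boolean allowEveryoneIfNoAclFound;

    public AuthorizeAnySketch(boolean allowEveryoneIfNoAclFound) {
        this.allowEveryoneIfNoAclFound = allowEveryoneIfNoAclFound;
    }

    public void add(Acl acl) {
        acls.add(acl);
    }

    // True if a DENY entry for this principal and operation covers the given topic.
    private boolean denied(String principal, String op, String topic) {
        for (Acl acl : acls) {
            boolean covers = acl.topic.equals(topic) || acl.topic.equals(WILDCARD);
            if (covers && acl.matches(principal, op, Permission.DENY)) {
                return true;
            }
        }
        return false;
    }

    public Result authorizeAny(String principal, String op) {
        // A wildcard DENY rules out every topic.
        if (denied(principal, op, WILDCARD)) {
            return Result.DENIED;
        }
        // Assumption: with the flag on, any topic without ACLs is open to everyone.
        if (allowEveryoneIfNoAclFound) {
            return Result.ALLOWED;
        }
        // Otherwise an ALLOW entry is needed that no DENY entry overrides.
        for (Acl acl : acls) {
            if (acl.matches(principal, op, Permission.ALLOW) && !denied(principal, op, acl.topic)) {
                return Result.ALLOWED;
            }
        }
        return Result.DENIED;
    }

    public static void main(String[] args) {
        AuthorizeAnySketch authorizer = new AuthorizeAnySketch(false);
        authorizer.add(new Acl("User:alice", "WRITE", "orders", Permission.ALLOW));
        System.out.println(authorizer.authorizeAny("User:alice", "WRITE"));
        System.out.println(authorizer.authorizeAny("User:bob", "WRITE"));
    }
}
```

The early return on the flag is why a default implementation that assumes `allow.everyone.if.no.acl.found=false` cannot be reused unchanged by an authorizer that supports the flag.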

`IDEMPOTENT_WRITE` Deprecation

Besides the public interface changes above, we will deprecate `IDEMPOTENT_WRITE` in release version 2.8 because it is largely trivial in practice.

We are relaxing the ACL requirement from `IDEMPOTENT_WRITE` to `WRITE` first (release version 2.8) and changing the producer defaults later (release version 3.0) to give users enough time to upgrade their brokers first. That way, a later client-side upgrade, which enables idempotence by default, won't be blocked by the `IDEMPOTENT_WRITE` ACL that older brokers require.

`IDEMPOTENT_WRITE` will be deprecated in 2.8 but won't be removed in the short term, to give the community enough time to upgrade their `authorizer` implementations.
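During this window, a producer on a release before 3.0 can already opt in to the proposed defaults by setting both configs explicitly. The sketch below uses only `java.util.Properties`; the `EarlyOptInConfig` class and its method are illustrative names, and the resulting properties would be passed to a producer together with the usual bootstrap and serializer settings.

```java
import java.util.Properties;

public class EarlyOptInConfig {
    // Spells out the values this proposal makes the default in 3.0.
    public static Properties strongestGuarantees() {
        Properties props = new Properties();
        props.setProperty("enable.idempotence", "true");
        props.setProperty("acks", "all");
        return props;
    }

    public static void main(String[] args) {
        Properties props = strongestGuarantees();
        System.out.println("enable.idempotence=" + props.getProperty("enable.idempotence"));
        System.out.println("acks=" + props.getProperty("acks"));
    }
}
```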

Compatibility, Deprecation, and Migration Plan

Changing the default of `acks` raises no compatibility issues. However, changing the default of `enable.idempotence` may cause several:

  1. In the scenario below, users will need to either a) grant the producers `IDEMPOTENT_WRITE` access or b) explicitly disable idempotence in their producer config (set `enable.idempotence = false`).
    1. they use producers with release version >= 3.0
    2. they use brokers with release version < 2.8
  2. In the scenario below, users will need to either a) upgrade the topic message format to V2 or later or b) explicitly disable idempotence in their producer config (set `enable.idempotence = false`).
    1. they use producers with release version >= 3.0
    2. producers with release version >= 3.0 produce to any topic that uses a message format < V2
  3. In the scenario below, users will need to implement the new `Authorizer#authorizeAny` interface.
    1. they use their own authorizer implementation rather than `AclAuthorizer` or `SimpleAclAuthorizer`
    2. the custom authorizer supports the configuration `allow.everyone.if.no.acl.found`
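The first two scenarios can be read as a simple decision rule for a producer on release 3.0 or later. The sketch below encodes that rule; `IdempotenceCompatibility` and both of its methods are hypothetical helpers for illustration, and the three boolean inputs are assumed to be known to the operator rather than discovered from the cluster.

```java
import java.util.Properties;

public class IdempotenceCompatibility {
    // Scenario 1: brokers older than 2.8 still require IDEMPOTENT_WRITE.
    // Scenario 2: idempotence needs message format V2 or later on every target topic.
    public static boolean mustDisableIdempotence(boolean brokersOlderThan28,
                                                 boolean hasIdempotentWriteAcl,
                                                 boolean anyTopicBelowV2) {
        boolean blockedByAcl = brokersOlderThan28 && !hasIdempotentWriteAcl;
        return blockedByAcl || anyTopicBelowV2;
    }

    // Returns only the override needed; an empty result keeps the 3.0 defaults.
    public static Properties producerOverrides(boolean brokersOlderThan28,
                                               boolean hasIdempotentWriteAcl,
                                               boolean anyTopicBelowV2) {
        Properties props = new Properties();
        if (mustDisableIdempotence(brokersOlderThan28, hasIdempotentWriteAcl, anyTopicBelowV2)) {
            props.setProperty("enable.idempotence", "false");
        }
        return props;
    }

    public static void main(String[] args) {
        // Old brokers, no IDEMPOTENT_WRITE grant, all topics on V2.
        System.out.println(producerOverrides(true, false, false));
    }
}
```

Disabling idempotence is the fallback in both scenarios; granting `IDEMPOTENT_WRITE` or upgrading the message format removes the need for the override.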

Rejected Alternatives

An alternative is to implement fallback semantics for `enable.idempotence` to mitigate the compatibility issues. Specifically, the producer would default `enable.idempotence` to `suggested`, a new option that lets the producer and brokers decide whether idempotence can be enabled. However, since the fallback

  1. adds more complexity to the client-side and broker-side logic
  2. may cause confusion and thus unexpected behavior

we decided to propose the simplest approach, at the cost of the small compatibility issues.
