Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Kafka already has user configurable policies which can be used by a cluster administrator to limit how the cluster can be modified by non-administrator users (for example by using the AdminClient API):

  • TopicCreationPolicy CreateTopicPolicy can prevent a topic being created based on topic creation parameters (name, number of partitions & replication factor or replica assignments, topic configs)
  • AlterConfigPolicy can prevent a change to topic config (or, in theory, broker config, but it's current not possible to change broker configs via the AdminClient API)

As new APIs existing tools are added migrated to the AdminClient using AdminClient APIs rather than interacting directly with ZooKeeper we need to apply policies to them, but the existing policy interfaces make it difficult to do this in a consistent way.

Example 1

...

  • It shouldn't be possible to create a topic, but then modify it so that it no longer conforms to the TopicCreationPolicy CreateTopicPolicy.
  • An administrator who wants to prevent increasing the number of partitions entirely for topics with keys, because of the effect on partitioning.

So there needs to be a policy for modifying a topic in this way. But it is confusing and error-prone if there are different policy classes for creation and modification (the CreateTopicPolicy and a new ModifyTopicPolicy, say): It would be very easy for such policies to get out of sync, or configure one but not the other. So there should be it would be better if there were a single policy interface which is applied to both topic creation and modification. We could simply apply the the existing TopicCreationPolicy to modifications, but

  • this would obscure whether a particular invocation of the policy was for a topic creation or modification (the second bullet)
  • we would be left with a misleadingly named policy

. So we conclude we need a different policy than TopicCreationPolicy.

...

Reassigning replicas is another kind of topic modification (KIP-179). By similar reasoning to example 1 it, too, should be covered by the same policy. 

Example 3

Currently the topic config is passed to the CreateTopicPolicy, but if a topic config is later modified the AlterConfigPolicy is applied. If an administrator wants to use the topic config in their policy decisions they have to implement this logic in two places. If the policy decision depends on both the topic config and another aspect of the topic the AlterConfigPolicy interface doesn't provide the necessary information.

How does this KIP relate to KIP-170?Rationalise the AlterConfigPolicy (in particular topic config changes and topic modifications)

Public Interfaces

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

A public interface is any change to the following:

  • Binary log format

  • The network protocol and api behavior

  • Any class in the public packages under clientsConfiguration, especially client configuration

    • org/apache/kafka/common/serialization

    • org/apache/kafka/common

    • org/apache/kafka/common/errors

    • org/apache/kafka/clients/producer

    • org/apache/kafka/clients/consumer (eventually, once stable)

  • Monitoring

  • Command line tools and arguments

  • Anything else that will likely break existing users in some way when they upgrade

Proposed Changes

A new policy interface will be added which properly can be applied uniformly to topic creation and modification.

This policy will be configured via a new configuration key.

The existing policy interfaces CreateTopicPolicy and AlterConfigPolicy will be deprecated, but will continue to be applied where they are currently applied.

Proposed Changes

Add TopicActionPolicy

The following policy interface will be added 

Code Block
languagejava
linenumberstrue
/**
 * A policy that is enforced on actions affecting topics.
 * An implementation of this policy can be configured on a broker via the
 * {@code topic.action.policy.class.name} broker config. 
 * When this is configured the named class will be instantiated reflectively 
 * using its nullary constructor and the instance will be used to enforce
 * the policy on topic creation, modification and deletion.
 */
interface TopicActionPolicy {
    /** Enumerates possible actions on topics. */
    static enum Action {
        /** The creation of a topic. */
        CREATE,
        /** The modification of a topic. */
        MODIFY,
        /** The deletion of a topic. */
        DELETE
    }

     /**
     * Represents the state of a topic either before, or as a result of, an administrative request affecting the topic.
     */
    static interface TopicState {
        /**
         * The number of partitions of the topic.
         */
        public abstract int numPartitions();
        /**
         * The replication factor of the topic.
         */
        public abstract Short replicationFactor();
        /**
         * The replica assignments of the topic.
         */
        public abstract Map<Integer, List<Integer>> replicasAssignments()
        /**
         * The topic config.
         */
        public abstract Map<String,String> configs();
    }

     /**
     * Parameters for a request to perform an {@linkplain #action} on a {@linkplain #topic}
     * @see #validate(RequestMetadata)
     */
    static interface RequestMetadata {
    /**
     * The {@linkplain Action action} being performed on the topic.
     */
    public abstract Action action();
    /**
     * The topic the {@linkplain #action() action} is being performed upon.
     */
    public abstract String topic();
    /**
     * The authenticated principal making the request, or null if the session is not authenticated.
     */
    Principal principal();
    /**
     * The state the topic has before the request.
     * <ul>
     * <li>For {@link Action#CREATE} this will be null.</li>
     * <li>For {@link Action#MODIFY} this will be the state the topic currently has (before the modification).</li>
     * <li>For {@link Action#DELETE} this will be the state of the topic which is going to be deleted.</li>
     * </ul>
     */
    public abstract TopicState preRequestState();
    /**
     * The state the topic will have after the request.
     * <ul>
     * <li>For {@link Action#CREATE} this will be the requested state of the topic to be created.</li>
     * <li>For {@link Action#MODIFY} this will be the state the topic will have after the modification.</li>
     * <li>For {@link Action#DELETE} this will be null.</li>
     * </ul>
     */
    public abstract TopicState postRequestState();
    public abstract String toString();
}
    /**
     * Validate the request parameters and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request parameters for the provided topic do not satisfy this policy.
     *
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message. Note that validation
     * failure only affects the relevant topic, other topics in the request will still be processed.
     *
     * @param requestMetadata the request parameters for the provided topic.
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validate(RequestMetadata requestMetadata) throws PolicyViolationException;
}

 

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives

This policy will be applied:

  • On topic creation
  • On topic modification
    • Change in topic config, via AdminClient.alterConfigs().
    • Adding partitions to topics, via AdminClient.createPartitions() (see KIP-195)
    • Reassigning partitions to brokers, and/or changing the replication factor via AdminClient.reassignPartitions() (see KIP-179)
    • KIP-113
  • On topic deletion

This will be configurable via the topic.action.policy.class.name broker config.

Note: Unlike previous policy interfaces the inner RequestMetadata is an interface rather than a class. This should simplify testing and better permit use sites to, for example, lazily fetch metadata when it's actually required by the policy implementation, rather than eagerly fetch information which the policy didn't actually require.

Deprecate existing policies

The existing CreateTopicPolicy and AlterConfigPolicy will be deprecated, but will continue to be applied when they are configured.

Using create.topic.policy.class.name or  alter.config.policy.class.name will result in an deprecation warning in the broker logs.

It will be a configuration time error if both create.topic.policy.class.name and topic.action.policy.class.name are used at the same time, or both alter.config.policy.class.name and topic.action.policy.class.name are used at the same time.

Internally, an adapter implementation of TopicActionPolicy will be used when CreateTopicPolicy and AlterConfigPolicy are configured, so policy use sites won't be unnecessarily complicated.

If, in the future, AdminClient.alterConfigs()/AlterConfigsRequest is changed to support changing broker configs a separate policy interface can be applied to such changes.

Compatibility, Deprecation, and Migration Plan

Existing users will have to implement their policies in terms of the new TopicActionPolicy interface, and reconfigure their brokers accordingly.

The deprecated policy interfaces and configuration keys will be removed in a future Kafka version. If this KIP is accepted for Kafka 1.1.0 this removal could happen in Kafka 3.0.0

Rejected Alternatives

The objectives of this KIP could be achieved without deprecating the existing policy classes, but that:

  • incurs ongoing maintenance and testing costs on the project for not overall benefit
  • If two policies were in force it would be more confusing to users when a request was rejected (which policy rejected it?) possibly exacerbated if users didn't know two policies were in force.
  • If it were possible to have two policies in force administrators have not been releived of the burden of maintaining two policies in sync.

 If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.