Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: A single TopicManagementPolicy

...

  • CreateTopicPolicy can prevent a topic being created based on topic creation parameters (name, number of partitions & replication factor or replica assignments, topic configs)
  • AlterConfigPolicy can prevent a change to topic config (or, in theory, broker config, but it's current not possible to change broker configs via the AdminClient API)

As existing tools are migrated to using AdminClient APIs rather than interacting directly with ZooKeeper we need to apply policies to them, but the existing policy interfaces make it difficult to do this in a consistent way.

Problem 1 - Topic config is governed by both CreateTopicPolicy and AlterConfigPolicy

Currently the topic config is passed to the CreateTopicPolicy, but if a topic config is later modified the AlterConfigPolicy is applied. If an administrator wants to use the topic config in their policy decisions they have to implement this logic in two places. If the policy decision depends on both the topic config and another aspect of the topic the AlterConfigPolicy interface doesn't provide the necessary information.

Problem 2 - Creating more partitions is not currently covered by a policy

Changing the number of partitions in a topic was the subject of KIP-195 and is just one kind of topic modification. Consider two example use cases:

  1. It shouldn't be possible to create a topic, but then modify it so that it no longer conforms to the CreateTopicPolicy.
  2. An administrator who wants to prevent increasing the number of partitions entirely for topics with keys, because of the effect on partitioning.

To solve 1, we could simply apply the existing TopicCreationPolicy to modifications, but

  • this would obscure whether a particular invocation of the policy was for a topic creation or modification (the second bullet)
  • we would be left with a misleadingly named policy

So there needs to be a policy for specifically for modifying a topic. But it is confusing and error-prone if there are different policy classes for creation and modification (the CreateTopicPolicy and a new ModifyTopicPolicy, say): It would be easy for the code implementing a user's policies to get out of sync if it needs to be maintained in two places. It would also be easy to configure one policy but not the other. So it would be better if there were a single policy interface which is applied to both topic creation and modification.

Problem 3 - CreateTopicPolicy can govern partition assignment, but there is no policy for reassignment

Reassigning replicas is another kind of topic modification and the subject of KIP-179. By similar reasoning to example 2 it, too, should be covered by the same policy.

Problem 4 - There is no policy for topic deletion or message deletion

KIP-170 proposes a policy for topic deletion (see that KIP for the motivation behind this) and KIP-204 proposes to add an AdminClient API for the existing network protocol for deleting messages from the partitions of a topic.

It's pointless to add a policy for topic deletion if there is no policy for message deletion (deleting all the messages from the topic is practically equivalent to deleting the topic itself in most cases). If there were a separate TopicDeletePolicy and MessageDeletePolicy we have the similar problem as described above for separate topic creation and modification policies: It's unnecessarily difficult and tedious to keep the policies consistent.

Public Interfaces

Two new policy interfaces will be added:

  • TopicActionsPolicy will apply to topic creation and alteration and be configured by the new topic.actions.policy.class.name config
  • TopicDeletionPolicy will apply to message and topic deletion and be configured by the new topic.deletion.policy.class.name config.

The existing policy interfaces CreateTopicPolicy and AlterConfigPolicy will be deprecated, but will continue to be applied where they are currently applied until they are removed.

Proposed Changes

Add TopicActionsPolicy, TopicDeletionPolicy and supporting interfaces

The following policy interfaces and supporting classes will be added

The existing policies were added in KIP-108 and KIP-133 and at that time there was an expectation that the AdminClient would gain a single API for topic modification. However, discussion about KIP-179, and work on KIP-195 shows that the AdminClient will end up with multiple APIs for modifying topics in different ways. Consequently there isn't and won't be a direct mapping between operations and policies.

Adding new policies ad-hoc is likely to lead to a poor result because:

  • Users need to know about a policy to implement it. Its easier to know about 1 thing than half a dozen.
  • The more policies there are the more places they need to be configured.

Problem 1 - Topic config is governed by both CreateTopicPolicy and AlterConfigPolicy

Currently the topic config is passed to the CreateTopicPolicy, but if a topic config is later modified the AlterConfigPolicy is applied. If an administrator wants to use the topic config in their policy decisions they have to implement this logic in two places (or at least multiply inherit both policy implementations and configure the same class name for each policy). If the policy decision depends on both the topic config and another aspect of the topic the AlterConfigPolicy interface doesn't provide the necessary information.

Problem 2 - Creating more partitions is not currently covered by any policy

Changing the number of partitions in a topic was the subject of KIP-195 and is just one kind of topic modification. Consider two example use cases:

  1. It shouldn't be possible to create a topic, but then modify it so that it no longer conforms to the CreateTopicPolicy.
  2. An administrator who wants to prevent increasing the number of partitions entirely for topics with keys, because of the effect on partitioning.

To solve 1, we could simply apply the existing TopicCreationPolicy to modifications, but

  • this would obscure whether a particular invocation of the policy was for a topic creation or modification (the second bullet)
  • we would be left with a misleadingly named policy

So there needs to be a policy for specifically for modifying a topic. But it is confusing and error-prone if there are different policy classes for creation and modification (the CreateTopicPolicy and a new ModifyTopicPolicy, say): It would be easy for the code implementing a user's policies to get out of sync if it needs to be maintained in two places. It would also be easy to configure one policy but not the other.

Multiply inheriting separate the CreateTopicPolicy and a new ModifyTopicPolicy is a solution, but still requires multiple configuration keys.

It would be better if there were a single policy interface which is applied to both topic creation and modification in a more uniform way.

Problem 3 - CreateTopicPolicy can govern partition assignment, but there is no policy for reassignment

Reassigning replicas is another kind of topic modification and the subject of KIP-179. By similar reasoning to example 2 it, too, should be covered by the same policy. This would require that the same request metadata could describe both kinds of modification satisfactorily.

Problem 4 - There is no policy for topic deletion or record deletion

KIP-170 proposes a policy for topic deletion (see that KIP for the motivation behind this) and KIP-204 proposes to add an AdminClient API for the existing network protocol for deleting records from the partitions of a topic.

While deleting records from a topic is not the same as deleting the topic itself, both result in records being deleted, and it is those records which have value to the company or organisation operating the cluster. Thus if topic deletion is deserving of a policy, it should also be possible to apply a similar policy to record deletion, otherwise a user might be able to apply business rules to on the one kind of deletion, but not the other. If there were a separate TopicDeletePolicy and MessageDeletePolicy we have the similar problem as described above for separate topic creation and modification policies: It's unnecessarily difficult and tedious to keep the policies consistent and correctly configured.

Public Interfaces

A new policy interface will be added, TopicManagementPolicy will apply to topic creation, topic alteration, topic deletion and message deletion. It will be configured by the new topic.management.policy.class.name config.

The existing policy interfaces CreateTopicPolicy and AlterConfigPolicy will be deprecated, but will continue to be applied where they are currently applied until they are removed.

Proposed Changes

Add TopicManagementPolicy and supporting interfaces

The following policy interfaces and supporting classes will be added

Code Block
languagejava
linenumberstrue
/**
 * Represents the state of a topic either before, or as a result of,
 * an administrative request affecting a topic.
 */
interface TopicState {
Code Block
languagejava
linenumberstrue
/**
 * Represents the state of a topic either before, or as a result of,
 * an administrative request affecting a topic.
 */
interface TopicState {
    /**
     * The number of partitions of the topic.
     */
    int numPartitions();

    /**
     * The replication factor of the topic.
     */
    Short replicationFactor();

    /**
     * The number replicaof assignmentspartitions of the topic.
     */
    Map<Integer, List<Integer>> replicasAssignmentsint numPartitions();

    /**
     * The topicreplication config.
factor of the topic. More */
precisely, the number of Map<String,String> configs();

    /**assigned replicas for partition 0.
     * Returns// whetherTODO thewhat topicabout is marked for deletion.during reassignment
     */
    booleanShort markedForDeletionreplicationFactor();

}


    /**
 The current state    * A map of the replica topicsassignments inof the clustertopic, beforewith thepartition requestids takesas effect.keys */and
interface ClusterState {
    /**
 the assigned brokers  * Returnsas the currentcorresponding statevalues.
 of the given topic, or null if the topic does not exist.* // TODO what about during reassignment
     */
    Map<Integer, TopicStateList<Integer>> topicStatereplicasAssignments(String topicName);

    /**
     * ReturnsThe all the topics in the cluster, including internal topics iftopic config.
     */
 {@code includeInternal} is trueMap<String, and including those marked for deletionString> configs();

    /**
     * if {@code includeMarkedForDeletion} is trueReturns whether the topic is marked for deletion.
     */
    Set<String>boolean topics(boolean includeInternal, boolean includeMarkedForDeletionmarkedForDeletion();
}

    /**
     * AReturns whether policythe thattopic is enforcedan oninternal topic creation and alteration.
 * An implementation of this policy can be configured on a broker via the
 * {@code topic.actions.policy.class.name} broker config.
 * When this is configured the named class will be instantiated reflectively
 * using its nullary constructor and will then pass the broker configs to
 * its <code>configure()</code> method. During broker shutdown, the
 * <code>close()</code> method will be invoked so that resources can be
 * released (if necessary).
 *
 * @see TopicDeletionPolicy for the policy for deleting messages and topics.
 */
interface TopicActionsPolicy extends Configurable, AutoCloseable {

    /**
     * Parameters for a request to {@linkplain #isCreate() create} or.
     */
    boolean internal();

}


/** The current state of the topics in the cluster, before the request takes effect. */
interface ClusterState {
    /**
     * Returns the current state of the given topic, or null if the topic does not exist.
     */
    TopicState topicState(String topicName);

    /**
     * Returns all the topics in the cluster, including internal topics if
     * {@code includeInternal} is true, and including those marked for deletion
     * if {@code includeMarkedForDeletion} is true.
     */
    Set<String> topics(boolean includeInternal, boolean includeMarkedForDeletion);

    /**
     * {@linkplain #isAlter() alter} the given {@linkplain #topic}The number of brokers in the cluster.
     */
    int * @see #validate(RequestMetadata, ClusterState)
     */
    static interface RequestMetadata {

        /**
         * Returns true if the request is for the creation of the given {@link #topic()}.
         */
        public boolean isCreate();

        /**
         * Returns true if the request is for the alteration of the given {@link #topic()}.
         */
        public boolean isAlter();clusterSize();
}


/**
 * A policy that is enforced on topic creation, alteration and deletion,
 * and for the deletion of messages from a topic.
 *
 * An implementation of this policy can be configured on a broker via the
 * {@code topic.management.policy.class.name} broker config.
 * When this is configured the named class will be instantiated reflectively
 * using its nullary constructor and will then pass the broker configs to
 * its <code>configure()</code> method. During broker shutdown, the
 * <code>close()</code> method will be invoked so that resources can be
 * released (if necessary).
 *
 */
interface TopicManagementPolicy  extends Configurable, AutoCloseable {

    static interface AbstractRequestMetadata {

        /**
         * The topic the action is being performed upon.
         */
        public String topic();

        /**
         * The authenticated principal making the request, or null if the session is not authenticated.
         */
        public KafkaPrincipal principal();
     */}


    static interface CreateTopicRequest extends public KafkaPrincipal principal();

AbstractRequestMetadata {
        /**
         * The requested state of the topic willto have after the requestbe created.
         * <ul>/
        public * <li>When {@link #isCreate()} is true, this will be the requested state of the topic to be created.</li>TopicState requestedState();
    }

    /**
     * Validate the given *request <li>Whento {@link #isAlter()} is true, this will be the state the topic will have after the alteration.</li>create a topic
     * and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the * </ul>
    request does not satisfy this policy.
     */
     * Clients will publicreceive TopicState requestedState();

    }

    /**the POLICY_VIOLATION error code along with the exception's message.
     * ValidateNote thethat requestvalidation parametersfailure andonly throwaffects athe <code>PolicyViolationException</code> with a suitable errorrelevant topic,
     * messageother topics ifin the request parameterswill forstill the provided topic do not satisfy this policybe processed.
     *
     * Clients@param will receiverequestMetadata the POLICY_VIOLATIONrequest errorparameters code alongfor with the exception'sprovided messagetopic.
     * Note@param thatclusterState validationthe failurecurrent onlystate affectsof the relevant topic,cluster
     * other@throws topicsPolicyViolationException inif the request parameters willdo not stillsatisfy bethis processedpolicy.
     */
    void * @param requestMetadata the request parameters for the provided topic.
     * @param clusterState the current state of the cluster
validateCreateTopic(CreateTopicRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException;

    static interface AlterTopicRequest extends AbstractRequestMetadata {
        /**
         * @throwsThe PolicyViolationExceptionstate if the requesttopic parameterswill dohave notafter satisfythe this policyalteration.
         */
    void validate(RequestMetadata requestMetadata, ClusterState clusterState)public throwsTopicState PolicyViolationExceptionrequestedState();
}

/**
 * A policy that}

 is enforced on message or topic deletion. /**
 * An implementation of this* policyValidate canthe begiven configuredrequest onto aalter brokeran viaexisting thetopic
 * {@code topic.deletion.policy.class.name} broker config.
 * Whenand thisthrow isa configured<code>PolicyViolationException</code> thewith nameda class will be instantiated reflectively
 * using its nullary constructor and will then pass the broker configs to
 * its {@link #configure(Map)}} method. During broker shutdown, the
 * {@link #close()} method will be invoked so that resources can be
 * released (if necessary).
 *
 * @see TopicActionsPolicy for the policy for creating and altering topics.
 */
interface TopicDeletionPolicy extends Configurable, AutoCloseable {
    /**
     * Parameters for a request to delete {@linkplain #isMessageDeletion()} messages} from the topicsuitable error
     * message if the request does not satisfy this policy.
     *
     * The given {@code clusterState} can be used to discover the current state of the topic to be modified.
     *
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant topic,
     * other topics orin {@linkplain #isTopicDeletion() delete the entire topic}the request will still be processed.
     *
     * @see #validate(RequestMetadata, ClusterState) @param requestMetadata the request parameters for the provided topic.
     */
 @param clusterState the current staticstate interfaceof RequestMetadatathe {cluster

     * @throws PolicyViolationException  /**
    if the request parameters do not satisfy this policy.
     */
  The topic thatvoid is the subject of the deletion.validateAlterTopic(AlterTopicRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException;

    /**
     */
 Parameters for a request to delete the public Stringgiven topic();.

     */
   /**
 static interface DeleteTopicRequest extends AbstractRequestMetadata {
   * The}

 authenticated principal making the request, or null if the session is not authenticated.
    /**
     * Validate the given request to delete a topic
     */
 and throw a <code>PolicyViolationException</code> with a suitable publicerror
 KafkaPrincipal principal();

   * message if the request /**
does not satisfy this policy.
     *
 Returns true if the topic* itselfThe isgiven being deleted, or false if
         *{@code clusterState} can be used to discover the current state of the topic isto not beingbe deleted but.
 zero or more records from*
     * Clients will receive the *POLICY_VIOLATION oneerror orcode morealong ofwith the topicexception's partitions are being deletedmessage.
     * Note that validation *failure Thisonly is mutually exclusive with {@link #isMessageDeletion()}.
affects the relevant topic,
     * other topics in the */
request will still be processed.
    boolean isTopicDeletion();
*
     * @param requestMetadata /**
the request parameters for the provided topic.
   * Returns true* zero@param orclusterState morethe recordscurrent from
state of the  cluster
     * one@throws orPolicyViolationException more ofif the topic'srequest partitionsparameters aredo beingnot deleted,satisfy butthis thepolicy.
 topic itself is
  */
    void validateDeleteTopic(DeleteTopicRequest requestMetadata, *ClusterState notclusterState) being deleted.throws PolicyViolationException;

    /**
     * This is mutually exclusive with {@link #isTopicDeletion()}.
     Parameters for a request to delete records from the topic.
     */
    static interface DeleteRecordsRequest extends boolean isMessageDeletion();AbstractRequestMetadata {

        /**
         * Returns a map of topic partitions and the corresponding offset of the last message
         * to be retained. Messages before this offset will be deleted.
         * Partitions which won't have messages deleted won't be present in the map.
         * When {@link #isTopicDeletion()} is true then all of the topic's partitions will be/
        Map<Integer, Long> deletedMessageOffsets();
    }

     /**
 present in the map and* allValidate the offsetsgiven willrequest beto {@link Long#MAX_VALUE}.
   delete records from a topic
      */
 and throw a <code>PolicyViolationException</code> with a suitable Map<Integer,error
 Long> deletedMessageOffsets();
   * }
message if the request /**
does not satisfy this  *policy.
 Validate the request parameters and*
 throw a <code>PolicyViolationException</code> with a* suitableThe error
given {@code clusterState} can be *used messageto ifdiscover the requestcurrent parametersstate forof the provided topic doto nothave satisfyrecords this policydeleted.
     *
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant topic,
     * other topics in the request will still be processed.
     *
     * @param requestMetadata the request parameters for the provided topic.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validatevalidateDeleteRecords(RequestMetadataDeleteRecordsRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException;
}

The TopicActionsPolicy TopicManagementPolicy will be applied:

  • On topic creation, i.e. when processing a CreateTopicsRequest
  • On topic modification
    • Change in topic config, ie. when processing AlterConfigsRequest, for topic configs (this change done as part of this KIP).
    • Adding partitions to topics, i.e. when processing a CreatePartitionsRequest (see KIP-195, but this change done as part of this KIP)
    • Reassigning partitions to brokers, and/or changing the replication factor when processing ReassignPartitionsRequest (see KIP-179)

The TopicDeletionPolicy will be applied:

  • on topicOn topic deletion, i.e. when processing a DeleteTopicsRequest.
  • on On message deletion, i.e. when processing a DeleteRecordsRequest.

...

  • .

Deprecate existing policies

...

It will be a configuration time error if both create.topic.policy.class.name and topic.actionsmanagement.policy.class.name are used at the same time, or both alter.config.policy.class.name and topic.actionsmanagement.policy.class.name are used at the same time.

Internally, an adapter implementation of TopicActionsPolicy TopicManagementPolicy will be used when CreateTopicPolicy and AlterConfigPolicy are configured, so policy use sites won't be unnecessarily complicated.

...

Existing users will have to reimplement their policies in terms of the new TopicActionsPolicy TopicManagementPolicy interface, and reconfigure their brokers accordingly. Since the TopicActionsPolicy TopicManagementPolicy contains a superset of the existing information used by the deprecated policies such reimplementation should be trivial.

The deprecated policy interfaces and configuration keys will be removed in a future Kafka version. If this KIP is accepted for Kafka 1.1.0 this removal could happen in Kafka 2.0.0 or a later major release.

Rejected Alternatives

The objectives of this KIP could be achieved without deprecating the existing policy classes, but that:

  • incurs ongoing maintenance and testing costs on the project for not overall benefit
  • If two policies were in force it would be more confusing to users when a request was rejected (which policy rejected it?) possibly exacerbated if users didn't know two policies were in force.
  • If it were possible to have two policies in force administrators have not been relieved of the burden of maintaining two policies in sync.

The proposed TopicActionsPolicy doesn't have to cover the topic deletion case: That could still be handled by a separate policy, but it is desirable to have a single policy to cover the whole lifecycle of a topic, and for the same information to be made available about a topic being deleted as about a topic being modified.

the existing information used by the deprecated policies such reimplementation should be trivial.

The deprecated policy interfaces and configuration keys will be removed in a future Kafka version. If this KIP is accepted for Kafka 1.1.0 this removal could happen in Kafka 2.0.0 or a later major release.

Rejected Alternatives

The objectives of this KIP could be achieved without deprecating the existing policy classes, but that:

  • incurs ongoing maintenance and testing costs on the project for not overall benefit
  • If two policies were in force it would be more confusing to users when a request was rejected (which policy rejected it?) possibly exacerbated if users didn't know two policies were in force.
  • If it were possible to have two policies in force administrators have not been relieved of the burden of maintaining two policies in sync.

Having separate policy interfaces was considered, but rejected because there would need to be several of them, making it harder for people to discover, understand, implement and configure policies. It would also be easy for users to miss if a new policy interface was added.

Having separate policy interfaces for creation/modification and deletion and retaining the existing single-method-per-policy-interface design was considered, but rejected because it was a half way house between having multiple policies and having a single policy.

 The proposed TopicActionsPolicy doesn't cover the use case of records being deleted from a topic. This is not the same as the modification of a topic, and would require a different policy interface. It might be appropriate to use the same topic state in such a policy interface, however.