

Current state: Under Discussion

  • CreateTopicPolicy can prevent a topic being created based on topic creation parameters (name, number of partitions & replication factor or replica assignments, topic configs)
  • AlterConfigPolicy can prevent a change to topic config (or, in theory, broker config, but it's currently not possible to change broker configs via the AdminClient API)

As existing tools are migrated to using AdminClient APIs rather than interacting directly with ZooKeeper we need to apply policies to them, but the existing policy interfaces make it difficult to do this in a consistent way.

Example 1

Currently the topic config is passed to the CreateTopicPolicy, but if a topic config is later modified, the AlterConfigPolicy is applied instead. If an administrator wants to use the topic config in their policy decisions, they have to implement this logic in two places. If the policy decision depends on both the topic config and another aspect of the topic, the AlterConfigPolicy interface doesn't provide the necessary information.

Example 2

Changing the number of partitions in a topic was the subject of KIP-195 and is just one kind of topic modification. Consider two example use cases:

  1. It shouldn't be possible to create a topic, but then modify it so that it no longer conforms to the CreateTopicPolicy.
  2. An administrator might want to prevent increasing the number of partitions entirely for topics with keys, because of the effect on partitioning.

To solve 1, we could simply apply the existing CreateTopicPolicy to modifications, but

  • this would obscure whether a particular invocation of the policy was for a topic creation or a modification, which is exactly what use case 2 needs to know
  • we would be left with a misleadingly named policy

So there needs to be a policy specifically for modifying a topic. But it is confusing and error-prone to have different policy classes for creation and modification (say, the CreateTopicPolicy and a new ModifyTopicPolicy): it would be easy for the code implementing a user's policies to get out of sync if it needs to be maintained in two places, and it would be easy to configure one policy but not the other. So it would be better to have a single policy interface which is applied to both topic creation and modification.
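To illustrate why a single policy helps, here is a minimal sketch of one validation method covering both use cases. It assumes hypothetical, simplified stand-ins (SketchAction, SketchTopicState, UnifiedTopicRules) rather than the interfaces proposed later, returns a rejection reason instead of throwing, and assumes a purely hypothetical convention that keyed topics are named with a `keyed.` prefix.

```java
// Hypothetical, simplified stand-ins; not the interfaces proposed by this KIP.
enum SketchAction { CREATE, MODIFY }

final class SketchTopicState {
    final int numPartitions;
    final short replicationFactor;

    SketchTopicState(int numPartitions, short replicationFactor) {
        this.numPartitions = numPartitions;
        this.replicationFactor = replicationFactor;
    }
}

final class UnifiedTopicRules {
    static final int MAX_PARTITIONS = 100;  // assumed site-specific limit
    static final short MIN_REPLICATION = 3; // assumed site-specific limit

    /**
     * Returns null if the request is acceptable, otherwise the reason for rejecting it.
     * currentState is null when the action is CREATE.
     */
    static String validate(SketchAction action, String topic,
                           SketchTopicState currentState, SketchTopicState requestedState) {
        // Use case 1: the creation rules are also enforced on every modification.
        if (requestedState.numPartitions > MAX_PARTITIONS) {
            return topic + " may have at most " + MAX_PARTITIONS + " partitions";
        }
        if (requestedState.replicationFactor < MIN_REPLICATION) {
            return topic + " needs a replication factor of at least " + MIN_REPLICATION;
        }
        // Use case 2: only expressible because the policy knows this is a modification.
        // Assumption: keyed topics are identified by a hypothetical "keyed." name prefix.
        if (action == SketchAction.MODIFY && topic.startsWith("keyed.")
                && requestedState.numPartitions > currentState.numPartitions) {
            return "Cannot add partitions to keyed topic " + topic;
        }
        return null;
    }
}
```

Because both rules live in one method, they cannot drift apart, and there is only one class to configure.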

Example 3

Reassigning replicas is another kind of topic modification and the subject of KIP-179. By the same reasoning as in Example 2, it too should be covered by the same policy.

How does this KIP relate to KIP-170?

Public Interfaces

A new policy interface will be added which can be applied uniformly to topic creation and modification.

This policy will be configured via a new configuration key,

The existing policy interfaces CreateTopicPolicy and AlterConfigPolicy will be deprecated, but will continue to be applied where they are currently applied until they are removed.

Proposed Changes

Add TopicActionsPolicy

The following policy interface will be added

The existing policies were added in KIP-108 and KIP-133, and at that time there was an expectation that the AdminClient would gain a single API for topic modification. However, discussion about KIP-179 and work on KIP-195 show that the AdminClient will end up with multiple APIs for modifying topics in different ways. Consequently there isn't, and won't be, a direct mapping between operations and policies.

Adding new policies ad-hoc is likely to lead to a poor result because:

  • Users need to know about a policy to implement it. It's easier to know about one thing than about half a dozen.
  • The more policies there are the more places they need to be configured.

Problem 4 - There is no policy for topic deletion or record deletion

KIP-170 proposes a policy for topic deletion (see that KIP for the motivation behind this) and KIP-204 proposes to add an AdminClient API for the existing network protocol for deleting records from the partitions of a topic.

While deleting records from a topic is not the same as deleting the topic itself, both result in records being deleted, and it is those records which have value to the company or organisation operating the cluster. Thus if topic deletion deserves a policy, it should also be possible to apply a similar policy to record deletion; otherwise a user might be able to apply business rules to one kind of deletion but not the other. If there were a separate TopicDeletePolicy and MessageDeletePolicy, we would have a similar problem to the one described above for separate topic creation and modification policies: it's unnecessarily difficult and tedious to keep the policies consistent and correctly configured.
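As a sketch of the consistency argument, the following writes one hypothetical business rule ("topics with unlimited retention must not lose data") once and applies it to both kinds of deletion. The class and method names are illustrative assumptions; `retention.ms` is the real topic config, whose value `-1` means no time limit.

```java
import java.util.Map;

/** Hypothetical rules shared by topic deletion and record deletion. */
final class DeletionRules {
    /** True if the topic's config says its records should be kept indefinitely. */
    static boolean retainedForever(Map<String, String> topicConfigs) {
        return "-1".equals(topicConfigs.get("retention.ms"));
    }

    /** The shared check: returns a rejection reason, or null if data may be deleted. */
    private static String checkDataLoss(String topic, Map<String, String> topicConfigs) {
        if (retainedForever(topicConfigs)) {
            return "Topic " + topic + " has unlimited retention; its records must not be deleted";
        }
        return null;
    }

    /** Applied to a request to delete the whole topic. */
    static String validateDeleteTopic(String topic, Map<String, String> topicConfigs) {
        return checkDataLoss(topic, topicConfigs);
    }

    /** Applied to a request to delete records before the given offset in some partitions. */
    static String validateDeleteRecords(String topic, Map<String, String> topicConfigs,
                                        Map<Integer, Long> deleteBeforeOffsets) {
        if (deleteBeforeOffsets.isEmpty()) {
            return null; // nothing would actually be deleted
        }
        return checkDataLoss(topic, topicConfigs);
    }
}
```

With two separate policy interfaces, `checkDataLoss` would have to be duplicated or shared by convention, and either policy could be left unconfigured.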

Problem 5 - The existing CreateTopicPolicy doesn't have enough information

As noted in KIP-170, for some use cases the existing CreateTopicPolicy isn't passed enough information for the operator's desired rules to be enforced. For example, when providing Kafka-as-a-Service there is a need to ensure that a new topic doesn't require more resources than the cluster can support (e.g. that the number of partitions won't exceed some maximum).
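A rule like this needs the state of the whole cluster, not just the request. A minimal sketch, assuming a hypothetical ClusterView (loosely modelled on the ClusterState interface proposed in this KIP) that exposes per-topic partition counts and the number of brokers, plus an assumed per-broker limit. For simplicity it counts partitions rather than partition replicas.

```java
import java.util.Map;

/** Hypothetical read-only cluster view, loosely modelled on this KIP's ClusterState. */
interface ClusterView {
    Map<String, Integer> partitionCounts(); // topic name -> number of partitions
    int clusterSize();                      // number of brokers
}

final class CapacityRule {
    static final int MAX_PARTITIONS_PER_BROKER = 1000; // assumed operator limit

    /** Returns a rejection reason, or null if the new topic fits the cluster's capacity. */
    static String validateCreate(String topic, int requestedPartitions, ClusterView cluster) {
        long existing = 0;
        for (int count : cluster.partitionCounts().values()) {
            existing += count;
        }
        long limit = (long) cluster.clusterSize() * MAX_PARTITIONS_PER_BROKER;
        if (existing + requestedPartitions > limit) {
            return "Creating " + topic + " would exceed the cluster limit of " + limit + " partitions";
        }
        return null;
    }

    /** Builds a fixed ClusterView, e.g. for tests. */
    static ClusterView fixed(Map<String, Integer> counts, int brokers) {
        return new ClusterView() {
            @Override public Map<String, Integer> partitionCounts() { return counts; }
            @Override public int clusterSize() { return brokers; }
        };
    }
}
```

For example, on a two-broker cluster already holding 1900 partitions, a new 100-partition topic is accepted and a 101-partition topic is rejected.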

Public Interfaces

A new policy interface, TopicManagementPolicy, will be added. It will apply to topic creation, topic alteration, topic deletion and message deletion. It will be configured by the new config.

The existing policy interfaces CreateTopicPolicy and AlterConfigPolicy will be deprecated, but will continue to be applied where they are currently applied until they are removed.

New versions of the existing DeleteTopicsRequest and DeleteRecordsRequest network protocol messages will be added, with a validate_only flag.

New versions of the existing DeleteTopicsResponse and DeleteRecordsResponse network protocol messages will be added, to include an error message.

Proposed Changes

Add TopicManagementPolicy and supporting interfaces

The following policy interfaces and supporting classes will be added

Code Block
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Represents the state of a topic. */
interface TopicState {
    /** The number of partitions of the topic. */
    int numPartitions();

    /**
     * The replication factor of the topic.
     * More precisely, the number of assigned replicas for partition 0.
     */
    short replicationFactor();

    /**
     * The replica assignments of the topic, with partition ids as keys and
     * the assigned brokers as the corresponding values.
     */
    Map<Integer, List<Integer>> replicasAssignments();

    /** The topic config. */
    Map<String,String> configs();

    /** Returns whether the topic is marked for deletion. */
    boolean markedForDeletion();

    /** Returns whether the topic is an internal topic. */
    boolean internal();
}

/** Represents the requested state of a topic. */
interface RequestedTopicState extends TopicState {
    /**
     * True if the {@link TopicState#replicasAssignments()}
     * in this request were generated by the broker, false if
     * they were explicitly requested by the client.
     */
    boolean generatedReplicaAssignments();

    /**
     * The topic config as it will be if the request is successful.
     * This is effectively the same as the value of {@code configs}
     * after the following computation:
     * <pre><code>
     *   Map<String, String> configs = currentState.configs();
     *   configs.putAll(requestedState.requestedConfigs());
     * </code></pre>
     */
    Map<String,String> configs();

    /** The topic configs present in the request. */
    Map<String,String> requestedConfigs();
}

/** The current state of the topics in the cluster, before the request takes effect. */
interface ClusterState {
    /** Returns the current state of the given topic, or null if the topic does not exist. */
    TopicState topicState(String topicName);

    /**
     * Returns all the topics in the cluster, including internal topics if
     * {@code includeInternal} is true, and including those marked for deletion
     * if {@code includeMarkedForDeletion} is true.
     */
    Set<String> topics(boolean includeInternal, boolean includeMarkedForDeletion);

    /** The number of brokers in the cluster. */
    int clusterSize();

    /** Returns the current state of the broker in which the method is called. */
    BrokerState brokerState();
}
Code Block
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.security.auth.KafkaPrincipal;

/**
 * A policy that is enforced on actions affecting topics.
 * An implementation of this policy can be configured on a broker via the
 * {@code} broker config.
 * When this is configured the named class will be instantiated reflectively
 * using its nullary constructor, and the broker configs will then be passed to
 * its <code>configure()</code> method. During broker shutdown, the
 * <code>close()</code> method will be invoked so that resources can be
 * released (if necessary).
 */
interface TopicActionsPolicy extends Configurable, AutoCloseable {
    /** Enumerates possible actions on topics. */
    static enum Action {
        /** The creation of a topic. */
        CREATE,
        /** The modification of a topic. */
        MODIFY,
        /** The deletion of a topic. */
        DELETE
    }

    /**
     * Represents the state of a topic either before, or as a result of,
     * an administrative request affecting the topic.
     */
    static interface TopicState {
        /** The number of partitions of the topic. */
        public int numPartitions();

        /**
         * The replication factor of the topic.
         * More precisely, the number of assigned replicas for partition 0.
         */
        public short replicationFactor();

        /**
         * The replica assignments of the topic, with partition ids as keys and
         * the assigned brokers as the corresponding values.
         */
        public Map<Integer, List<Integer>> replicasAssignments();

        /** The topic config. */
        public Map<String,String> configs();

        /** Returns whether the topic is marked for deletion. */
        public boolean markedForDeletion();
    }

    /**
     * Parameters for a request to perform an {@linkplain #action} on a {@linkplain #topic}.
     * @see #validate(RequestMetadata, ClusterState)
     */
    static interface RequestMetadata {
        /** The {@linkplain Action action} being performed on the topic. */
        public Action action();

        /** The topic the {@linkplain #action() action} is being performed upon. */
        public String topic();

        /** The authenticated principal making the request, or null if the session is not authenticated. */
        public KafkaPrincipal principal();

        /**
         * The state the topic will have after the request.
         * <ul>
         * <li>For {@link Action#CREATE} this will be the requested state of the topic to be created.</li>
         * <li>For {@link Action#MODIFY} this will be the state the topic will have after the modification.</li>
         * <li>For {@link Action#DELETE} this will be null.</li>
         * </ul>
         */
        public TopicState postRequestState();
    }

    /** The current state of the topics in the cluster, before the request takes effect. */
    interface ClusterState {
        /** Returns the current state of the given topic, or null if the topic does not exist. */
        public TopicState topicState(String topicName);

        /** Returns a Map with all topics and their corresponding number of partitions. */
        public Map<String, Integer> topicsPartitionCount();
    }

    /**
     * Validate the request parameters and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request parameters for the provided topic do not satisfy this policy.
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message. Note that validation
     * failure only affects the relevant topic, other topics in the request will still be processed.
     * @param requestMetadata the request parameters for the provided topic.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validate(RequestMetadata requestMetadata, ClusterState clusterState) throws PolicyViolationException;
}
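A sketch of how an implementation of the single TopicActionsPolicy.validate() method might dispatch on the action. It uses cut-down, hypothetical stand-ins for the nested types (and PolicyViolation in place of Kafka's PolicyViolationException) so that it compiles on its own; the two rules are invented examples. Note that postRequestState() is null for DELETE, so deletions must consult the cluster state instead.

```java
import java.util.Map;

/** Stand-in for Kafka's PolicyViolationException, so the sketch compiles on its own. */
class PolicyViolation extends RuntimeException {
    PolicyViolation(String message) { super(message); }
}

enum Action { CREATE, MODIFY, DELETE }

/** Cut-down stand-ins for the nested types of TopicActionsPolicy. */
interface TopicState {
    int numPartitions();
    Map<String, String> configs();
}

interface RequestMetadata {
    Action action();
    String topic();
    TopicState postRequestState(); // null when action() is DELETE
}

interface ClusterState {
    TopicState topicState(String topicName); // null if the topic does not exist
}

/** Example rules: cap partition counts, and never delete compacted topics. */
class ExampleTopicActionsPolicy {
    static final int MAX_PARTITIONS = 50; // assumed operator limit

    void validate(RequestMetadata request, ClusterState cluster) {
        String topic = request.topic();
        switch (request.action()) {
            case CREATE:
            case MODIFY: {
                // One check covers both creation and modification.
                TopicState after = request.postRequestState();
                if (after.numPartitions() > MAX_PARTITIONS) {
                    throw new PolicyViolation(topic + " may have at most " + MAX_PARTITIONS + " partitions");
                }
                break;
            }
            case DELETE: {
                // postRequestState() is null for deletions, so inspect the current state.
                TopicState current = cluster.topicState(topic);
                if (current != null && "compact".equals(current.configs().get("cleanup.policy"))) {
                    throw new PolicyViolation("Compacted topic " + topic + " cannot be deleted");
                }
                break;
            }
        }
    }

    // Small factories so the example can be exercised without a broker.
    static TopicState state(int partitions, Map<String, String> configs) {
        return new TopicState() {
            public int numPartitions() { return partitions; }
            public Map<String, String> configs() { return configs; }
        };
    }

    static RequestMetadata request(Action action, String topic, TopicState after) {
        return new RequestMetadata() {
            public Action action() { return action; }
            public String topic() { return topic; }
            public TopicState postRequestState() { return after; }
        };
    }
}
```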

This policy will be applied:

  • On topic creation
  • On topic modification
    • Change in topic config, via AdminClient.alterConfigs() (this change to be made as part of this KIP).
    • Adding partitions to topics, via AdminClient.createPartitions() (see KIP-195, but this change to be made as part of this KIP)
    • Reassigning partitions to brokers, and/or changing the replication factor via AdminClient.reassignPartitions() (see KIP-179)
  • On topic deletion

This will be configurable via the broker config.

Note: Unlike previous policy interfaces the inner RequestMetadata is an interface rather than a class. This should simplify testing and better permit use sites to, for example, lazily fetch metadata when it's actually required by the policy implementation, rather than eagerly fetch information which the policy didn't actually require.
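A minimal sketch of what an interface permits here: a use site can implement the metadata so that expensive information is fetched only if the policy asks for it, and then cached. The interface, class and the loads counter are hypothetical illustrations, not part of the proposal.

```java
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical cut-down request metadata; only topicConfigs() is expensive to compute. */
interface DeleteRequestInfo {
    String topic();
    Map<String, String> topicConfigs();
}

/** Use-site implementation that defers the expensive lookup until a policy asks for it. */
final class LazyDeleteRequestInfo implements DeleteRequestInfo {
    private final String topic;
    private final Supplier<Map<String, String>> loader;
    private Map<String, String> cached; // null until first requested
    int loads = 0;                      // number of lookups performed, for illustration

    LazyDeleteRequestInfo(String topic, Supplier<Map<String, String>> loader) {
        this.topic = topic;
        this.loader = loader;
    }

    @Override
    public String topic() {
        return topic;
    }

    @Override
    public synchronized Map<String, String> topicConfigs() {
        if (cached == null) {
            loads++;
            cached = loader.get(); // in a real broker, e.g. a metadata or ZooKeeper read
        }
        return cached;
    }
}
```

A policy that only inspects the topic name never triggers the lookup, and one that reads the configs several times triggers it once. A final class in the policy API could not offer this without also fetching everything up front.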

Deprecate existing policies

The existing CreateTopicPolicy and AlterConfigPolicy will be deprecated, but will continue to be applied when they are configured.

Using or will result in a deprecation warning in the broker logs.

It will be a configuration-time error if both and are used at the same time, or both and are used at the same time.

Internally, an adapter implementation of TopicActionsPolicy will be used when CreateTopicPolicy and AlterConfigPolicy are configured, so policy use sites won't be unnecessarily complicated.

If, in the future, AdminClient.alterConfigs()/AlterConfigsRequest is changed to support changing broker configs a separate policy interface can be applied to such changes.

Compatibility, Deprecation, and Migration Plan

Existing users will have to reimplement their policies in terms of the new TopicActionsPolicy interface, and reconfigure their brokers accordingly. Since the TopicActionsPolicy contains a superset of the information used by the deprecated policies, such reimplementation should be trivial.

BrokerState brokerState();

import java.util.Map;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.security.auth.KafkaPrincipal;

/**
 * A policy that is enforced on topic creation, alteration and deletion,
 * and for the deletion of messages from a topic.
 * An implementation of this policy can be configured on a broker via the
 * {@code} broker config.
 * When this is configured the named class will be instantiated reflectively
 * using its nullary constructor, and the broker configs will then be passed to
 * its <code>configure()</code> method. During broker shutdown, the
 * <code>close()</code> method will be invoked so that resources can be
 * released (if necessary).
 */
interface TopicManagementPolicy extends Configurable, AutoCloseable {
    static interface AbstractRequestMetadata {
        /** The topic the action is being performed upon. */
        public String topic();

        /** The authenticated principal making the request. */
        public KafkaPrincipal principal();
    }

    static interface CreateTopicRequest extends AbstractRequestMetadata {
        /** The requested state of the topic to be created. */
        public RequestedTopicState requestedState();
    }

    /**
     * Validate the given request to create a topic
     * and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request does not satisfy this policy.
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant topic,
     * other topics in the request will still be processed.
     * @param requestMetadata the request parameters for the provided topic.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validateCreateTopic(CreateTopicRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException;

    static interface AlterTopicRequest extends AbstractRequestMetadata {
        /** The state the topic will have after the alteration. */
        public RequestedTopicState requestedState();
    }

    /**
     * Validate the given request to alter an existing topic
     * and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request does not satisfy this policy.
     * The given {@code clusterState} can be used to discover the current state of the topic to be modified.
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant topic,
     * other topics in the request will still be processed.
     * @param requestMetadata the request parameters for the provided topic.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validateAlterTopic(AlterTopicRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException;

    /** Parameters for a request to delete the given topic. */
    static interface DeleteTopicRequest extends AbstractRequestMetadata {
    }

    /**
     * Validate the given request to delete a topic
     * and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request does not satisfy this policy.
     * The given {@code clusterState} can be used to discover the current state of the topic to be deleted.
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant topic,
     * other topics in the request will still be processed.
     * @param requestMetadata the request parameters for the provided topic.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validateDeleteTopic(DeleteTopicRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException;

    /** Parameters for a request to delete records from the topic. */
    static interface DeleteRecordsRequest extends AbstractRequestMetadata {
        /**
         * Returns a map of partition ids to the offset of the first message
         * to be retained. Messages before this offset will be deleted.
         * Partitions which won't have messages deleted won't be present in the map.
         */
        Map<Integer, Long> deletedMessageOffsets();
    }

    /**
     * Validate the given request to delete records from a topic
     * and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request does not satisfy this policy.
     * The given {@code clusterState} can be used to discover the current state of the topic to have records deleted.
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant topic,
     * other topics in the request will still be processed.
     * @param requestMetadata the request parameters for the provided topic.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validateDeleteRecords(DeleteRecordsRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException;
}
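A sketch of an implementation of two of the four TopicManagementPolicy methods, using cut-down, hypothetical stand-ins for RequestedTopicState, the request types and ClusterState (and PolicyViolation in place of Kafka's PolicyViolationException). The rules are invented examples: creation needs three replicas, or as many as there are brokers on a smaller cluster; alteration may not touch `cleanup.policy`.

```java
import java.util.Map;

/** Stand-in for Kafka's PolicyViolationException. */
class PolicyViolation extends RuntimeException {
    PolicyViolation(String message) { super(message); }
}

// Cut-down stand-ins for the interfaces of this proposal.
interface RequestedTopicState {
    short replicationFactor();
    Map<String, String> requestedConfigs();
}

interface CreateTopicRequest {
    String topic();
    RequestedTopicState requestedState();
}

interface AlterTopicRequest {
    String topic();
    RequestedTopicState requestedState();
}

interface ClusterState {
    int clusterSize();
}

/** Example implementation of two of the four validation methods. */
class ExampleTopicManagementPolicy {
    void validateCreateTopic(CreateTopicRequest request, ClusterState cluster) {
        // Require 3 replicas, or as many as there are brokers on a smaller cluster.
        int required = Math.min(3, cluster.clusterSize());
        if (request.requestedState().replicationFactor() < required) {
            throw new PolicyViolation(request.topic() + " needs a replication factor of at least " + required);
        }
    }

    void validateAlterTopic(AlterTopicRequest request, ClusterState cluster) {
        // requestedConfigs() holds only the configs present in the request.
        if (request.requestedState().requestedConfigs().containsKey("cleanup.policy")) {
            throw new PolicyViolation("cleanup.policy of " + request.topic() + " cannot be changed");
        }
    }

    // Factories so the example can be exercised without a broker.
    static RequestedTopicState state(short replicationFactor, Map<String, String> requestedConfigs) {
        return new RequestedTopicState() {
            @Override public short replicationFactor() { return replicationFactor; }
            @Override public Map<String, String> requestedConfigs() { return requestedConfigs; }
        };
    }

    static CreateTopicRequest create(String topic, RequestedTopicState state) {
        return new CreateTopicRequest() {
            @Override public String topic() { return topic; }
            @Override public RequestedTopicState requestedState() { return state; }
        };
    }

    static AlterTopicRequest alter(String topic, RequestedTopicState state) {
        return new AlterTopicRequest() {
            @Override public String topic() { return topic; }
            @Override public RequestedTopicState requestedState() { return state; }
        };
    }
}
```

Separate methods per operation mean each method receives exactly the metadata relevant to it, rather than a nullable field that depends on an action enum.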

import java.util.Map;

/** Represents the state of a broker. */
interface BrokerState {
    /** The broker config. */
    Map<String,String> configs();
}

interface RequestedBrokerState extends BrokerState {
    /**
     * The broker config as it will be if the request is successful.
     * This is effectively the same as the value of {@code configs}
     * after the following computation:
     * <pre><code>
     *   Map<String, String> configs = currentState.configs();
     *   configs.putAll(requestedState.requestedConfigs());
     * </code></pre>
     */
    Map<String,String> configs();

    /** The broker configs present in the request. */
    Map<String,String> requestedConfigs();
}
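The configs() computation described above can be sketched as follows. The helper name is hypothetical; the sketch copies the current configs before overlaying the requested ones, since the map returned by a state object might be shared or unmodifiable.

```java
import java.util.HashMap;
import java.util.Map;

final class ConfigMerge {
    /** Effective configs after a request: current values, overridden by the requested ones. */
    static Map<String, String> effectiveConfigs(Map<String, String> current,
                                                Map<String, String> requested) {
        Map<String, String> merged = new HashMap<>(current); // copy; never mutate the current state
        merged.putAll(requested);                            // requested values win
        return merged;
    }
}
```

For example, overlaying `log.retention.hours=72` onto a broker with `log.retention.hours=168` and `num.io.threads=8` yields `log.retention.hours=72` and `num.io.threads=8`. Note that an overlay of this kind can express setting or changing a config, but not removing one.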

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.security.auth.KafkaPrincipal;

/**
 * A policy that is enforced on broker alteration.
 * An implementation of this policy can be configured on a broker via the
 * {@code} broker config.
 * When this is configured the named class will be instantiated reflectively
 * using its nullary constructor, and the broker configs will then be passed to
 * its <code>configure()</code> method. During broker shutdown, the
 * <code>close()</code> method will be invoked so that resources can be
 * released (if necessary).
 * TODO: Fully define the lifecycle since the policy is configured by broker config which changes, so a means of reconfiguration is required.
 */
interface BrokerManagementPolicy extends Configurable, AutoCloseable {
    static interface AbstractRequestMetadata {
        /**
         * The id of the broker the action is being performed upon.
         * This is always the same as the id of the broker in which the
         * broker management policy is executing.
         */
        public int brokerId();

        /** The principal making the request. */
        public KafkaPrincipal principal();
    }

    static interface AlterBrokerRequest extends AbstractRequestMetadata {
        /** The requested state of the broker to be altered. */
        public RequestedBrokerState requestedState();
    }

    /**
     * Validate the given request to alter a broker
     * and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request does not satisfy this policy.
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant broker,
     * other resources in the request will still be processed.
     * @param requestMetadata the request parameters for the provided broker.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validateAlterBroker(AlterBrokerRequest requestMetadata,
                             ClusterState clusterState)
            throws PolicyViolationException;
}

The TopicManagementPolicy will be applied:

  • On topic creation, i.e. when processing a CreateTopicsRequest
  • On topic modification
    • Change in topic config, i.e. when processing an AlterConfigsRequest for topic configs (this change to be made as part of this KIP).
    • Adding partitions to topics, i.e. when processing a CreatePartitionsRequest (see KIP-195, but this change to be made as part of this KIP)
    • Reassigning partitions to brokers and/or changing the replication factor, i.e. when processing a ReassignPartitionsRequest (as part of KIP-179)
  • On topic deletion, i.e. when processing a DeleteTopicsRequest (this change to be made as part of this KIP).
  • On message deletion, i.e. when processing a DeleteRecordsRequest (this change to be made as part of this KIP).

The BrokerManagementPolicy will be applied:

  • On broker startup
    • This is to ensure that brokers start in a valid state; without this it would be possible for a later alter broker request to be denied even though the request itself was not the cause of the policy violation.
  • On broker modification, i.e. when processing an AlterConfigsRequest for broker configs.
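The reason for the startup check can be illustrated with a hypothetical broker policy (the class, its methods and the 24-hour rule are assumptions; `log.retention.hours` and `num.io.threads` are real broker configs). If a broker were allowed to start with a violating config, a later, unrelated alteration would be rejected because the merged config still violates the rule.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in for Kafka's PolicyViolationException. */
class PolicyViolation extends RuntimeException {
    PolicyViolation(String message) { super(message); }
}

/** Hypothetical broker policy: one rule, checked at startup and on every alteration. */
class ExampleBrokerPolicy {
    static final long MIN_RETENTION_HOURS = 24; // assumed operator rule

    /** Checks a complete broker config. */
    void validate(int brokerId, Map<String, String> configs) {
        String hours = configs.get("log.retention.hours");
        if (hours != null && Long.parseLong(hours) < MIN_RETENTION_HOURS) {
            throw new PolicyViolation("Broker " + brokerId
                    + ": log.retention.hours must be at least " + MIN_RETENTION_HOURS);
        }
    }

    /** At startup: refuse to start in a state that already violates the policy. */
    void onStartup(int brokerId, Map<String, String> configs) {
        validate(brokerId, configs);
    }

    /** On alteration: check the config the broker would have afterwards. */
    void onAlter(int brokerId, Map<String, String> current, Map<String, String> requested) {
        Map<String, String> after = new HashMap<>(current);
        after.putAll(requested);
        validate(brokerId, after);
    }
}
```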

Deprecate existing policies

The existing CreateTopicPolicy and AlterConfigPolicy will be deprecated, but will continue to be applied when they are configured.

Using or will result in a deprecation warning in the broker logs.

It will be a configuration-time error if both and are used at the same time, or both and are used at the same time.

Internally, an adapter implementation of TopicManagementPolicy will be used when CreateTopicPolicy and AlterConfigPolicy are configured, so policy use sites won't be unnecessarily complicated.
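A sketch of such an adapter, using heavily simplified, hypothetical signatures for the deprecated policies and for the new interface (the real CreateTopicPolicy and AlterConfigPolicy take RequestMetadata objects instead). The assumption made here is that only config changes reached the old AlterConfigPolicy, so other alterations pass through unchecked, preserving current behaviour.

```java
import java.util.Map;

/** Stand-in for Kafka's PolicyViolationException. */
class PolicyViolation extends RuntimeException {
    PolicyViolation(String message) { super(message); }
}

/** Simplified, hypothetical stand-ins for the deprecated policies. */
interface LegacyCreateTopicPolicy {
    void validate(String topic, int numPartitions, Map<String, String> configs);
}

interface LegacyAlterConfigPolicy {
    void validate(String topic, Map<String, String> configs);
}

/** Simplified, hypothetical stand-in for the relevant part of the new policy. */
interface NewTopicPolicy {
    void validateCreateTopic(String topic, int numPartitions, Map<String, String> configs);
    void validateAlterTopic(String topic, Map<String, String> requestedConfigs);
}

/** Adapter so that policy use sites only ever call the new interface. */
final class LegacyPolicyAdapter implements NewTopicPolicy {
    private final LegacyCreateTopicPolicy create; // null if not configured
    private final LegacyAlterConfigPolicy alter;  // null if not configured

    LegacyPolicyAdapter(LegacyCreateTopicPolicy create, LegacyAlterConfigPolicy alter) {
        this.create = create;
        this.alter = alter;
    }

    @Override
    public void validateCreateTopic(String topic, int numPartitions, Map<String, String> configs) {
        if (create != null) {
            create.validate(topic, numPartitions, configs);
        }
    }

    @Override
    public void validateAlterTopic(String topic, Map<String, String> requestedConfigs) {
        // Assumption: only config changes were ever seen by the old AlterConfigPolicy,
        // so alterations without config changes (e.g. adding partitions) pass unchecked.
        if (alter != null && !requestedConfigs.isEmpty()) {
            alter.validate(topic, requestedConfigs);
        }
    }
}
```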

If, in the future, AdminClient.alterConfigs()/AlterConfigsRequest is changed to support changing broker configs a separate policy interface can be applied to such changes.

Add new versions of DeleteTopicsRequest and DeleteTopicsResponse

The DELETE_TOPICS protocol will have a third version added (version 2). The DeleteTopicsRequest will get a validate_only flag. When this is set, the request will be validated for correctness, including that it satisfies the TopicManagementPolicy.validateDeleteTopic() method, but the topic won't actually be deleted.

No Format
DeleteTopics Request (Version: 2) => [topics] timeout validate_only
  topics => STRING
  timeout => INT32
  validate_only => BOOLEAN
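A sketch of how a broker might handle the topics of such a request: each topic is checked independently, a policy violation only fails that topic, and with validate_only set nothing is deleted. The handler class is hypothetical; the policy check is passed in as a Consumer standing in for a call to TopicManagementPolicy.validateDeleteTopic(), and the deleted list stands in for actual deletion.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Stand-in for Kafka's PolicyViolationException. */
class PolicyViolation extends RuntimeException {
    PolicyViolation(String message) { super(message); }
}

/** Hypothetical sketch of per-topic handling of a DeleteTopics request (version 2). */
final class DeleteTopicsHandlerSketch {
    /** Returns, per topic, null on success or the policy's error message. */
    static Map<String, String> handle(List<String> topics, boolean validateOnly,
                                      Consumer<String> policyCheck, List<String> deleted) {
        Map<String, String> errors = new LinkedHashMap<>(); // allows null values
        for (String topic : topics) {
            try {
                policyCheck.accept(topic);
                if (!validateOnly) {
                    deleted.add(topic); // stands in for actually deleting the topic
                }
                errors.put(topic, null);
            } catch (PolicyViolation e) {
                // Only this topic fails; the rest of the request is still processed.
                errors.put(topic, e.getMessage());
            }
        }
        return errors;
    }
}
```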

The DeleteTopicsResponse will get the ability to include error messages in addition to error codes:

No Format
DeleteTopics Response (Version: 2) => throttle_time_ms [topic_error_codes] 
  throttle_time_ms => INT32
  topic_error_codes => topic error_code error_message
    topic => STRING
    error_code => INT16
    error_message => NULLABLE_STRING

Old versions of the DeleteTopicsResponse will use the UNKNOWN_SERVER_ERROR error_code instead of POLICY_VIOLATION so as not to break clients.
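The version-dependent choice of error code can be sketched as below. The enum is a stand-in; its constant names follow Kafka's existing Errors enum, in which the generic server error is named UNKNOWN_SERVER_ERROR. The first policy-aware versions (2 for DeleteTopics, 1 for DeleteRecords) are the versions proposed in this KIP.

```java
/** Stand-in for the relevant constants of Kafka's Errors enum. */
enum ErrorCode { POLICY_VIOLATION, UNKNOWN_SERVER_ERROR }

final class PolicyErrorMapping {
    static final short DELETE_TOPICS_FIRST_AWARE_VERSION = 2;  // as proposed in this KIP
    static final short DELETE_RECORDS_FIRST_AWARE_VERSION = 1; // as proposed in this KIP

    /** Error code used to report a policy violation to a client using the given request version. */
    static ErrorCode policyViolationCode(short requestVersion, short firstAwareVersion) {
        // Older clients may not expect POLICY_VIOLATION, so they get a generic error;
        // the broker would log the real reason.
        return requestVersion >= firstAwareVersion ? ErrorCode.POLICY_VIOLATION : ErrorCode.UNKNOWN_SERVER_ERROR;
    }

    /** Older response versions have no error_message field, so the message is dropped. */
    static String errorMessage(short requestVersion, short firstAwareVersion, String message) {
        return requestVersion >= firstAwareVersion ? message : null;
    }
}
```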

The documentation for AdminClient.deleteTopics() will be updated to mention the possibility of a PolicyViolationException from the DeleteTopicsResult methods.

Add new versions of DeleteRecordsRequest and DeleteRecordsResponse

The DELETE_RECORDS protocol will have a second version added (version 1). The DeleteRecordsRequest will get a validate_only flag. When this is set, the request will be validated for correctness, including that it satisfies the TopicManagementPolicy.validateDeleteRecords() method, but no records will be deleted.

No Format
DeleteRecords Request (Version: 1) => [topics] timeout validate_only
  topics => topic [partitions] 
    topic => STRING
    partitions => partition offset 
      partition => INT32
      offset => INT64
  timeout => INT32
  validate_only => BOOLEAN

The DeleteRecordsResponse will get the ability to include error messages in addition to error codes:

No Format
DeleteRecords Response (Version: 1) => throttle_time_ms [topics]
  throttle_time_ms => INT32
  topics => topic [partitions] 
    topic => STRING
    partitions => partition low_watermark error_code error_message
      partition => INT32
      low_watermark => INT64
      error_code => INT16
      error_message => NULLABLE_STRING

Existing versions of the DeleteRecordsResponse will use the UNKNOWN_SERVER_ERROR error_code instead of POLICY_VIOLATION so as not to break clients.

The documentation for AdminClient.deleteRecords() (being added by KIP-204) will be updated to mention the possibility of a PolicyViolationException from the DeleteRecordsResult methods.

Compatibility, Deprecation, and Migration Plan

Existing users will have to reimplement their policies in terms of the new TopicManagementPolicy interface, and reconfigure their brokers accordingly. Since the TopicManagementPolicy contains a superset of the information used by the deprecated policies, such reimplementation should be trivial.

The deprecated policy interfaces and configuration keys will be removed in a future Kafka version. If this KIP is accepted for Kafka 1.1.0 this removal could happen in Kafka 2.0.0 or a later major release.

This KIP proposes to retroactively apply policies to two APIs (delete topics and delete records) exposed via the network protocol. Existing clients might not be expecting a POLICY_VIOLATION error code in the responses. To mitigate this, only the new versions of these network protocols will actually return POLICY_VIOLATION. If a client is using an old version of either of these protocols, the policy violation will be returned as an UNKNOWN_SERVER_ERROR, and a message will be logged to explain that the error is in fact a policy violation.

Rejected Alternatives

The objectives of this KIP could be achieved without deprecating the existing policy classes, but that:

  • incurs ongoing maintenance and testing costs on the project for no overall benefit
  • if two policies were in force, it would be more confusing to users when a request was rejected (which policy rejected it?), possibly exacerbated if users didn't know two policies were in force
  • if it were possible to have two policies in force, administrators would not be relieved of the burden of keeping two policies in sync.

The proposed TopicActionsPolicy doesn't have to cover the topic deletion case: That could still be handled by a separate policy, but it is desirable to have a single policy to cover the whole lifecycle of a topic, and for the same information to be made available about a topic being deleted as about a topic being modified.

Having separate policy interfaces was considered, but rejected because there would need to be several of them, making it harder for people to discover, understand, implement and configure policies. It would also be easy for users to miss if a new policy interface was added.

Having separate policy interfaces for creation/modification and deletion, while retaining the existing single-method-per-policy-interface design, was considered but rejected because it is a halfway house between having multiple policies and having a single policy.

The proposed TopicActionsPolicy doesn't cover the use case of records being deleted from a topic. This is not the same as the modification of a topic, and would require a different policy interface. It might be appropriate to use the same topic state in such a policy interface, however.