...

Current state: Under Discussion

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: https://issues.apache.org/jira/browse/KAFKA-5693

...

New versions of the existing network protocol messages DeleteTopicsRequest and DeleteRecordsRequest will be added, to add a validate_only flag.

New versions of the existing network protocol messages DeleteTopicsResponse and DeleteRecordsResponse will be added, to include an error message.

...

Code Block
/**
 * Represents the state of a topic either before, or as a result of,
 * an administrative request affecting a topic.
 */
interface TopicState {
    /**
     * The number of partitions of the topic.
     */
    int numPartitions();
 
    /**
     * The replication factor of the topic. More precisely, the number of assigned replicas for partition 0.
     * // TODO what about during reassignment
     */
    short replicationFactor();
 
    /**
     * A map of the replica assignments of the topic, with partition ids as keys and
     * the assigned brokers as the corresponding values.
     * // TODO what about during reassignment
     */
    Map<Integer, List<Integer>> replicasAssignments();
 
    /**
     * The topic config.
     */
    Map<String,String> configs();
 
    /**
     * Returns whether the topic is marked for deletion.
     */
    boolean markedForDeletion();
 
    /**
     * Returns whether the topic is an internal topic.
     */
    boolean internal();
 
}

 
/**
 * Represents the requested state of a topic.
 */
interface RequestedTopicState extends TopicState {
    /**
     * True if the {@link TopicState#replicasAssignments()}
     * in this request were generated by the broker, false if
     * they were explicitly requested by the client.
     */
    boolean generatedReplicaAssignments();

    /**
     * The topic config as it will be if the request is successful.
     * This is effectively the same as the value of {@code configs}
     * after the following computation:
     * <pre><code>
     *   Map<String, String> configs = currentState.configs();
     *   configs.putAll(requestedState.requestedConfigs());
     * </code></pre>
     */
    @Override
    Map<String,String> configs();

    /**
     * The topic configs present in the request.
     */
    Map<String,String> requestedConfigs();
}
 
/**
 * The current state of the topics in the cluster, before the request takes effect.
 */
interface ClusterState {
    /**
     * Returns the current state of the given topic, or null if the topic does not exist.
     */
    TopicState topicState(String topicName);

    /**
     * Returns all the topics in the cluster, including internal topics if
     * {@code includeInternal} is true, and including those marked for deletion
     * if {@code includeMarkedForDeletion} is true.
     */
    Set<String> topics(boolean includeInternal, boolean includeMarkedForDeletion);

    /**
     * The number of brokers in the cluster.
     */
    int clusterSize();

    /**
     * Returns the current state of the broker in which the method is called.
     */
    BrokerState brokerState();
}
 
/**
 * A policy that is enforced on topic creation, alteration and deletion,
 * and for the deletion of messages from a topic.
 *
 * An implementation of this policy can be configured on a broker via the
 * {@code topic.management.policy.class.name} broker config.
 * When this is configured the named class will be instantiated reflectively
 * using its nullary constructor and will then pass the broker configs to
 * its <code>configure()</code> method. During broker shutdown, the
 * <code>close()</code> method will be invoked so that resources can be
 * released (if necessary).
 *
 */
interface TopicManagementPolicy extends Configurable, AutoCloseable {

    static interface AbstractRequestMetadata {

        /**
         * The topic the action is being performed upon.
         */
        public String topic();

        /**
         * The authenticated principal making the request.
         */
        public KafkaPrincipal principal();
    }

    static interface CreateTopicRequest extends AbstractRequestMetadata {
        /**
         * The requested state of the topic to be created.
         */
        public RequestedTopicState requestedState();
    }

    /**
     * Validate the given request to create a topic
     * and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request does not satisfy this policy.
     *
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant topic,
     * other topics in the request will still be processed.
     *
     * @param requestMetadata the request parameters for the provided topic.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validateCreateTopic(CreateTopicRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException;

    static interface AlterTopicRequest extends AbstractRequestMetadata {
        /**
         * The state the topic will have after the alteration.
         */
        public RequestedTopicState requestedState();
    }

    /**
     * Validate the given request to alter an existing topic
     * and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request does not satisfy this policy.
     *
     * The given {@code clusterState} can be used to discover the current state of the topic to be modified.
     *
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant topic,
     * other topics in the request will still be processed.
     *
     * @param requestMetadata the request parameters for the provided topic.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validateAlterTopic(AlterTopicRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException;

    /**
     * Parameters for a request to delete the given topic.
     */
    static interface DeleteTopicRequest extends AbstractRequestMetadata {

    }

    /**
     * Validate the given request to delete a topic
     * and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request does not satisfy this policy.
     *
     * The given {@code clusterState} can be used to discover the current state of the topic to be deleted.
     *
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant topic,
     * other topics in the request will still be processed.
     *
     * @param requestMetadata the request parameters for the provided topic.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validateDeleteTopic(DeleteTopicRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException;

    /**
     * Parameters for a request to delete records from the topic.
     */
    static interface DeleteRecordsRequest extends AbstractRequestMetadata {

        /**
         * Returns a map of topic partitions and the corresponding offset of the last message
         * to be retained. Messages before this offset will be deleted.
         * Partitions which won't have messages deleted won't be present in the map.
         */
        Map<Integer, Long> deletedMessageOffsets();
    }

    /**
     * Validate the given request to delete records from a topic
     * and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request does not satisfy this policy.
     *
     * The given {@code clusterState} can be used to discover the current state of the topic to have records deleted.
     *
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant topic,
     * other topics in the request will still be processed.
     *
     * @param requestMetadata the request parameters for the provided topic.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validateDeleteRecords(DeleteRecordsRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException;
}


/**
 * Represents the state of a broker
 */ 
interface BrokerState {

    /**
     * The broker config.
     */
    Map<String,String> configs();
}

interface RequestedBrokerState extends BrokerState {

    /**
     * The broker config as it will be if the request is successful.
     * This is effectively the same as the value of {@code configs}
     * after the following computation:
     * <pre><code>
     *   Map<String, String> configs = currentState.configs();
     *   configs.putAll(requestedState.requestedConfigs());
     * </code></pre>
     */    
    @Override
    Map<String,String> configs();


    /**
     * The broker configs present in the request.
     */
    Map<String,String> requestedConfigs();
}


/**
 * A policy that is enforced on broker alteration.
 *
 * An implementation of this policy can be configured on a broker via the
 * {@code broker.management.policy.class.name} broker config.
 * When this is configured the named class will be instantiated reflectively
 * using its nullary constructor and will then pass the broker configs to
 * its <code>configure()</code> method. During broker shutdown, the
 * <code>close()</code> method will be invoked so that resources can be
 * released (if necessary).
 *
 * TODO: Fully define the lifecycle since the policy is configured by broker config which changes, so a means of reconfiguration is required.
 */
interface BrokerManagementPolicy extends Configurable, AutoCloseable {
    static interface AbstractRequestMetadata {
 
        /**
         * The id of the broker the action is being performed upon.
         * This is always the same as the id of the broker in which the 
         * broker management policy is executing.
         */
        public int brokerId();
 
        /**
         * The principal making the request.
         */
        public KafkaPrincipal principal();
    }
 
 
    static interface AlterBrokerRequest extends AbstractRequestMetadata {
        /**
         * The requested state of the broker to be altered.
         */
        public RequestedBrokerState requestedState();
    }

    /**
     * Validate the given request to alter a broker
     * and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request does not satisfy this policy.
     *
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant broker,
     * other topics in the request will still be processed.
     *
     * @param requestMetadata the request parameters for the provided broker.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validateAlterBroker(AlterBrokerRequest requestMetadata, 
                             ClusterState clusterState) 
            throws PolicyViolationException;
}
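
As an illustration of how a policy might use these interfaces, here is a minimal sketch of a create-topic check. It is not part of the proposal: the interfaces are cut-down stand-ins declaring only the members used, PolicyViolationException is a local stand-in for Kafka's exception of the same name, and MinReplicationPolicy, PolicySketch and the replication rules themselves are hypothetical.

```java
// Simplified stand-ins for the proposed interfaces (only the members used below).
interface TopicState {
    int numPartitions();
    short replicationFactor();
}

interface ClusterState {
    int clusterSize();
}

interface CreateTopicRequest {
    String topic();
    TopicState requestedState();
}

// Local stand-in for Kafka's PolicyViolationException.
class PolicyViolationException extends RuntimeException {
    PolicyViolationException(String message) {
        super(message);
    }
}

// Hypothetical policy: at least two replicas, and never more replicas than brokers.
class MinReplicationPolicy {
    void validateCreateTopic(CreateTopicRequest request, ClusterState cluster)
            throws PolicyViolationException {
        short rf = request.requestedState().replicationFactor();
        if (rf < 2) {
            throw new PolicyViolationException(
                "Topic " + request.topic() + ": replication factor " + rf + " is below 2");
        }
        if (rf > cluster.clusterSize()) {
            throw new PolicyViolationException(
                "Topic " + request.topic() + ": replication factor " + rf
                    + " exceeds cluster size " + cluster.clusterSize());
        }
    }
}

// Small fixtures so the sketch can be exercised without a broker.
class PolicySketch {
    static CreateTopicRequest request(String topic, int partitions, int replicationFactor) {
        TopicState state = new TopicState() {
            public int numPartitions() { return partitions; }
            public short replicationFactor() { return (short) replicationFactor; }
        };
        return new CreateTopicRequest() {
            public String topic() { return topic; }
            public TopicState requestedState() { return state; }
        };
    }

    static ClusterState cluster(int brokers) {
        return () -> brokers; // ClusterState has a single method here, so a lambda suffices
    }

    public static void main(String[] args) {
        MinReplicationPolicy policy = new MinReplicationPolicy();
        try {
            policy.validateCreateTopic(request("demo", 3, 1), cluster(3));
            System.out.println("Accepted");
        } catch (PolicyViolationException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

The exception message is what a client would see alongside the POLICY_VIOLATION error code, so it is worth making it specific to the topic and the rule that failed.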

The TopicManagementPolicy will be applied:

  • On topic creation, i.e. when processing a CreateTopicsRequest
  • On topic modification
    • Change in topic config, i.e. when processing AlterConfigsRequest, for topic configs (this change done as part of this KIP).
    • Adding partitions to topics, i.e. when processing a CreatePartitionsRequest (see KIP-195, but this change done as part of this KIP)
    • Reassigning partitions to brokers, and/or changing the replication factor when processing ReassignPartitionsRequest (as part of KIP-179)
  • On topic deletion, i.e. when processing a DeleteTopicsRequest (this change done as part of this KIP).
  • On message deletion, i.e. when processing a DeleteRecordsRequest (this change done as part of this KIP).

The BrokerManagementPolicy will be applied:

  • On broker startup
    • This is to ensure that brokers start in a valid state; without this it would be possible for a later alter broker request to be denied even though the request itself was not the cause of the policy violation.
  • On broker modification, i.e. when processing an AlterConfigsRequest for broker configs

Deprecate existing policies

...

It will be a configuration-time error if both create.topic.policy.class.name and topic.management.policy.class.name are used at the same time, or both alter.config.policy.class.name and topic.management.policy.class.name are used at the same time.
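
The mutual-exclusion rule above could be checked along the following lines. This is only a sketch: PolicyConfigCheck is a hypothetical helper, the broker config is modelled as a plain string map, and IllegalArgumentException stands in for whatever exception the broker's config validation would really raise.

```java
import java.util.Map;

// Hypothetical startup check for the mutually exclusive policy configs.
class PolicyConfigCheck {
    static final String TOPIC_MANAGEMENT = "topic.management.policy.class.name";
    static final String CREATE_TOPIC = "create.topic.policy.class.name";
    static final String ALTER_CONFIG = "alter.config.policy.class.name";

    // Throws if the new policy config is combined with either deprecated one.
    static void validate(Map<String, String> brokerConfig) {
        if (!isSet(brokerConfig, TOPIC_MANAGEMENT)) {
            return; // the deprecated configs may still be used on their own
        }
        for (String deprecated : new String[] {CREATE_TOPIC, ALTER_CONFIG}) {
            if (isSet(brokerConfig, deprecated)) {
                throw new IllegalArgumentException(
                    deprecated + " cannot be used together with " + TOPIC_MANAGEMENT);
            }
        }
    }

    private static boolean isSet(Map<String, String> config, String key) {
        String value = config.get(key);
        return value != null && !value.isEmpty();
    }
}
```

Under this sketch a broker that sets only the deprecated keys still starts normally, which matches the deprecation (rather than immediate removal) of the old policies.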

...

No Format
DeleteTopics Response (Version: 2) => throttle_time_ms [topic_error_codes] 
  throttle_time_ms => INT32
  topic_error_codes => topic error_code error_message
    topic => STRING
    error_code => INT16
    error_message => NULLABLE_STRING

Old versions of the DeleteTopicsResponse will use an UNEXPECTED_SERVER_ERROR error_code instead of POLICY_VIOLATION so as to not break clients.

The documentation for AdminClient.deleteTopics() will be updated to mention the possibility of PolicyViolationException from the DeleteTopicsResult methods.

...

No Format
DeleteRecords Response (Version: 0) => throttle_time_ms [topics] 
  throttle_time_ms => INT32
  topics => topic [partitions] 
    topic => STRING
    partitions => partition low_watermark error_code error_message
      partition => INT32
      low_watermark => INT64
      error_code => INT16
      error_message => NULLABLE_STRING

Existing versions of the DeleteRecordsResponse will use an UNEXPECTED_SERVER_ERROR error_code instead of POLICY_VIOLATION so as to not break clients.

The documentation for AdminClient.deleteRecords() (being added by KIP-204) will be updated to mention the possibility of PolicyViolationException from the DeleteRecordsResult methods.

Compatibility, Deprecation, and Migration Plan

Existing users will have to reimplement their policies in terms of the new TopicManagementPolicy interface, and reconfigure their brokers accordingly. Since the TopicManagementPolicy contains a superset of the existing information used by the deprecated policies, such reimplementation should be trivial.

The deprecated policy interfaces and configuration keys will be removed in a future Kafka version. If this KIP is accepted for Kafka 1.1.0 this removal could happen in Kafka 2.0.0 or a later major release.

This KIP proposes to retrospectively apply policies to two APIs (delete topics and delete records) exposed via the network protocol. Existing clients might not be expecting a POLICY_VIOLATION error code in the responses. To mitigate this, only the new version of these network protocols will actually return POLICY_VIOLATION. If a client is using an old version of either of these protocols, the policy violation will be returned as an UNEXPECTED_SERVER_ERROR, and a message will be logged to explain that the error is in fact a policy violation.
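
The version-dependent downgrade of the error code might look roughly like this. It is a sketch only: ErrorCode is a local enum using the names from this page rather than Kafka's own error type, and DeleteResponseErrors, forVersion and the firstPolicyAwareVersion parameter are hypothetical names introduced for illustration.

```java
import java.util.logging.Logger;

// Local stand-in for the error codes named in this KIP.
enum ErrorCode { NONE, POLICY_VIOLATION, UNEXPECTED_SERVER_ERROR }

class DeleteResponseErrors {
    private static final Logger LOG = Logger.getLogger("DeleteResponseErrors");

    // Returns the error code to put in a response of the given version.
    // firstPolicyAwareVersion is the (assumed) first response version that
    // is allowed to carry POLICY_VIOLATION.
    static ErrorCode forVersion(ErrorCode error, int responseVersion, int firstPolicyAwareVersion) {
        if (error == ErrorCode.POLICY_VIOLATION && responseVersion < firstPolicyAwareVersion) {
            // Old clients do not expect POLICY_VIOLATION, so log the real cause
            // on the broker and send a generic server error instead.
            LOG.warning("Policy violation reported as UNEXPECTED_SERVER_ERROR to a version "
                + responseVersion + " client");
            return ErrorCode.UNEXPECTED_SERVER_ERROR;
        }
        return error;
    }
}
```

Passing the first policy-aware version as a parameter keeps the sketch independent of the exact version numbers eventually assigned to the new DeleteTopics and DeleteRecords responses.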

Rejected Alternatives

The objectives of this KIP could be achieved without deprecating the existing policy classes, but that:

...

Having separate policy interfaces for creation/modification and deletion and retaining the existing single-method-per-policy-interface design was considered, but rejected because it was a halfway house between having multiple policies and having a single policy.