Comment: avoid null principle

...

Current state: Under Discussion

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: https://issues.apache.org/jira/browse/KAFKA-5693

...

New versions of the existing network protocol messages DeleteTopicsRequest and DeleteRecordsRequest will be added, to add a validate_only flag.

New versions of the existing network protocol messages DeleteTopicsResponse and DeleteRecordsResponse will be added, to include an error message.

...

Code Block
/**
 * Represents the state of a topic either before, or as a result of,
 * an administrative request affecting a topic.
 */
interface TopicState {
    /**
     * The number of partitions of the topic.
     */
    int numPartitions();
 
    /**
     * The replication factor of the topic. More precisely, the number of assigned replicas for partition 0.
     * // TODO what about during reassignment
     */
    short replicationFactor();
 
    /**
     * A map of the replica assignments of the topic, with partition ids as keys and
     * the assigned brokers as the corresponding values.
     * // TODO what about during reassignment
     */
    Map<Integer, List<Integer>> replicasAssignments();
 
    /**
     * The topic config.
     */
    Map<String,String> configs();
 
    /**
     * Returns whether the topic is marked for deletion.
     */
    boolean markedForDeletion();
 
    /**
     * Returns whether the topic is an internal topic.
     */
    boolean internal();
 
}

 
/**
 * Represents the requested state of a topic.
 */
interface RequestedTopicState extends TopicState {
    /**
     * True if the {@link TopicState#replicasAssignments()}
     * in this request were generated by the broker, false if
     * they were explicitly requested by the client.
     */
    boolean generatedReplicaAssignments();

    /**
     * The topic config as it will be if the request is successful.
     * This is effectively the same as the value of {@code configs}
     * after the following computation:
     * <pre><code>
     *   Map<String, String> configs = currentState.configs();
     *   configs.putAll(requestedState.requestedConfigs());
     * </code></pre>
     */
    @Override
    Map<String,String> configs();

    /**
     * The topic configs present in the request.
     */
    Map<String,String> requestedConfigs();
}

/**
 * The current state of the topics in the cluster, before the request takes effect.
 */
interface ClusterState {
    /**
     * Returns the current state of the given topic, or null if the topic does not exist.
     */
    TopicState topicState(String topicName);

    /**
     * Returns all the topics in the cluster, including internal topics if
     * {@code includeInternal} is true, and including those marked for deletion
     * if {@code includeMarkedForDeletion} is true.
     */
    Set<String> topics(boolean includeInternal, boolean includeMarkedForDeletion);

    /**
     * The number of brokers in the cluster.
     */
    int clusterSize();

    /**
     * Returns the current state of the broker in which the method is called.
     */
    BrokerState brokerState();
}

/**
 * A policy that is enforced on topic creation, alteration and deletion,
 * and for the deletion of messages from a topic.
 *
 * An implementation of this policy can be configured on a broker via the
 * {@code topic.management.policy.class.name} broker config.
 * When this is configured the named class will be instantiated reflectively
 * using its nullary constructor and will then pass the broker configs to
 * its <code>configure()</code> method. During broker shutdown, the
 * <code>close()</code> method will be invoked so that resources can be
 * released (if necessary).
 *
 */
interface TopicManagementPolicy extends Configurable, AutoCloseable {

    static interface AbstractRequestMetadata {

        /**
         * The topic the action is being performed upon.
         */
        public String topic();

        /**
         * The authenticated principal making the request.
         */
        public KafkaPrincipal principal();
    }

    static interface CreateTopicRequest extends AbstractRequestMetadata {
        /**
         * The requested state of the topic to be created.
         */
        public RequestedTopicState requestedState();
    }

    /**
     * Validate the given request to create a topic
     * and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request does not satisfy this policy.
     *
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant topic,
     * other topics in the request will still be processed.
     *
     * @param requestMetadata the request parameters for the provided topic.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validateCreateTopic(CreateTopicRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException;

    static interface AlterTopicRequest extends AbstractRequestMetadata {
        /**
         * The state the topic will have after the alteration.
         */
        public RequestedTopicState requestedState();
    }

    /**
     * Validate the given request to alter an existing topic
     * and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request does not satisfy this policy.
     *
     * The given {@code clusterState} can be used to discover the current state of the topic to be modified.
     *
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant topic,
     * other topics in the request will still be processed.
     *
     * @param requestMetadata the request parameters for the provided topic.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validateAlterTopic(AlterTopicRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException;

    /**
     * Parameters for a request to delete the given topic.
     */
    static interface DeleteTopicRequest extends AbstractRequestMetadata {

    }

    /**
     * Validate the given request to delete a topic
     * and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request does not satisfy this policy.
     *
     * The given {@code clusterState} can be used to discover the current state of the topic to be deleted.
     *
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant topic,
     * other topics in the request will still be processed.
     *
     * @param requestMetadata the request parameters for the provided topic.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validateDeleteTopic(DeleteTopicRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException;

    /**
     * Parameters for a request to delete records from the topic.
     */
    static interface DeleteRecordsRequest extends AbstractRequestMetadata {

        /**
         * Returns a map of topic partitions and the corresponding offset of the last message
         * to be retained. Messages before this offset will be deleted.
         * Partitions which won't have messages deleted won't be present in the map.
         */
        Map<Integer, Long> deletedMessageOffsets();
    }
 
    /**
     * Validate the given request to delete records from a topic
     * and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request does not satisfy this policy.
     *
     * The given {@code clusterState} can be used to discover the current state of the topic to have records deleted.
     *
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant topic,
     * other topics in the request will still be processed.
     *
     * @param requestMetadata the request parameters for the provided topic.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validateDeleteRecords(DeleteRecordsRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException;
}


/**
 * Represents the state of a broker
 */ 
interface BrokerState {

    /**
     * The broker config.
     */
    Map<String,String> configs();
}

interface RequestedBrokerState extends BrokerState {

    /**
     * The broker config as it will be if the request is successful.
     * This is effectively the same as the value of {@code configs}
     * after the following computation:
     * <pre><code>
     *   Map<String, String> configs = currentState.configs();
     *   configs.putAll(requestedState.requestedConfigs());
     * </code></pre>
     */    
    @Override
    Map<String,String> configs();


    /**
     * The broker configs present in the request.
     */
    Map<String,String> requestedConfigs();
}


/**
 * A policy that is enforced on broker alteration.
 *
 * An implementation of this policy can be configured on a broker via the
 * {@code broker.management.policy.class.name} broker config.
 * When this is configured the named class will be instantiated reflectively
 * using its nullary constructor and will then pass the broker configs to
 * its <code>configure()</code> method. During broker shutdown, the
 * <code>close()</code> method will be invoked so that resources can be
 * released (if necessary).
 *
 * TODO: Fully define the lifecycle since the policy is configured by broker config which changes, so a means of reconfiguration is required.
 */
interface BrokerManagementPolicy extends Configurable, AutoCloseable {
    static interface AbstractRequestMetadata {
 
        /**
         * The id of the broker the action is being performed upon.
         * This is always the same as the id of the broker in which the 
         * broker management policy is executing.
         */
        public int brokerId();
 
        /**
         * The principal making the request.
         */
        public KafkaPrincipal principal();
    }
 
 
    static interface AlterBrokerRequest extends AbstractRequestMetadata {
        /**
         * The requested state of the broker to be altered.
         */
        public RequestedBrokerState requestedState();
    }

    /**
     * Validate the given request to alter a broker
     * and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request does not satisfy this policy.
     *
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant broker,
     * other resources in the request will still be processed.
     *
     * @param requestMetadata the request parameters for the provided broker.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validateAlterBroker(AlterBrokerRequest requestMetadata, 
                             ClusterState clusterState) 
            throws PolicyViolationException;
}
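The Javadoc for `RequestedTopicState.configs()` and `RequestedBrokerState.configs()` describes the effective config as the current config overlaid with the configs present in the request. The following is a minimal sketch of that computation using plain maps. The class and method names (`EffectiveConfigs`, `effectiveConfigs`) are hypothetical and not part of the proposal, and the sketch copies the current map first so that the current state is not mutated.

```java
import java.util.HashMap;
import java.util.Map;

public class EffectiveConfigs {

    /**
     * Returns a new map holding the current configs overlaid with the configs
     * present in the request. Neither argument is modified.
     */
    public static Map<String, String> effectiveConfigs(Map<String, String> current,
                                                       Map<String, String> requested) {
        Map<String, String> merged = new HashMap<>(current); // defensive copy
        merged.putAll(requested);                            // requested values win
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> current = new HashMap<>();
        current.put("retention.ms", "604800000");
        current.put("cleanup.policy", "delete");

        Map<String, String> requested = new HashMap<>();
        requested.put("retention.ms", "86400000");

        // retention.ms takes the requested value; cleanup.policy is carried over.
        System.out.println(effectiveConfigs(current, requested));
    }
}
```

A policy that only cares about what the client explicitly asked to change would look at `requestedConfigs()`, while one that validates the resulting state as a whole would look at `configs()`.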

The TopicManagementPolicy will be applied:

  • On topic creation, i.e. when processing a CreateTopicsRequest
  • On topic modification
    • Change in topic config, i.e. when processing AlterConfigsRequest, for topic configs (this change done as part of this KIP).
    • Adding partitions to topics, i.e. when processing a CreatePartitionsRequest (see KIP-195, but this change done as part of this KIP)
    • Reassigning partitions to brokers, and/or changing the replication factor when processing ReassignPartitionsRequest (as part of KIP-179)
  • On topic deletion, i.e. when processing a DeleteTopicsRequest (this change done as part of this KIP).
  • On message deletion, i.e. when processing a DeleteRecordsRequest (this change done as part of this KIP)
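To make the hook points listed above more concrete, here is a minimal sketch of what a policy might check on topic creation and topic deletion. It is not the proposed API itself: the nested `TopicState` and `ClusterState` interfaces are reduced stand-ins for the interfaces in the proposal, `PolicyViolationException` is a local stand-in, the validate methods take simplified parameters instead of the request metadata objects, and `MinReplicationPolicy` and the helper factories are hypothetical names.

```java
import java.util.Collections;
import java.util.Map;

public class TopicPolicySketch {

    /** Local stand-in for the PolicyViolationException named in the proposal. */
    public static class PolicyViolationException extends RuntimeException {
        public PolicyViolationException(String message) { super(message); }
    }

    /** Reduced stand-in for the proposed TopicState: only the methods used below. */
    public interface TopicState {
        short replicationFactor();
        boolean internal();
    }

    /** Reduced stand-in for the proposed ClusterState. */
    public interface ClusterState {
        TopicState topicState(String topicName); // null if the topic does not exist
        int clusterSize();
    }

    /** Hypothetical policy: enforce a replication-factor floor and protect internal topics. */
    public static final class MinReplicationPolicy {
        private final short minReplicationFactor;

        public MinReplicationPolicy(short minReplicationFactor) {
            this.minReplicationFactor = minReplicationFactor;
        }

        public void validateCreateTopic(String topic, TopicState requested, ClusterState cluster) {
            if (requested.replicationFactor() < minReplicationFactor) {
                throw new PolicyViolationException("Topic " + topic
                        + " needs a replication factor of at least " + minReplicationFactor);
            }
            if (requested.replicationFactor() > cluster.clusterSize()) {
                throw new PolicyViolationException("Topic " + topic
                        + " requests more replicas than there are brokers");
            }
        }

        public void validateDeleteTopic(String topic, ClusterState cluster) {
            TopicState current = cluster.topicState(topic);
            if (current != null && current.internal()) {
                throw new PolicyViolationException("Internal topic " + topic + " must not be deleted");
            }
        }
    }

    /** Helper for building a TopicState in examples and tests. */
    public static TopicState topicState(final short replicationFactor, final boolean internal) {
        return new TopicState() {
            @Override public short replicationFactor() { return replicationFactor; }
            @Override public boolean internal() { return internal; }
        };
    }

    /** Helper for building a ClusterState backed by a fixed map of topics. */
    public static ClusterState clusterState(final int size, final Map<String, TopicState> topics) {
        return new ClusterState() {
            @Override public TopicState topicState(String topicName) { return topics.get(topicName); }
            @Override public int clusterSize() { return size; }
        };
    }

    public static void main(String[] args) {
        ClusterState cluster = clusterState(3,
                Collections.singletonMap("__consumer_offsets", topicState((short) 3, true)));
        MinReplicationPolicy policy = new MinReplicationPolicy((short) 2);

        policy.validateCreateTopic("orders", topicState((short) 3, false), cluster); // accepted
        try {
            policy.validateDeleteTopic("__consumer_offsets", cluster);
        } catch (PolicyViolationException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Because the deletion check reads the topic's current state from the cluster state rather than from the request, the same pattern would apply to alteration and record deletion.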

The BrokerManagementPolicy will be applied:

  • On broker startup
    • This is to ensure that brokers start in a valid state; without this it would be possible for a later alter broker request to be denied even though the request itself was not the cause of the policy violation.
  • On broker modification, i.e. when processing an AlterConfigsRequest for broker configs.

Deprecate existing policies

...

It will be a configuration-time error if both create.topic.policy.class.name and topic.management.policy.class.name are used at the same time, or both alter.config.policy.class.name and topic.management.policy.class.name are used at the same time.
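The mutual-exclusion rule above could be checked along the following lines when the broker configs are loaded. This is only a sketch: the class `PolicyConfigCheck` is hypothetical, the broker configs are modelled as a plain string map, and `IllegalArgumentException` stands in for whatever configuration exception the broker would actually raise.

```java
import java.util.HashMap;
import java.util.Map;

public class PolicyConfigCheck {
    static final String TOPIC_MANAGEMENT = "topic.management.policy.class.name";
    static final String CREATE_TOPIC = "create.topic.policy.class.name";
    static final String ALTER_CONFIG = "alter.config.policy.class.name";

    /** A config counts as "used" when it is present and not blank. */
    private static boolean isSet(Map<String, String> configs, String key) {
        String value = configs.get(key);
        return value != null && !value.trim().isEmpty();
    }

    /**
     * Throws if the new policy config is combined with either of the existing
     * policy configs. Using only existing configs, or only the new one, is accepted.
     */
    public static void validate(Map<String, String> brokerConfigs) {
        if (!isSet(brokerConfigs, TOPIC_MANAGEMENT)) {
            return;
        }
        for (String existing : new String[] {CREATE_TOPIC, ALTER_CONFIG}) {
            if (isSet(brokerConfigs, existing)) {
                throw new IllegalArgumentException(
                        existing + " and " + TOPIC_MANAGEMENT + " must not both be set");
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> configs = new HashMap<>();
        configs.put(TOPIC_MANAGEMENT, "com.example.MyTopicPolicy"); // hypothetical class name
        configs.put(CREATE_TOPIC, "com.example.MyCreatePolicy");    // hypothetical class name
        try {
            validate(configs);
        } catch (IllegalArgumentException e) {
            System.out.println("Invalid broker config: " + e.getMessage());
        }
    }
}
```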

...

No Format
DeleteTopics Response (Version: 2) => throttle_time_ms [topic_error_codes] 
  throttle_time_ms => INT32
  topic_error_codes => topic error_code error_message
    topic => STRING
    error_code => INT16
    error_message => NULLABLE_STRING

Old versions of the DeleteTopicsResponse will use an UNEXPECTED_SERVER_ERROR error_code instead of POLICY_VIOLATION so as to not break clients.
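The version-dependent choice of error code can be pictured as a small mapping applied when the response is built. The sketch below assumes that version 2, the version shown in the schema above, is the first DeleteTopics response version that may carry POLICY_VIOLATION. The class, enum, and method names are hypothetical, and the enum only mirrors the error names used in this document, not Kafka's actual error codes.

```java
public class DeleteTopicsErrorDowngrade {

    /** Error names as used in this document; not Kafka's real error enum. */
    public enum ErrorCode { NONE, POLICY_VIOLATION, UNEXPECTED_SERVER_ERROR }

    /** Assumption: the first response version allowed to carry POLICY_VIOLATION. */
    static final int FIRST_VERSION_WITH_POLICY_VIOLATION = 2;

    /**
     * Returns the error to put in a response of the given version. Older versions
     * get UNEXPECTED_SERVER_ERROR in place of POLICY_VIOLATION; every other error
     * passes through unchanged.
     */
    public static ErrorCode errorForVersion(ErrorCode error, int responseVersion) {
        if (error == ErrorCode.POLICY_VIOLATION
                && responseVersion < FIRST_VERSION_WITH_POLICY_VIOLATION) {
            return ErrorCode.UNEXPECTED_SERVER_ERROR;
        }
        return error;
    }

    public static void main(String[] args) {
        System.out.println(errorForVersion(ErrorCode.POLICY_VIOLATION, 1));
        System.out.println(errorForVersion(ErrorCode.POLICY_VIOLATION, 2));
    }
}
```

The same mapping would apply to DeleteRecords responses, with the threshold set to whichever version first includes the error message.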

The documentation for AdminClient.deleteTopics() will be updated to mention the possibility of PolicyViolationException from the DeleteTopicsResult methods.

...

No Format
DeleteRecords Response (Version: 0) => throttle_time_ms [topics] 
  throttle_time_ms => INT32
  topics => topic [partitions] 
    topic => STRING
    partitions => partition low_watermark error_code error_message
      partition => INT32
      low_watermark => INT64
      error_code => INT16
      error_message => NULLABLE_STRING

Existing versions of the DeleteRecordsResponse will use an UNEXPECTED_SERVER_ERROR error_code instead of POLICY_VIOLATION so as to not break clients.

The documentation for AdminClient.deleteRecords() (being added by KIP-204) will be updated to mention the possibility of PolicyViolationException from the DeleteRecordsResult methods.

...

Having separate policy interfaces for creation/modification and deletion, while retaining the existing single-method-per-policy-interface design, was considered but rejected because it would be a halfway house between having multiple policies and having a single policy.