Comment: Extend to cover changing broker config

...

/**
 * Represents the state of a topic.
 */
interface TopicState {
    /**
     * The number of partitions of the topic.
     */
    int numPartitions();
 
    /**
     * The replication factor of the topic. More precisely, the number of assigned replicas for partition 0.
     * // TODO what about during reassignment
     */
    short replicationFactor();
 
    /**
     * A map of the replica assignments of the topic, with partition ids as keys and
     * the assigned brokers as the corresponding values.
     * // TODO what about during reassignment
     */
    Map<Integer, List<Integer>> replicasAssignments();
 
    /**
     * The topic config.
     */
    Map<String,String> configs();
 
    /**
     * Returns whether the topic is marked for deletion.
     */
    boolean markedForDeletion();
 
    /**
     * Returns whether the topic is an internal topic.
     */
    boolean internal();
 
}
 
/**
 * Represents the requested state of a topic.
 */
interface RequestedTopicState extends TopicState {
    /** 
     * True if the {@link TopicState#replicasAssignments()}
     * in this request were generated by the broker, false if 
     * they were explicitly requested by the client.
     */
    boolean generatedReplicaAssignments();

    /**
     * The topic config as it will be if the request is successful.
     * This is effectively the same as the value of {@code configs}
     * after the following computation:
     * <pre><code>
     *   Map<String, String> configs = currentState.configs();
     *   configs.putAll(requestedState.requestedConfigs());
     * </code></pre>
     */
    @Override
    Map<String,String> configs();

    /**
     * The topic configs present in the request.
     */
    Map<String,String> requestedConfigs();
}

/**
 * The current state of the topics in the cluster, before the request takes effect.
 */
interface ClusterState {
    /**
     * Returns the current state of the given topic, or null if the topic does not exist.
     */
    TopicState topicState(String topicName);

    /**
     * Returns all the topics in the cluster, including internal topics if
     * {@code includeInternal} is true, and including those marked for deletion
     * if {@code includeMarkedForDeletion} is true.
     */
    Set<String> topics(boolean includeInternal, boolean includeMarkedForDeletion);

    /**
     * The number of brokers in the cluster.
     */
    int clusterSize();

    /**
     * Returns the current state of the broker in which the method is called.
     */
    public BrokerState brokerState();
}

/**
 * A policy that is enforced on topic creation, alteration and deletion,
 * and for the deletion of messages from a topic.
 *
 * An implementation of this policy can be configured on a broker via the
 * {@code topic.management.policy.class.name} broker config.
 * When this is configured the named class will be instantiated reflectively
 * using its nullary constructor and will then pass the broker configs to
 * its <code>configure()</code> method. During broker shutdown, the
 * <code>close()</code> method will be invoked so that resources can be
 * released (if necessary).
 *
 */
interface TopicManagementPolicy extends Configurable, AutoCloseable {

    static interface AbstractRequestMetadata {

        /**
         * The topic the action is being performed upon.
         */
        public String topic();

        /**
         * The authenticated principal making the request, or null if the session is not authenticated.
         */
        public KafkaPrincipal principal();
    }

    static interface CreateTopicRequest extends AbstractRequestMetadata {
        /**
         * The requested state of the topic to be created.
         */
        public RequestedTopicState requestedState();
    }

    /**
     * Validate the given request to create a topic
     * and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request does not satisfy this policy.
     *
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant topic,
     * other topics in the request will still be processed.
     *
     * @param requestMetadata the request parameters for the provided topic.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validateCreateTopic(CreateTopicRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException;

    static interface AlterTopicRequest extends AbstractRequestMetadata {
        /**
         * The state the topic will have after the alteration.
         */
        public RequestedTopicState requestedState();
    }

    /**
     * Validate the given request to alter an existing topic
     * and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request does not satisfy this policy.
     *
     * The given {@code clusterState} can be used to discover the current state of the topic to be modified.
     *
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant topic,
     * other topics in the request will still be processed.
     *
     * @param requestMetadata the request parameters for the provided topic.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validateAlterTopic(AlterTopicRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException;
 
    /**
     * Parameters for a request to delete the given topic.
     */
    static interface DeleteTopicRequest extends AbstractRequestMetadata {
    }
 
    /**
     * Validate the given request to delete a topic
     * and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request does not satisfy this policy.
     *
     * The given {@code clusterState} can be used to discover the current state of the topic to be deleted.
     *
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant topic,
     * other topics in the request will still be processed.
     *
     * @param requestMetadata the request parameters for the provided topic.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validateDeleteTopic(DeleteTopicRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException;
 
    /**
     * Parameters for a request to delete records from the topic.
     */
    static interface DeleteRecordsRequest extends AbstractRequestMetadata {
 
        /**
         * Returns a map of topic partitions and the corresponding offset of the last message
         * to be retained. Messages before this offset will be deleted.
         * Partitions which won't have messages deleted won't be present in the map.
         */
        Map<Integer, Long> deletedMessageOffsets();
    }
 
    /**
     * Validate the given request to delete records from a topic
     * and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request does not satisfy this policy.
     *
     * The given {@code clusterState} can be used to discover the current state of the topic to have records deleted.
     *
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant topic,
     * other topics in the request will still be processed.
     *
     * @param requestMetadata the request parameters for the provided topic.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validateDeleteRecords(DeleteRecordsRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException;
}


/**
 * Represents the state of a broker.
 */ 
interface BrokerState {

    /**
     * The broker config.
     */
    Map<String,String> configs();
}

/**
 * Represents the requested state of a broker.
 */
interface RequestedBrokerState extends BrokerState {

    /**
     * The broker config as it will be if the request is successful.
     * This is effectively the same as the value of {@code configs}
     * after the following computation:
     * <pre><code>
     *   Map<String, String> configs = currentState.configs();
     *   configs.putAll(requestedState.requestedConfigs());
     * </code></pre>
     */    
    @Override
    Map<String,String> configs();


    /**
     * The broker configs present in the request.
     */
    Map<String,String> requestedConfigs();
}


/**
 * A policy that is enforced on broker alteration.
 *
 * An implementation of this policy can be configured on a broker via the
 * {@code broker.management.policy.class.name} broker config.
 * When this is configured the named class will be instantiated reflectively
 * using its nullary constructor and will then pass the broker configs to
 * its <code>configure()</code> method. During broker shutdown, the
 * <code>close()</code> method will be invoked so that resources can be
 * released (if necessary).
 *
 * TODO: Fully define the lifecycle since the policy is configured by broker config which changes, so a means of reconfiguration is required.
 */
interface BrokerManagementPolicy extends Configurable, AutoCloseable {
    static interface AbstractRequestMetadata {
 
        /**
         * The id of the broker the action is being performed upon.
         * This is always the same as the id of the broker in which the 
         * broker management policy is executing.
         */
        public int brokerId();
 
        /**
         * The authenticated principal making the request, or null if the session is not authenticated.
         */
        public KafkaPrincipal principal();
    }
 
    static interface AlterBrokerRequest extends AbstractRequestMetadata {
        /**
         * The requested state of the broker to be altered.
         */
        public RequestedBrokerState requestedState();
    }

    /**
     * Validate the given request to alter a broker
     * and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request does not satisfy this policy.
     *
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant broker,
     * other topics in the request will still be processed.
     *
     * @param requestMetadata the request parameters for the provided broker.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validateAlterBroker(AlterBrokerRequest requestMetadata,
                             ClusterState clusterState)
            throws PolicyViolationException;
}
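
To illustrate how the topic policy interface above might be implemented, here is a minimal sketch that enforces a minimum replication factor on topic creation. The interfaces are trimmed-down stand-ins so that the example compiles on its own, and the `PolicyViolationException` class is a local stand-in for Kafka's exception of the same name. `MinReplicationPolicy`, `MinReplicationPolicyDemo` and the threshold values are hypothetical names chosen for the example, not part of the proposal.

```java
// Local stand-in for Kafka's PolicyViolationException (assumption for this sketch).
class PolicyViolationException extends RuntimeException {
    PolicyViolationException(String message) { super(message); }
}

// Trimmed stand-ins for the proposed interfaces; only the methods used here are kept.
interface RequestedTopicState {
    short replicationFactor();
}

interface CreateTopicRequest {
    String topic();
    RequestedTopicState requestedState();
}

interface ClusterState {
    int clusterSize();
}

// Hypothetical policy: rejects topics whose replication factor is too low
// or larger than the number of brokers in the cluster.
class MinReplicationPolicy {
    private final short minReplicationFactor;

    MinReplicationPolicy(short minReplicationFactor) {
        this.minReplicationFactor = minReplicationFactor;
    }

    void validateCreateTopic(CreateTopicRequest request, ClusterState clusterState)
            throws PolicyViolationException {
        short rf = request.requestedState().replicationFactor();
        if (rf < minReplicationFactor) {
            throw new PolicyViolationException("Topic " + request.topic()
                    + " requests replication factor " + rf
                    + ", but the minimum is " + minReplicationFactor);
        }
        if (rf > clusterState.clusterSize()) {
            throw new PolicyViolationException("Topic " + request.topic()
                    + " requests replication factor " + rf
                    + ", but the cluster only has " + clusterState.clusterSize() + " brokers");
        }
    }
}

class MinReplicationPolicyDemo {
    // Builds a request for the given topic name and replication factor.
    static CreateTopicRequest request(String topic, short rf) {
        RequestedTopicState state = () -> rf;
        return new CreateTopicRequest() {
            @Override public String topic() { return topic; }
            @Override public RequestedTopicState requestedState() { return state; }
        };
    }

    // Returns true if the policy accepts the request, false if it is rejected.
    static boolean allowed(MinReplicationPolicy policy, CreateTopicRequest request, ClusterState cluster) {
        try {
            policy.validateCreateTopic(request, cluster);
            return true;
        } catch (PolicyViolationException e) {
            System.out.println("Rejected: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        MinReplicationPolicy policy = new MinReplicationPolicy((short) 2);
        ClusterState cluster = () -> 3; // a three-broker cluster
        System.out.println(allowed(policy, request("orders", (short) 3), cluster));
        System.out.println(allowed(policy, request("scratch", (short) 1), cluster));
    }
}
```

A real implementation would also implement `Configurable` and `AutoCloseable` and read its threshold from the broker configs passed to `configure()`; that wiring is omitted here to keep the sketch short.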

...

  • On topic deletion, i.e. when processing a DeleteTopicsRequest (this change is done as part of this KIP).
  • On message deletion, i.e. when processing a DeleteRecordsRequest (this change is done as part of this KIP).
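
As an illustration of the two deletion cases, the following sketch shows one policy guarding both topic deletion and record deletion by consulting the cluster state. It is an assumption-laden simplification: the validate methods take the topic name and offsets directly rather than the request metadata objects, the interfaces are reduced stand-ins, and `ProtectInternalTopicsPolicy` is a hypothetical name.

```java
import java.util.Map;

// Local stand-in for Kafka's PolicyViolationException (assumption for this sketch).
class PolicyViolationException extends RuntimeException {
    PolicyViolationException(String message) { super(message); }
}

// Reduced stand-ins for the proposed interfaces.
interface TopicState {
    boolean internal();
}

interface ClusterState {
    TopicState topicState(String topicName);
}

// Hypothetical policy: internal topics may be neither deleted nor truncated.
class ProtectInternalTopicsPolicy {

    // Shared check used by both deletion paths.
    private void rejectIfInternal(String topic, ClusterState clusterState, String action) {
        TopicState state = clusterState.topicState(topic);
        // topicState returns null when the topic does not exist; nothing to protect then.
        if (state != null && state.internal()) {
            throw new PolicyViolationException(
                    "Cannot " + action + " internal topic " + topic);
        }
    }

    void validateDeleteTopic(String topic, ClusterState clusterState) {
        rejectIfInternal(topic, clusterState, "delete");
    }

    void validateDeleteRecords(String topic, Map<Integer, Long> deletedMessageOffsets,
                               ClusterState clusterState) {
        // Partitions absent from the map have no messages deleted,
        // so an empty map means the request deletes nothing.
        if (deletedMessageOffsets.isEmpty()) {
            return;
        }
        rejectIfInternal(topic, clusterState, "delete records from");
    }
}
```

Because both paths share one check, a single policy class can keep the rules for deleting a topic and deleting its records consistent with each other.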

The BrokerManagementPolicy will be applied:

  • On broker startup
    • This is to ensure that brokers start in a valid state; without this it would be possible for a later alter broker request to be denied even though the request itself was not the cause of the policy violation.
  • On broker modification, i.e. when processing an AlterConfigsRequest for broker configs.
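
The reason for validating at startup can be sketched as follows. The example assumes a hypothetical rule (retention of at most 168 hours) and hypothetical helper names (`BrokerPolicySketch`, `effectiveConfigs`, `violation`); only the merge of current and requested configs follows the computation described for `RequestedBrokerState.configs()`.

```java
import java.util.HashMap;
import java.util.Map;

class BrokerPolicySketch {
    // Hypothetical rule, for illustration only: retention must not exceed one week.
    static final String RETENTION_KEY = "log.retention.hours";
    static final int MAX_RETENTION_HOURS = 168;

    // The config as it would be if the request succeeded:
    // a copy of the current config overlaid with the requested changes.
    static Map<String, String> effectiveConfigs(Map<String, String> current,
                                                Map<String, String> requested) {
        Map<String, String> merged = new HashMap<>(current);
        merged.putAll(requested);
        return merged;
    }

    // Returns null if the config satisfies the rule, otherwise an error message.
    static String violation(Map<String, String> configs) {
        String value = configs.get(RETENTION_KEY);
        if (value != null && Integer.parseInt(value) > MAX_RETENTION_HOURS) {
            return RETENTION_KEY + "=" + value + " exceeds " + MAX_RETENTION_HOURS;
        }
        return null;
    }

    public static void main(String[] args) {
        // The broker starts with a config that already violates the rule.
        Map<String, String> startup = new HashMap<>();
        startup.put(RETENTION_KEY, "1000");

        // Validating at startup reports the problem where it originates.
        System.out.println("startup check: " + violation(startup));

        // Without that check, a later and unrelated change is what gets rejected,
        // because the effective config still carries the old violation.
        Map<String, String> requested = new HashMap<>();
        requested.put("num.io.threads", "16");
        System.out.println("alter check:   " + violation(effectiveConfigs(startup, requested)));
    }
}
```

Copying the current config before calling `putAll` keeps the current state untouched, which matters if the same map is later used to validate another request.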

Deprecate existing policies

...

Having separate policy interfaces for creation/modification and deletion, while retaining the existing single-method-per-policy-interface design, was considered. It was rejected because it was a halfway house between having multiple policies and having a single policy.