...
```java
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.security.auth.KafkaPrincipal;

/**
 * Represents the state of a topic.
 */
interface TopicState {
    /**
     * The number of partitions of the topic.
     */
    int numPartitions();

    /**
     * The replication factor of the topic. More precisely, the number of assigned replicas for partition 0.
     * // TODO what about during reassignment
     */
    short replicationFactor();

    /**
     * A map of the replica assignments of the topic, with partition ids as keys and
     * the assigned brokers as the corresponding values.
     * // TODO what about during reassignment
     */
    Map<Integer, List<Integer>> replicasAssignments();

    /**
     * The topic config.
     */
    Map<String,String> configs();

    /**
     * Returns whether the topic is marked for deletion.
     */
    boolean markedForDeletion();

    /**
     * Returns whether the topic is an internal topic.
     */
    boolean internal();
}

/**
 * Represents the requested state of a topic.
 */
interface RequestedTopicState extends TopicState {
    /**
     * True if the {@link TopicState#replicasAssignments()}
     * in this request were generated by the broker, false if
     * they were explicitly requested by the client.
     */
    boolean generatedReplicaAssignments();

    /**
     * The topic config as it will be if the request is successful.
     * This is effectively the same as the value of {@code configs}
     * after the following computation:
     * <pre><code>
     *   Map<String, String> configs = currentState.configs();
     *   configs.putAll(requestedState.requestedConfigs());
     * </code></pre>
     */
    @Override
    Map<String,String> configs();

    /**
     * The topic configs present in the request.
     */
    Map<String,String> requestedConfigs();
}

/**
 * The current state of the topics in the cluster, before the request takes effect.
 */
interface ClusterState {
    /**
     * Returns the current state of the given topic, or null if the topic does not exist.
     */
    TopicState topicState(String topicName);

    /**
     * Returns all the topics in the cluster, including internal topics if
     * {@code includeInternal} is true, and including those marked for deletion
     * if {@code includeMarkedForDeletion} is true.
     */
    Set<String> topics(boolean includeInternal, boolean includeMarkedForDeletion);

    /**
     * The number of brokers in the cluster.
     */
    int clusterSize();

    /**
     * Returns the current state of the broker in which the method is called.
     */
    public BrokerState brokerState();
}

/**
 * A policy that is enforced on topic creation, alteration and deletion,
 * and for the deletion of messages from a topic.
 *
 * An implementation of this policy can be configured on a broker via the
 * {@code topic.management.policy.class.name} broker config.
 * When this is configured the named class will be instantiated reflectively
 * using its nullary constructor and will then pass the broker configs to
 * its <code>configure()</code> method. During broker shutdown, the
 * <code>close()</code> method will be invoked so that resources can be
 * released (if necessary).
 *
 */
interface TopicManagementPolicy extends Configurable, AutoCloseable {

    static interface AbstractRequestMetadata {

        /**
         * The topic the action is being performed upon.
         */
        public String topic();

        /**
         * The authenticated principal making the request, or null if the session is not authenticated.
         */
        public KafkaPrincipal principal();
    }

    static interface CreateTopicRequest extends AbstractRequestMetadata {
        /**
         * The requested state of the topic to be created.
         */
        public RequestedTopicState requestedState();
    }

    /**
     * Validate the given request to create a topic
     * and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request does not satisfy this policy.
     *
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant topic,
     * other topics in the request will still be processed.
     *
     * @param requestMetadata the request parameters for the provided topic.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validateCreateTopic(CreateTopicRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException;

    static interface AlterTopicRequest extends AbstractRequestMetadata {
        /**
         * The state the topic will have after the alteration.
         */
        public RequestedTopicState requestedState();
    }

    /**
     * Validate the given request to alter an existing topic
     * and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request does not satisfy this policy.
     *
     * The given {@code clusterState} can be used to discover the current state of the topic to be modified.
     *
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant topic,
     * other topics in the request will still be processed.
     *
     * @param requestMetadata the request parameters for the provided topic.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validateAlterTopic(AlterTopicRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException;

    /**
     * Parameters for a request to delete the given topic.
     */
    static interface DeleteTopicRequest extends AbstractRequestMetadata {
    }

    /**
     * Validate the given request to delete a topic
     * and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request does not satisfy this policy.
     *
     * The given {@code clusterState} can be used to discover the current state of the topic to be deleted.
     *
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant topic,
     * other topics in the request will still be processed.
     *
     * @param requestMetadata the request parameters for the provided topic.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validateDeleteTopic(DeleteTopicRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException;

    /**
     * Parameters for a request to delete records from the topic.
     */
    static interface DeleteRecordsRequest extends AbstractRequestMetadata {

        /**
         * Returns a map of topic partitions and the corresponding offset of the last message
         * to be retained. Messages before this offset will be deleted.
         * Partitions which won't have messages deleted won't be present in the map.
         */
        Map<Integer, Long> deletedMessageOffsets();
    }

    /**
     * Validate the given request to delete records from a topic
     * and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request does not satisfy this policy.
     *
     * The given {@code clusterState} can be used to discover the current state of the topic to have records deleted.
     *
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant topic,
     * other topics in the request will still be processed.
     *
     * @param requestMetadata the request parameters for the provided topic.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validateDeleteRecords(DeleteRecordsRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException;
}

/**
 * Represents the state of a broker
 */
interface BrokerState {
    /**
     * The broker config.
     */
    Map<String,String> configs();
}

interface RequestedBrokerState extends BrokerState {
    /**
     * The broker config as it will be if the request is successful.
     * This is effectively the same as the value of {@code configs}
     * after the following computation:
     * <pre><code>
     *   Map<String, String> configs = currentState.configs();
     *   configs.putAll(requestedState.requestedConfigs());
     * </code></pre>
     */
    @Override
    Map<String,String> configs();

    /**
     * The broker configs present in the request.
     */
    Map<String,String> requestedConfigs();
}

/**
 * A policy that is enforced on broker alteration.
 *
 * An implementation of this policy can be configured on a broker via the
 * {@code broker.management.policy.class.name} broker config.
 * When this is configured the named class will be instantiated reflectively
 * using its nullary constructor and will then pass the broker configs to
 * its <code>configure()</code> method. During broker shutdown, the
 * <code>close()</code> method will be invoked so that resources can be
 * released (if necessary).
 *
 * TODO: Fully define the lifecycle since the policy is configured by broker config which changes, so a means of reconfiguration is required.
 */
interface BrokerManagementPolicy extends Configurable, AutoCloseable {

    static interface AbstractRequestMetadata {

        /**
         * The id of the broker the action is being performed upon.
         * This is always the same as the id of the broker in which the
         * broker management policy is executing.
         */
        public int brokerId();

        /**
         * The authenticated principal making the request, or null if the session is not authenticated.
         */
        public KafkaPrincipal principal();
    }

    static interface AlterBrokerRequest extends AbstractRequestMetadata {
        /**
         * The requested state of the broker to be altered.
         */
        public RequestedBrokerState requestedState();
    }

    /**
     * Validate the given request to alter a broker
     * and throw a <code>PolicyViolationException</code> with a suitable error
     * message if the request does not satisfy this policy.
     *
     * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
     * Note that validation failure only affects the relevant broker,
     * other topics in the request will still be processed.
     *
     * @param requestMetadata the request parameters for the provided broker.
     * @param clusterState the current state of the cluster
     * @throws PolicyViolationException if the request parameters do not satisfy this policy.
     */
    void validateAlterBroker(AlterBrokerRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException;
}
```
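To make the shape of a create-topic check more concrete, here is a minimal sketch. The interfaces in this KIP are a proposal, so the sketch declares trimmed-down local stand-ins (`RequestedTopicState`, `ClusterState` and a `PolicyViolation` exception) instead of depending on Kafka. The class name, the helper `requested(...)` and the rule itself (replication factor of at least 2 and no larger than the cluster) are assumptions made for illustration only.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical sketch: local stand-ins mirror a subset of the proposed interfaces
// so that the example compiles without Kafka on the classpath.
class CreateTopicPolicySketch {

    interface RequestedTopicState {
        int numPartitions();
        short replicationFactor();
        Map<String, String> configs();
    }

    interface ClusterState {
        int clusterSize();
    }

    // Stand-in for Kafka's PolicyViolationException.
    static class PolicyViolation extends RuntimeException {
        PolicyViolation(String message) {
            super(message);
        }
    }

    // Illustrative rule (an assumption, not part of the KIP): the replication
    // factor must be at least 2 and cannot exceed the number of brokers.
    static void validateCreateTopic(String topic, RequestedTopicState requested, ClusterState cluster) {
        short rf = requested.replicationFactor();
        if (rf < 2) {
            throw new PolicyViolation("Topic " + topic + ": replication factor " + rf + " is below the minimum of 2");
        }
        if (rf > cluster.clusterSize()) {
            throw new PolicyViolation("Topic " + topic + ": replication factor " + rf
                    + " exceeds the cluster size " + cluster.clusterSize());
        }
    }

    // Hypothetical helper that builds a requested state with no configs.
    static RequestedTopicState requested(final int partitions, final short rf) {
        return new RequestedTopicState() {
            @Override public int numPartitions() { return partitions; }
            @Override public short replicationFactor() { return rf; }
            @Override public Map<String, String> configs() { return Collections.emptyMap(); }
        };
    }

    public static void main(String[] args) {
        ClusterState threeBrokers = () -> 3;
        validateCreateTopic("orders", requested(6, (short) 3), threeBrokers); // accepted
        try {
            validateCreateTopic("scratch", requested(1, (short) 1), threeBrokers);
        } catch (PolicyViolation e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

A real implementation would implement `TopicManagementPolicy` and read the same values from `requestMetadata.requestedState()` and the supplied `clusterState`.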
...
- On topic deletion, i.e. when processing a DeleteTopicsRequest (this change is made as part of this KIP).
- On message deletion, i.e. when processing a DeleteRecordsRequest (this change is made as part of this KIP).
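The two deletion hooks could be exercised along the following lines. This is a sketch under stated assumptions: `TopicState` here is a local stand-in reduced to two methods, `PolicyViolation` replaces Kafka's `PolicyViolationException`, and both rules (internal topics are protected, and record deletion may only name existing partitions) are invented for illustration.

```java
import java.util.Map;

// Hypothetical sketch of deletion checks; names and rules are illustrative only.
class DeletionPolicySketch {

    // Local stand-in for the proposed TopicState, reduced to what the sketch needs.
    interface TopicState {
        int numPartitions();
        boolean internal();
    }

    // Stand-in for Kafka's PolicyViolationException.
    static class PolicyViolation extends RuntimeException {
        PolicyViolation(String message) {
            super(message);
        }
    }

    // Assumed rule: internal topics may not be deleted.
    // A null state means the topic does not exist, so the policy has nothing to protect.
    static void validateDeleteTopic(String topic, TopicState current) {
        if (current != null && current.internal()) {
            throw new PolicyViolation("Internal topic " + topic + " may not be deleted");
        }
    }

    // Assumed rule: records may only be deleted from non-internal topics, and every
    // partition id in the request must exist in the topic's current state.
    static void validateDeleteRecords(String topic, TopicState current, Map<Integer, Long> deletedMessageOffsets) {
        if (current == null) {
            return;
        }
        if (current.internal()) {
            throw new PolicyViolation("Records of internal topic " + topic + " may not be deleted");
        }
        for (Map.Entry<Integer, Long> entry : deletedMessageOffsets.entrySet()) {
            int partition = entry.getKey();
            if (partition < 0 || partition >= current.numPartitions()) {
                throw new PolicyViolation("Topic " + topic + " has no partition " + partition);
            }
        }
    }

    // Hypothetical helper that builds a topic state.
    static TopicState topic(final int numPartitions, final boolean internal) {
        return new TopicState() {
            @Override public int numPartitions() { return numPartitions; }
            @Override public boolean internal() { return internal; }
        };
    }

    public static void main(String[] args) {
        validateDeleteTopic("orders", topic(3, false)); // accepted
        try {
            validateDeleteTopic("__consumer_offsets", topic(50, true));
        } catch (PolicyViolation e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

In the proposed API the current state would come from `clusterState.topicState(requestMetadata.topic())` and the offsets from `DeleteRecordsRequest.deletedMessageOffsets()`.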
The BrokerManagementPolicy will be applied:
- On broker startup
  - This is to ensure that brokers start in a valid state; without this it would be possible for a later alter broker request to be denied even though the request itself was not the cause of the policy violation.
- On broker modification, i.e. when processing an AlterConfigsRequest for broker configs.
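The reason for checking at startup as well as on alteration can be illustrated with a small sketch. It applies one rule to the broker's effective configs, which on alteration are the current configs overlaid with the requested ones (the `putAll` computation described for `RequestedBrokerState.configs()`). The class, the method names and the rule (unclean leader election must stay disabled) are assumptions chosen for illustration; `PolicyViolation` stands in for Kafka's `PolicyViolationException`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the same rule is applied at broker startup and on alteration.
class BrokerPolicySketch {

    // Stand-in for Kafka's PolicyViolationException.
    static class PolicyViolation extends RuntimeException {
        PolicyViolation(String message) {
            super(message);
        }
    }

    // Assumed rule: unclean leader election must not be enabled.
    static void validate(Map<String, String> effectiveConfigs) {
        String value = effectiveConfigs.getOrDefault("unclean.leader.election.enable", "false");
        if (Boolean.parseBoolean(value)) {
            throw new PolicyViolation("unclean.leader.election.enable must be false");
        }
    }

    // Overlay the requested configs on a copy of the current configs.
    static Map<String, String> merge(Map<String, String> current, Map<String, String> requested) {
        Map<String, String> merged = new HashMap<>(current);
        merged.putAll(requested);
        return merged;
    }

    // At startup there is no request, so the current configs are checked directly.
    static void onStartup(Map<String, String> current) {
        validate(current);
    }

    // On alteration the policy sees the configs as they would be after the change.
    static void onAlter(Map<String, String> current, Map<String, String> requested) {
        validate(merge(current, requested));
    }

    public static void main(String[] args) {
        Map<String, String> invalidCurrent = Collections.singletonMap("unclean.leader.election.enable", "true");
        Map<String, String> unrelatedChange = Collections.singletonMap("log.retention.ms", "86400000");
        try {
            // Without a startup check, this unrelated request would be the first to fail.
            onAlter(invalidCurrent, unrelatedChange);
        } catch (PolicyViolation e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

The `main` method shows the situation the startup check is meant to prevent: a broker already in violation causes an unrelated alteration to be denied. Calling `onStartup(invalidCurrent)` would surface the problem before any request arrives.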
Deprecate existing policies
...
Having separate policy interfaces for creation/modification and deletion, while retaining the existing single-method-per-policy-interface design, was considered but rejected because it was a halfway house between having multiple policies and having a single policy.