...
Current state: Under Discussion
Discussion thread: here
JIRA: https://issues.apache.org/jira/browse/KAFKA-5693
...
New versions of the existing network protocols DeleteTopicsRequest and DeleteRecordsRequest will be added, to add a validate_only flag.
New versions of the existing network protocols DeleteTopicsResponse and DeleteRecordsResponse will be added, to include an error message.
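As a rough illustration of what a validate_only flag could mean for deletion, the sketch below assumes the same semantics the flag already has in CreateTopicsRequest: the request is checked but nothing is changed. The class ValidateOnlySketch, its DeletePolicy interface and the in-memory topic set are hypothetical stand-ins for illustration, not broker code.

```java
import java.util.HashSet;
import java.util.Set;

class ValidateOnlySketch {
    /** Hypothetical stand-in for a deletion policy check. */
    interface DeletePolicy {
        /** Returns null if deletion is allowed, otherwise an error message. */
        String check(String topic);
    }

    /**
     * Handles the deletion of one topic and returns an error message, or null on success.
     * With validateOnly set, all checks run but the topic is left in place.
     */
    static String deleteTopic(Set<String> topics, String topic, boolean validateOnly, DeletePolicy policy) {
        if (!topics.contains(topic)) {
            return "Unknown topic: " + topic;
        }
        String violation = policy.check(topic);
        if (violation != null) {
            return violation;
        }
        if (!validateOnly) {
            topics.remove(topic);
        }
        return null;
    }

    public static void main(String[] args) {
        Set<String> topics = new HashSet<>();
        topics.add("orders");
        DeletePolicy allowAll = t -> null;
        // Validate only: the result reports success, but the topic still exists afterwards.
        System.out.println(deleteTopic(topics, "orders", true, allowAll));
        System.out.println(topics.contains("orders"));
    }
}
```

A client could use such a dry run to learn whether a deletion would be rejected, and why, before issuing the real request.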
...

    /**
     * Represents the state of a topic either before, or as a result of,
     * an administrative request affecting a topic.
     */
    interface TopicState {
        /**
         * The number of partitions of the topic.
         */
        int numPartitions();

        /**
         * The replication factor of the topic. More precisely, the number of assigned replicas for partition 0.
         * // TODO what about during reassignment
         */
        short replicationFactor();

        /**
         * A map of the replica assignments of the topic, with partition ids as keys and
         * the assigned brokers as the corresponding values.
         * // TODO what about during reassignment
         */
        Map<Integer, List<Integer>> replicasAssignments();

        /**
         * The topic config.
         */
        Map<String,String> configs();

        /**
         * Returns whether the topic is marked for deletion.
         */
        boolean markedForDeletion();

        /**
         * Returns whether the topic is an internal topic.
         */
        boolean internal();
    }

    /**
     * Represents the requested state of a topic.
     */
    interface RequestedTopicState extends TopicState {
        /**
         * True if the {@link TopicState#replicasAssignments()}
         * in this request were generated by the broker, false if
         * they were explicitly requested by the client.
         */
        boolean generatedReplicaAssignments();

        /**
         * The topic config as it will be if the request is successful.
         * This is effectively the same as the value of {@code configs}
         * after the following computation:
         * <pre><code>
         *   Map<String, String> configs = currentState.configs();
         *   configs.putAll(requestedState.requestedConfigs());
         * </code></pre>
         */
        @Override
        Map<String,String> configs();

        /**
         * The topic configs present in the request.
         */
        Map<String,String> requestedConfigs();
    }

    /**
     * The current state of the topics in the cluster, before the request takes effect.
     */
    interface ClusterState {
        /**
         * Returns the current state of the given topic, or null if the topic does not exist.
         */
        TopicState topicState(String topicName);

        /**
         * Returns all the topics in the cluster, including internal topics if
         * {@code includeInternal} is true, and including those marked for deletion
         * if {@code includeMarkedForDeletion} is true.
         */
        Set<String> topics(boolean includeInternal, boolean includeMarkedForDeletion);

        /**
         * The number of brokers in the cluster.
         */
        int clusterSize();

        /**
         * Returns the current state of the broker in which the method is called.
         */
        BrokerState brokerState();
    }

    /**
     * A policy that is enforced on topic creation, alteration and deletion,
     * and for the deletion of messages from a topic.
     *
     * An implementation of this policy can be configured on a broker via the
     * {@code topic.management.policy.class.name} broker config.
     * When this is configured the named class will be instantiated reflectively
     * using its nullary constructor and will then pass the broker configs to
     * its <code>configure()</code> method. During broker shutdown, the
     * <code>close()</code> method will be invoked so that resources can be
     * released (if necessary).
     */
    interface TopicManagementPolicy extends Configurable, AutoCloseable {

        static interface AbstractRequestMetadata {
            /**
             * The topic the action is being performed upon.
             */
            public String topic();

            /**
             * The authenticated principal making the request.
             */
            public KafkaPrincipal principal();
        }

        static interface CreateTopicRequest extends AbstractRequestMetadata {
            /**
             * The requested state of the topic to be created.
             */
            public RequestedTopicState requestedState();
        }

        /**
         * Validate the given request to create a topic
         * and throw a <code>PolicyViolationException</code> with a suitable error
         * message if the request does not satisfy this policy.
         *
         * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
         * Note that validation failure only affects the relevant topic,
         * other topics in the request will still be processed.
         *
         * @param requestMetadata the request parameters for the provided topic.
         * @param clusterState the current state of the cluster
         * @throws PolicyViolationException if the request parameters do not satisfy this policy.
         */
        void validateCreateTopic(CreateTopicRequest requestMetadata, ClusterState clusterState)
                throws PolicyViolationException;

        static interface AlterTopicRequest extends AbstractRequestMetadata {
            /**
             * The state the topic will have after the alteration.
             */
            public RequestedTopicState requestedState();
        }

        /**
         * Validate the given request to alter an existing topic
         * and throw a <code>PolicyViolationException</code> with a suitable error
         * message if the request does not satisfy this policy.
         *
         * The given {@code clusterState} can be used to discover the current state of the topic to be modified.
         *
         * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
         * Note that validation failure only affects the relevant topic,
         * other topics in the request will still be processed.
         *
         * @param requestMetadata the request parameters for the provided topic.
         * @param clusterState the current state of the cluster
         * @throws PolicyViolationException if the request parameters do not satisfy this policy.
         */
        void validateAlterTopic(AlterTopicRequest requestMetadata, ClusterState clusterState)
                throws PolicyViolationException;

        /**
         * Parameters for a request to delete the given topic.
         */
        static interface DeleteTopicRequest extends AbstractRequestMetadata {
        }

        /**
         * Validate the given request to delete a topic
         * and throw a <code>PolicyViolationException</code> with a suitable error
         * message if the request does not satisfy this policy.
         *
         * The given {@code clusterState} can be used to discover the current state of the topic to be deleted.
         *
         * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
         * Note that validation failure only affects the relevant topic,
         * other topics in the request will still be processed.
         *
         * @param requestMetadata the request parameters for the provided topic.
         * @param clusterState the current state of the cluster
         * @throws PolicyViolationException if the request parameters do not satisfy this policy.
         */
        void validateDeleteTopic(DeleteTopicRequest requestMetadata, ClusterState clusterState)
                throws PolicyViolationException;

        /**
         * Parameters for a request to delete records from the topic.
         */
        static interface DeleteRecordsRequest extends AbstractRequestMetadata {
            /**
             * Returns a map of partitions and the corresponding offset of the last message
             * to be retained. Messages before this offset will be deleted.
             * Partitions which won't have messages deleted won't be present in the map.
             */
            Map<Integer, Long> deletedMessageOffsets();
        }

        /**
         * Validate the given request to delete records from a topic
         * and throw a <code>PolicyViolationException</code> with a suitable error
         * message if the request does not satisfy this policy.
         *
         * The given {@code clusterState} can be used to discover the current state of the topic to have records deleted.
         *
         * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
         * Note that validation failure only affects the relevant topic,
         * other topics in the request will still be processed.
         *
         * @param requestMetadata the request parameters for the provided topic.
         * @param clusterState the current state of the cluster
         * @throws PolicyViolationException if the request parameters do not satisfy this policy.
         */
        void validateDeleteRecords(DeleteRecordsRequest requestMetadata, ClusterState clusterState)
                throws PolicyViolationException;
    }

    /**
     * Represents the state of a broker
     */
    interface BrokerState {
        /**
         * The broker config.
         */
        Map<String,String> configs();
    }

    interface RequestedBrokerState extends BrokerState {
        /**
         * The broker config as it will be if the request is successful.
         * This is effectively the same as the value of {@code configs}
         * after the following computation:
         * <pre><code>
         *   Map<String, String> configs = currentState.configs();
         *   configs.putAll(requestedState.requestedConfigs());
         * </code></pre>
         */
        @Override
        Map<String,String> configs();

        /**
         * The broker configs present in the request.
         */
        Map<String,String> requestedConfigs();
    }

    /**
     * A policy that is enforced on broker alteration.
     *
     * An implementation of this policy can be configured on a broker via the
     * {@code broker.management.policy.class.name} broker config.
     * When this is configured the named class will be instantiated reflectively
     * using its nullary constructor and will then pass the broker configs to
     * its <code>configure()</code> method. During broker shutdown, the
     * <code>close()</code> method will be invoked so that resources can be
     * released (if necessary).
     *
     * TODO: Fully define the lifecycle since the policy is configured by broker config
     * which changes, so a means of reconfiguration is required.
     */
    interface BrokerManagementPolicy extends Configurable, AutoCloseable {

        static interface AbstractRequestMetadata {
            /**
             * The id of the broker the action is being performed upon.
             * This is always the same as the id of the broker in which the
             * broker management policy is executing.
             */
            public int brokerId();

            /**
             * The principal making the request.
             */
            public KafkaPrincipal principal();
        }

        static interface AlterBrokerRequest extends AbstractRequestMetadata {
            /**
             * The requested state of the broker to be altered.
             */
            public RequestedBrokerState requestedState();
        }

        /**
         * Validate the given request to alter a broker
         * and throw a <code>PolicyViolationException</code> with a suitable error
         * message if the request does not satisfy this policy.
         *
         * Clients will receive the POLICY_VIOLATION error code along with the exception's message.
         * Note that validation failure only affects the relevant broker,
         * other topics in the request will still be processed.
         *
         * @param requestMetadata the request parameters for the provided broker.
         * @param clusterState the current state of the cluster
         * @throws PolicyViolationException if the request parameters do not satisfy this policy.
         */
        void validateAlterBroker(AlterBrokerRequest requestMetadata, ClusterState clusterState)
                throws PolicyViolationException;
    }

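To make the shape of a policy more concrete, here is a minimal sketch of a creation check that bounds the replication factor. It is an illustration only: the interfaces below are trimmed-down, hypothetical stand-ins for the proposed ones (so that the example compiles without a Kafka dependency), PolicyViolationException is a local stand-in for Kafka's exception of the same name, and MinReplicationPolicy and PolicySketch are invented names. A real policy would implement the full TopicManagementPolicy interface, including configure() and close().

```java
import java.util.Collections;
import java.util.Map;

/** Local stand-in for Kafka's PolicyViolationException. */
class PolicyViolationException extends RuntimeException {
    PolicyViolationException(String message) {
        super(message);
    }
}

/** Trimmed-down, hypothetical versions of the proposed interfaces. */
interface RequestedTopicState {
    int numPartitions();
    short replicationFactor();
    Map<String, String> configs();
}

interface ClusterState {
    int clusterSize();
}

interface CreateTopicRequest {
    String topic();
    RequestedTopicState requestedState();
}

/** Example policy: bounds the replication factor of newly created topics. */
class MinReplicationPolicy {
    private final int minReplicationFactor;

    MinReplicationPolicy(int minReplicationFactor) {
        this.minReplicationFactor = minReplicationFactor;
    }

    void validateCreateTopic(CreateTopicRequest request, ClusterState clusterState) {
        int requested = request.requestedState().replicationFactor();
        if (requested < minReplicationFactor) {
            throw new PolicyViolationException("Topic " + request.topic()
                    + " needs a replication factor of at least " + minReplicationFactor);
        }
        if (requested > clusterState.clusterSize()) {
            throw new PolicyViolationException("Topic " + request.topic()
                    + " requests more replicas than there are brokers");
        }
    }
}

class PolicySketch {
    /** Builds a request with the given parameters and an empty topic config. */
    static CreateTopicRequest request(String name, int partitionCount, int replicas) {
        RequestedTopicState state = new RequestedTopicState() {
            public int numPartitions() { return partitionCount; }
            public short replicationFactor() { return (short) replicas; }
            public Map<String, String> configs() { return Collections.emptyMap(); }
        };
        return new CreateTopicRequest() {
            public String topic() { return name; }
            public RequestedTopicState requestedState() { return state; }
        };
    }

    /** Returns true if the policy accepts the request. */
    static boolean accepts(MinReplicationPolicy policy, CreateTopicRequest request, ClusterState cluster) {
        try {
            policy.validateCreateTopic(request, cluster);
            return true;
        } catch (PolicyViolationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        MinReplicationPolicy policy = new MinReplicationPolicy(2);
        ClusterState threeBrokers = () -> 3;
        System.out.println(accepts(policy, request("orders", 6, 3), threeBrokers));
        System.out.println(accepts(policy, request("scratch", 1, 1), threeBrokers));
    }
}
```

The second check shows why the validate methods receive a ClusterState as well as the request: some rules can only be evaluated against the current cluster.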
The TopicManagementPolicy will be applied:
- On topic creation, i.e. when processing a CreateTopicsRequest
- On topic modification
  - Change in topic config, i.e. when processing an AlterConfigsRequest for topic configs (this change done as part of this KIP)
  - Adding partitions to topics, i.e. when processing a CreatePartitionsRequest (see KIP-195, but this change done as part of this KIP)
  - Reassigning partitions to brokers, and/or changing the replication factor when processing a ReassignPartitionsRequest (as part of KIP-179)
- On topic deletion, i.e. when processing a DeleteTopicsRequest (this change done as part of this KIP)
- On message deletion, i.e. when processing a DeleteRecordsRequest (this change done as part of this KIP)
The BrokerManagementPolicy will be applied:
- On broker startup
  - This is to ensure that brokers start in a valid state; without this it would be possible for a later alter broker request to be denied even though the request itself was not the cause of the policy violation.
- On broker modification, i.e. when processing an AlterConfigsRequest for broker configs
Deprecate existing policies
...
It will be a configuration-time error if both create.topic.policy.class.name and topic.management.policy.class.name are used at the same time, or both alter.config.policy.class.name and topic.management.policy.class.name are used at the same time.
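The rule above amounts to a simple mutual-exclusion check at configuration time. The following sketch assumes that each of the two deprecated configs conflicts with topic.management.policy.class.name; the class PolicyConfigCheck is hypothetical, and IllegalArgumentException stands in for whatever configuration exception the broker would actually raise.

```java
import java.util.HashMap;
import java.util.Map;

class PolicyConfigCheck {
    static final String CREATE_TOPIC_POLICY = "create.topic.policy.class.name";
    static final String ALTER_CONFIG_POLICY = "alter.config.policy.class.name";
    static final String TOPIC_MANAGEMENT_POLICY = "topic.management.policy.class.name";

    /** Throws if a deprecated policy config is set together with the new one. */
    static void validate(Map<String, String> brokerConfigs) {
        if (!brokerConfigs.containsKey(TOPIC_MANAGEMENT_POLICY)) {
            return; // Only the deprecated configs (or none) are in use.
        }
        for (String deprecated : new String[] {CREATE_TOPIC_POLICY, ALTER_CONFIG_POLICY}) {
            if (brokerConfigs.containsKey(deprecated)) {
                throw new IllegalArgumentException(
                        deprecated + " cannot be used together with " + TOPIC_MANAGEMENT_POLICY);
            }
        }
    }

    /** Convenience wrapper that reports the outcome as a boolean. */
    static boolean isValid(Map<String, String> brokerConfigs) {
        try {
            validate(brokerConfigs);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, String> configs = new HashMap<>();
        configs.put(CREATE_TOPIC_POLICY, "com.example.OldPolicy");
        configs.put(TOPIC_MANAGEMENT_POLICY, "com.example.NewPolicy");
        System.out.println(isValid(configs));
    }
}
```

The class names in main are placeholders; only the three config keys come from this proposal.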
...

    DeleteTopics Response (Version: 2) => throttle_time_ms [topic_error_codes]
      throttle_time_ms => INT32
      topic_error_codes => topic error_code error_message
        topic => STRING
        error_code => INT16
        error_message => NULLABLE_STRING

Old versions of the DeleteTopicsResponse will use an UNEXPECTED_SERVER_ERROR error_code instead of POLICY_VIOLATION, so as to not break clients.
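The downgrade for older response versions can be pictured as a small mapping step applied when the response is built. This is only a sketch: the ErrorCode enum and the DeleteTopicsErrorMapping class are hypothetical, and it assumes that version 2 (the version shown above) is the first DeleteTopicsResponse version allowed to carry POLICY_VIOLATION.

```java
/** Hypothetical subset of error codes, named as in this proposal. */
enum ErrorCode {
    NONE,
    POLICY_VIOLATION,
    UNEXPECTED_SERVER_ERROR
}

class DeleteTopicsErrorMapping {
    // Assumption: first response version whose clients understand POLICY_VIOLATION here.
    static final int FIRST_VERSION_WITH_POLICY_ERRORS = 2;

    /** Returns the error code to send for the given response version. */
    static ErrorCode errorForVersion(ErrorCode error, int responseVersion) {
        if (error == ErrorCode.POLICY_VIOLATION && responseVersion < FIRST_VERSION_WITH_POLICY_ERRORS) {
            // Older clients do not expect POLICY_VIOLATION from this API.
            return ErrorCode.UNEXPECTED_SERVER_ERROR;
        }
        return error;
    }

    public static void main(String[] args) {
        System.out.println(errorForVersion(ErrorCode.POLICY_VIOLATION, 1));
        System.out.println(errorForVersion(ErrorCode.POLICY_VIOLATION, 2));
    }
}
```

Older clients thus still see a failure, just a less specific one, and they never see the error_message field, which is only present in the new response version.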
The documentation for AdminClient.deleteTopics() will be updated to mention the possibility of PolicyViolationException from the DeleteTopicsResult methods.
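From the caller's side, a policy rejection would surface as the cause of the exception thrown when waiting on the result. The sketch below imitates that pattern without a Kafka dependency: CompletableFuture stands in for the future returned by the DeleteTopicsResult methods, PolicyViolationException is a local stand-in for Kafka's exception of the same name, and DeleteTopicsClientSketch is a hypothetical helper.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Local stand-in for Kafka's PolicyViolationException. */
class PolicyViolationException extends RuntimeException {
    PolicyViolationException(String message) {
        super(message);
    }
}

class DeleteTopicsClientSketch {
    /** Waits for a deletion result and describes the outcome as text. */
    static String describeOutcome(CompletableFuture<Void> deletion) {
        try {
            deletion.get();
            return "deleted";
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof PolicyViolationException) {
                // The message is the one supplied by the broker-side policy.
                return "rejected by policy: " + cause.getMessage();
            }
            return "failed: " + cause;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> rejected = new CompletableFuture<>();
        rejected.completeExceptionally(new PolicyViolationException("topic is protected"));
        System.out.println(describeOutcome(rejected));
    }
}
```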
...

    DeleteRecords Response (Version: 0) => throttle_time_ms [topics]
      throttle_time_ms => INT32
      topics => topic [partitions]
        topic => STRING
        partitions => partition low_watermark error_code error_message
          partition => INT32
          low_watermark => INT64
          error_code => INT16
          error_message => NULLABLE_STRING

Existing versions of the DeleteRecordsResponse will use an UNEXPECTED_SERVER_ERROR error_code instead of POLICY_VIOLATION, so as to not break clients.
The documentation for AdminClient.deleteRecords() (being added by KIP-204) will be updated to mention the possibility of PolicyViolationException from the DeleteRecordsResult methods.
...
Having separate policy interfaces for creation/modification and deletion, while retaining the existing single-method-per-policy-interface design, was considered but rejected because it was a halfway house between having multiple policies and having a single policy.