...
Current state: Under Discussion
Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]
JIRA: https://issues.apache.org/jira/browse/KAFKA-5693
...
New versions of existing network protocol protocol DeleteTopicsRequest
and and DeleteRecordsRequest
will be added, to add a validate_only flag.
New versions of existing network protocol protocol DeleteTopicsResponse
and and DeleteRecordsResponse
will be added to include an error message.
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * Represents the state of a topic. */ interface TopicState { /** * The number of partitions of the topic. */ int numPartitions(); /** * The replication factor of the topic. More precisely, the number of assigned replicas for partition 0. * // TODO what about during reassignment */ short replicationFactor(); /** * A map of the replica assignments of the topic, with partition ids as keys and * the assigned brokers as the corresponding values. * // TODO what about during reassignment */ Map<Integer, List<Integer>> replicasAssignments(); /** * The topic config. */ Map<String,String> configs(); /** * Returns whether the topic is marked for deletion. */ boolean markedForDeletion(); /** * Returns whether the topic is an internal topic. */ boolean internal(); } /** * Represents the requested state of a topic. */ interface RequestedTopicState extends TopicState { /** * True if the {@link TopicState#replicasAssignments()} * in this request we generated by the broker, false if * they were explicitly requested by the client. */ boolean generatedReplicaAssignments(); } /** The current* stateThe oftopic theconfig topicsas init thewill cluster,be beforeif the request takesis effectsuccessful. */ interface ClusterState { /** This is effectively the *same Returnsas the current statevalue of the given topic, or null if the topic does not exist.{@code configs} * after the following computation: */ <pre><code> * TopicState topicState(String topicName); /** Map<String, String> configs = currentState.configs(); * Returns all the topics in the cluster, including internal topics if configs.putAll(requestedState.requestedConfigs(); * </code></pre> */ @Override * {@code includeInternal} is trueMap<String, and including those marked for deletionString> configs(); /** * The topic ifconfigs {@codepresent includeMarkedForDeletion}in isthe truerequest. */ Set<String>Map<String,String> topics(boolean includeInternal, boolean includeMarkedForDeletionrequestedConfigs(); } / /** The current state of * The number of brokers the topics in the cluster. , before the request takes effect. */ interface ClusterState { int clusterSize(); } /** * A policy that* isReturns enforcedthe oncurrent topicstate creation,of alterationthe andgiven deletiontopic, *or andnull forif the deletiontopic ofdoes messages from a topicnot exist. * * An implementation of*/ this policy can beTopicState configured on a broker via the * {@code topic.management.policy.class.name} broker config. * When this is configured the named class will be instantiated reflectively * using its nullary constructor and will then pass the broker configs to * its <code>configure()</code> method. During broker shutdown, the * <code>close()</code> method will be invoked so that resources can be * released (if necessary). * */ interface TopicManagementPolicy extends Configurable, AutoCloseable { static interface AbstractRequestMetadata { topicState(String topicName); /** * Returns all the topics in the cluster, including internal topics if * {@code includeInternal} is true, and including those marked for deletion * if {@code includeMarkedForDeletion} is true. */ Set<String> topics(boolean includeInternal, boolean includeMarkedForDeletion); /** * The number of brokers in the cluster. */ int clusterSize(); /** * Returns the *current Thestate topicof the broker actionin which isthe beingmethod performedis uponcalled. */ public String topicBrokerState brokerState(); } /** * A policy that is enforced on topic *creation, Thealteration authenticatedand principaldeletion, making* theand request,for orthe nulldeletion ifof themessages sessionfrom isa not authenticatedtopic. * * An implementation of this policy can */ be configured on a broker via the public KafkaPrincipal principal(); } static interface CreateTopicRequest extends AbstractRequestMetadata { /** * The requested state of the topic to be created. */ public RequestedTopicState requestedState(); } * {@code topic.management.policy.class.name} broker config. * When this is configured the named class will be instantiated reflectively * using its nullary constructor and will then pass the broker configs to * its <code>configure()</code> method. During broker shutdown, the * <code>close()</code> method will be invoked so that resources can be * released (if necessary). * */ interface TopicManagementPolicy extends Configurable, AutoCloseable { static interface AbstractRequestMetadata { /** * Validate the given request to create* aThe topic the action is being performed upon. * and throw a <code>PolicyViolationException</code> with a suitable error*/ * message if thepublic request does not satisfy this policy. String topic(); /** * Clients will receive the* POLICY_VIOLATIONThe errorauthenticated codeprincipal along withmaking the exception's messagerequest. * Note that validation failure*/ only affects the relevant topic, public KafkaPrincipal principal(); * other topics in} the request will still be processed. static interface CreateTopicRequest extends AbstractRequestMetadata *{ * @param requestMetadata the/** request parameters for the provided topic. * The *requested @paramstate clusterStateof the currenttopic stateto of the clusterbe created. * @throws PolicyViolationException if the*/ request parameters do not satisfypublic this policy.RequestedTopicState requestedState(); } */ void validateCreateTopic(CreateTopicRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException; static interface AlterTopicRequest extends AbstractRequestMetadata { /** /** * Validate the given request to create a topic * and throw a <code>PolicyViolationException</code> with a suitable error * Themessage stateif the topicrequest willdoes havenot aftersatisfy thethis alterationpolicy. */ * Clients will publicreceive RequestedTopicState requestedState(); } /**the POLICY_VIOLATION error code along with the exception's message. * ValidateNote thethat givenvalidation requestfailure toonly alteraffects anthe existingrelevant topic, * other andtopics throwin athe <code>PolicyViolationException</code>request withwill astill suitablebe errorprocessed. * message if * @param requestMetadata the request doesparameters notfor satisfythe thisprovided policytopic. * @param clusterState the current state of *the Thecluster given {@code clusterState} can be* used@throws toPolicyViolationException discoverif the currentrequest stateparameters ofdo thenot topicsatisfy tothis be modifiedpolicy. */ void * Clients will receive the POLICY_VIOLATION error code along with the exception's message. * Note that validation failure only affects the relevant topic, validateCreateTopic(CreateTopicRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException; static interface AlterTopicRequest extends AbstractRequestMetadata { /** * otherThe topicsstate in the requesttopic will have stillafter bethe processedalteration. * */ @param requestMetadata the request parameters for thepublic provided topic.RequestedTopicState requestedState(); } /* @param clusterState the current state of the cluster* * Validate the given request to alter an existing topic * @throws PolicyViolationException and throw a <code>PolicyViolationException</code> with a suitable error * message if the request parametersdoes do not satisfy this policy. */ void* validateAlterTopic(AlterTopicRequest requestMetadata, ClusterStateThe given {@code clusterState)} throws PolicyViolationException; /** * Parameters for a request to delete the given topiccan be used to discover the current state of the topic to be modified. */ static interface* DeleteTopicRequestClients extendswill AbstractRequestMetadatareceive { the POLICY_VIOLATION error code } along with the /**exception's message. * ValidateNote thethat givenvalidation requestfailure toonly deleteaffects athe relevant topic, * other andtopics throwin athe <code>PolicyViolationException</code>request withwill astill suitablebe errorprocessed. * message if the request does* not@param satisfyrequestMetadata thisthe policy. request parameters for the provided *topic. * The given {@code@param clusterState} can be used to discover the current state of the topic to be deleted.cluster * @throws PolicyViolationException if the *request Clientsparameters willdo receivenot thesatisfy POLICY_VIOLATION error code along with the exception's message. this policy. */ void validateAlterTopic(AlterTopicRequest *requestMetadata, NoteClusterState thatclusterState) validationthrows failurePolicyViolationException; only affects the relevant topic,/** * otherParameters topicsfor ina therequest requestto willdelete stillthe begiven processedtopic. */ static interface DeleteTopicRequest extends AbstractRequestMetadata { } /** @param* requestMetadataValidate the given request parametersto fordelete thea provided topic. * @paramand clusterStatethrow thea current<code>PolicyViolationException</code> statewith ofa thesuitable clustererror * @throws PolicyViolationExceptionmessage if the request parameters dodoes not satisfy this policy. */ void validateDeleteTopic(DeleteTopicRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException; /** * Parameters for a request to delete records from the topic* The given {@code clusterState} can be used to discover the current state of the topic to be deleted. */ static interface* DeleteRecordsRequestClients extendswill AbstractRequestMetadatareceive { /**the POLICY_VIOLATION error code along with the exception's message. * Note that validation *failure Returnsonly aaffects mapthe ofrelevant topic, partitions and the* correspondingother offsettopics ofin the last message request will still be processed. * * to@param berequestMetadata retained.the Messagesrequest beforeparameters thisfor offsetthe willprovided be deletedtopic. * @param clusterState the current *state Partitionsof which won't have messages deleted won't be present in the map. the cluster * @throws PolicyViolationException if the request parameters do not satisfy this policy. */ void validateDeleteTopic(DeleteTopicRequest requestMetadata, ClusterState Map<Integer,clusterState) Long>throws deletedMessageOffsets()PolicyViolationException; } /** * ValidateParameters thefor givena request to delete records from athe topic. */ and throw astatic <code>PolicyViolationException</code>interface withDeleteRecordsRequest aextends suitableAbstractRequestMetadata error{ * message if the request does not satisfy this policy. /** * * TheReturns givena {@codemap clusterState}of cantopic be used to discoverpartitions and the currentcorresponding stateoffset of the topiclast message to have records deleted. * to be retained. Messages before *this Clientsoffset will be receivedeleted. the POLICY_VIOLATION error code along with the exception's message. * Note that validation failure only affects the relevant topic, * Partitions which won't have messages deleted won't be present in the map. */ * other topics inMap<Integer, the request will still be processed. Long> deletedMessageOffsets(); } /** * @paramValidate requestMetadatathe thegiven request parametersto fordelete therecords providedfrom a topic. * @paramand clusterStatethrow thea current<code>PolicyViolationException</code> statewith ofa thesuitable clustererror * @throwsmessage PolicyViolationException if the request parametersdoes do not satisfy this policy. */ void* validateDeleteRecords(DeleteRecordsRequest requestMetadata, ClusterStateThe given {@code clusterState)} throws PolicyViolationException; } |
The TopicManagementPolicy
will be applied:
can be used to discover the current state of the topic to have records deleted.
*
* Clients will receive the POLICY_VIOLATION error code along with the exception's message.
* Note that validation failure only affects the relevant topic,
* other topics in the request will still be processed.
*
* @param requestMetadata the request parameters for the provided topic.
* @param clusterState the current state of the cluster
* @throws PolicyViolationException if the request parameters do not satisfy this policy.
*/
void validateDeleteRecords(DeleteRecordsRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException;
}
/**
* Represents the state of a broker
*/
interface BrokerState {
/**
* The broker config.
*/
Map<String,String> configs();
}
interface RequestedBrokerState extends BrokerState {
/**
* The topic config as it will be if the request is successful.
* This is effectively the same as the value of {@code configs}
* after the following computation:
* <pre><code>
* Map<String, String> configs = currentState.configs();
* configs.putAll(requestedState.requestedConfigs();
* </code></pre>
*/
@Override
Map<String,String> configs();
/**
* The broker configs present in the request.
*/
Map<String,String> requestedConfigs();
}
/**
* A policy that is enforced on broker alteration.
*
* An implementation of this policy can be configured on a broker via the
* {@code broker.management.policy.class.name} broker config.
* When this is configured the named class will be instantiated reflectively
* using its nullary constructor and will then pass the broker configs to
* its <code>configure()</code> method. During broker shutdown, the
* <code>close()</code> method will be invoked so that resources can be
* released (if necessary).
*
* TODO: Fully define the lifecycle since the policy is configured by broker config which changes, so a means of reconfiguration is required.
*/
interface BrokerManagementPolicy extends Configurable, AutoCloseable {
static interface AbstractRequestMetadata {
/**
* The id of the broker the action is being performed upon.
* This is always the same as the id of the broker in which the
* broker management policy is executing.
*/
public int brokerId();
/**
* The principal making the request.
*/
public KafkaPrincipal principal();
}
static interface AlterBrokerRequest extends AbstractRequestMetadata {
/**
* The requested state of the broker to be altered.
*/
public RequestedBrokerState requestedState();
}
/**
* Validate the given request to alter a broker
* and throw a <code>PolicyViolationException</code> with a suitable error
* message if the request does not satisfy this policy.
*
* Clients will receive the POLICY_VIOLATION error code along with the exception's message.
* Note that validation failure only affects the relevant broker,
* other topics in the request will still be processed.
*
* @param requestMetadata the request parameters for the provided broker.
* @param clusterState the current state of the cluster
* @throws PolicyViolationException if the request parameters do not satisfy this policy.
*/
void validateAlterBroker(AlterBrokerRequest requestMetadata,
ClusterState clusterState)
throws PolicyViolationException;
} |
The TopicManagementPolicy
will be applied:
- On topic creation, On topic creation, i.e. when processing a
CreateTopicsRequest
- On topic modification
- Change in topic config, ie. when processing
AlterConfigsRequest
, for topic configs (this change done as part of this KIP). - Adding partitions to topics, i.e. when processing a
CreatePartitionsRequest
(see KIP-195, but (see KIP-195, but this change done as part of this KIP) - Reassigning partitions to brokers, and/or changing the replication factor when processing
ReassignPartitionsRequest
(as part of KIP-179)
- Change in topic config, ie. when processing
- On topic deletion, i.e. when processing a
DeleteTopicsRequest
(this change done as part of this KIP). - On message deletion, i.e. when processing a
DeleteRecordsRequest
(this change done as part of this KIP) - Reassigning partitions to brokers, and/or changing the replication factor when processing
ReassignPartitionsRequest
(see KIP-179) - On topic deletion, i.e. when processing a
DeleteTopicsRequest
. - .
The BrokerManagementPolicy
will be applied:
- On broker startup
- This is to ensure that brokers start in a valid state; without this it would be possible for a later alter broker request to be denied even though the request itself was not the cause of the policy violation.
- On broker modification, ie. when processing a
AlterConfigsRequest
for broker configsOn message deletion, i.e. when processing aDeleteRecordsRequest
.
Deprecate existing policies
...
It will be a configuration-time error if both create.topic.policy.class.name
and topic.management.policy.class.name
are used at the same time, or both alter.config.policy.class.name
and topic.
are used at the same time.management
.policy.class.name
...
No Format |
---|
DeleteTopics Response (Version: 2) => throttle_time_ms [topic_error_codes] throttle_time_ms => INT32 topic_error_codes => topic error_code error_message topic => STRING error_code => INT16 error_message => NULLABLE_STRING |
Existing Old versions of the DeleteTopics resonse will be able to have the DeleteTopicsResponse
will use a UNEXPECTED_SERVER_ERROR error_code
s with value instead of POLICY_VIOLATION so as to not break clients.
The documentation for AdminClient.deleteTopics()
will be updated mention the possibility of PolicyViolationException
from the DeleteTopicsResult
methods.
...
No Format |
---|
DeleteRecords Response (Version: 0) => throttle_time_ms [topics] throttle_time_ms => INT32 topics => topic [partitions] topic => STRING partitions => partition low_watermark error_code error_message partition => INT32 low_watermark => INT64 error_code => INT16 error_message => NULLABLE_STRING |
Existing versions of the DeleteRecords resonse will be able to have the DeleteRecordsResponse
will use a UNEXPECTED_SERVER_ERROR error_code
s with value instead of POLICY_VIOLATION so as to not break clients.
The documentation for AdminClient.deleteRecords()
(being added by KIP-204) will be updated mention the possibility of PolicyViolationException
from the DeleteRecordsResult
methods.
...
Having separate policy interfaces for creation/modification and deletion and retaining the existing single-method-per-policy-interface design was considered, but rejected because it was a half way house between having multiple policies and having a single policy.