...
Current state: Under Discussion
Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]
JIRA: https://issues.apache.org/jira/browse/KAFKA-5693
...
The existing policy interfaces CreateTopicPolicy
and AlterConfigPolicy
will be deprecated, but will continue to be applied where they are currently applied until they are removed.
New versions of existing network protocol DeleteTopicsRequest
and DeleteRecordsRequest
will be added, to add a validate_only flag.
New versions of existing network protocol DeleteTopicsResponse
and DeleteRecordsResponse
will be added to include an error message.
Proposed Changes
Add TopicManagementPolicy and supporting interfaces
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * Represents the state of a topic either before, or as a result of, * an administrative request affecting a topic. */ interface TopicState { /** * . */ interface TopicState { /** * The number of partitions of the topic. */ int numPartitions(); /** * The replication factor of the topic. More precisely, the number of assigned replicas for partition 0. * // TODO what about during reassignment */ Shortshort replicationFactor(); /** * A map of the replica assignments of the topic, with partition ids as keys and * the assigned brokers as the corresponding values. * // TODO what about during reassignment */ Map<Integer, List<Integer>> replicasAssignments(); /** * The topic config. */ Map<String,String> configs(); /** * Returns whether the topic is marked for deletion. */ boolean markedForDeletion(); /** * Returns whether the topic is an internal topic. */ boolean internal(); } /** The* currentRepresents state of the topicsrequested instate theof cluster, before the request takes effect.a topic. */ interface ClusterStateRequestedTopicState { extends TopicState { /** * True Returnsif the current state of the given topic, or null if the topic does not exist {@link TopicState#replicasAssignments()} * in this request we generated by the broker, false if * they were explicitly requested by the client. */ TopicStateboolean topicStategeneratedReplicaAssignments(String topicName); /** * ReturnsThe alltopic theconfig as topicsit inwill thebe cluster,if includingthe internalrequest topicsis ifsuccessful. * {@code includeInternal} is true, and including those marked for deletionThis is effectively the same as the value of {@code configs} * ifafter {@codethe includeMarkedForDeletion} is true.following computation: */ <pre><code> Set<String> topics(boolean includeInternal * Map<String, boolean includeMarkedForDeletionString> configs = currentState.configs(); /*** configs.putAll(requestedState.requestedConfigs(); * The number of brokers in the cluster.</code></pre> */ */@Override intMap<String,String> clusterSizeconfigs(); } /** * A policy that is* enforcedThe on topic creation,configs alterationpresent and deletion,in the request. * and for the deletion*/ of messages from a topic.Map<String,String> requestedConfigs(); } /* * *The Ancurrent implementationstate of thisthe topics policyin canthe becluster, configuredbefore onthe arequest brokertakes viaeffect. the*/ interface *ClusterState {@code topic.management.policy.class.name} broker config. * When this is configured the named class will be instantiated reflectively * using its nullary constructor and will then pass the broker configs to * its <code>configure()</code> method. During broker shutdown, the * <code>close()</code> method will be invoked so that resources can be * released (if necessary). * */ interface TopicManagementPolicy extends Configurable, AutoCloseable { static interface AbstractRequestMetadata { /** * The topic the action is being performed upon. */ public String topic /** * Returns the current state of the given topic, or null if the topic does not exist. */ TopicState topicState(String topicName); /** * Returns all the topics in the cluster, including internal topics if * {@code includeInternal} is true, and including those marked for deletion * if {@code includeMarkedForDeletion} is true. */ Set<String> topics(boolean includeInternal, boolean includeMarkedForDeletion); /** * The number of brokers in the cluster. */ int clusterSize(); /** * Returns the *current The authenticated principal makingstate of the request,broker orin nullwhich if the sessionmethod is not authenticatedcalled. */ public KafkaPrincipal principalBrokerState brokerState(); } /** * A policy staticthat interfaceis CreateTopicRequestenforced extendson AbstractRequestMetadatatopic { creation, alteration and deletion, * and for the /** deletion of messages from a topic. * * TheAn requestedimplementation state of thethis topicpolicy tocan be created. configured on a broker via the * {@code */topic.management.policy.class.name} broker config. * When this is configured the named publicclass TopicState requestedState(); } /** * Validate the given request to create a topic * and throw a <code>PolicyViolationException</code> with a suitable error * message if the request does not satisfy this policy. * * Clients will receive the POLICY_VIOLATION error code along with the exception's message.will be instantiated reflectively * using its nullary constructor and will then pass the broker configs to * its <code>configure()</code> method. During broker shutdown, the * <code>close()</code> method will be invoked so that resources can be * released (if necessary). * */ interface TopicManagementPolicy extends Configurable, AutoCloseable { static interface AbstractRequestMetadata { /** * Note that validation failure* onlyThe affectstopic the relevantaction topic, is being performed upon. * other topics in the request will still be processed. **/ * @param requestMetadata thepublic request parameters for the provided topic. String topic(); * @param clusterState the current/** state of the cluster * @throwsThe authenticated PolicyViolationExceptionprincipal ifmaking the request parameters. do not satisfy this policy. */ void validateCreateTopic(CreateTopicRequest requestMetadata, ClusterState clusterState)public throwsKafkaPrincipal PolicyViolationException; principal(); } static interface AlterTopicRequestCreateTopicRequest extends AbstractRequestMetadata { /** * The requested state of the topic willto have after the alterationbe created. */ public TopicStateRequestedTopicState requestedState(); } /** * Validate the given request to altercreate ana existing topic * and throw a <code>PolicyViolationException</code> with a suitable error * message if the request does not satisfy this policy. * * TheClients givenwill {@codereceive clusterState} can be used to discoverthe POLICY_VIOLATION error code along with the current state of the topic to be modified. * * Clients will receive the POLICY_VIOLATION error code along with the exception's message. * Note exception's message. * Note that validation failure only affects the relevant topic, * other topics in the request will still be processed. * * @param requestMetadata the request parameters for the provided topic. * @param clusterState the current state of the cluster * @throws PolicyViolationException if the request parameters do not satisfy this policy. */ void validateAlterTopicvalidateCreateTopic(AlterTopicRequestCreateTopicRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException; /** static interface AlterTopicRequest extends AbstractRequestMetadata *{ Parameters for a request to delete the given topic /** * The state the topic will have after the alteration. */ static interface DeleteTopicRequest extendspublic AbstractRequestMetadata {RequestedTopicState requestedState(); } /** * Validate the given request to deletealter an aexisting topic * and throw a <code>PolicyViolationException</code> with a suitable error * message if the request does not satisfy this policy. * * The given {@code clusterState} can be used to discover the current state of the topic to be deletedmodified. * * Clients will receive the POLICY_VIOLATION error code along with the exception's message. * Note that validation failure only affects the relevant topic, * other topics in the request will still be processed. * * @param requestMetadata the request parameters for the provided topic. * @param clusterState the current state of the cluster * @throws PolicyViolationException if the request parameters do not satisfy this policy. */ void validateDeleteTopicvalidateAlterTopic(DeleteTopicRequestAlterTopicRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException; /** * Parameters for a request to delete recordsthe fromgiven the topic. */ static interface DeleteRecordsRequestDeleteTopicRequest extends AbstractRequestMetadata { } /** * Validate the given request *to Returnsdelete a map of topic partitions and the corresponding offset* ofand thethrow last message a <code>PolicyViolationException</code> with a suitable error * tomessage beif retained.the Messagesrequest beforedoes thisnot offsetsatisfy willthis be deletedpolicy. * * PartitionsThe which won't have messages deleted won't be present in the map. given {@code clusterState} can be used to discover the current state of the topic to be deleted. */ * Clients will Map<Integer,receive Long> deletedMessageOffsets(); } /**the POLICY_VIOLATION error code along with the exception's message. * ValidateNote thethat givenvalidation requestfailure toonly deleteaffects recordsthe from arelevant topic, * andother topics throwin athe <code>PolicyViolationException</code>request withwill astill suitablebe errorprocessed. * message if the request does* not@param satisfyrequestMetadata thisthe policy. request parameters for the provided *topic. * The@param given {@code clusterState} can be used to discover the current state of the topiccluster to have records deleted. * @throws PolicyViolationException *if the request parameters do not satisfy this policy. * Clients will receive the POLICY_VIOLATION error code along with the exception's message./ void validateDeleteTopic(DeleteTopicRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException; /** * Parameters for Notea thatrequest validationto failuredelete onlyrecords affectsfrom the relevant topic,. */ other topics in thestatic requestinterface willDeleteRecordsRequest stillextends beAbstractRequestMetadata processed.{ * /** @param requestMetadata the request parameters for the provided topic. * Returns a map of *topic @parampartitions clusterStateand the currentcorresponding stateoffset of the clusterlast message * @throws PolicyViolationException if the* requestto parametersbe doretained. notMessages satisfybefore this policy. offset will be deleted. * Partitions which won't have messages deleted won't be present in the map. */ void validateDeleteRecords(DeleteRecordsRequest requestMetadataMap<Integer, ClusterStateLong> clusterStatedeletedMessageOffsets() throws PolicyViolationException; } |
The TopicManagementPolicy
will be applied:
- On topic creation, i.e. when processing a
CreateTopicsRequest
- On topic modification
- Change in topic config, ie. when processing
AlterConfigsRequest
, for topic configs (this change done as part of this KIP). - Adding partitions to topics, i.e. when processing a
CreatePartitionsRequest
(see KIP-195, but this change done as part of this KIP) - Reassigning partitions to brokers, and/or changing the replication factor when processing
ReassignPartitionsRequest
(see KIP-179)
- Change in topic config, ie. when processing
- On topic deletion, i.e. when processing a
DeleteTopicsRequest
. - On message deletion, i.e. when processing a
DeleteRecordsRequest
.
Deprecate existing policies
The existing CreateTopicPolicy
and AlterConfigPolicy
will be deprecated, but will continue to be applied when they are configured.
Using create.topic.policy.class.name
or alter.config.policy.class.name
will result in an deprecation warning in the broker logs.
It will be a configuration time error if both create.topic.policy.class.name
and topic.management.policy.class.name
are used at the same time, or both alter.config.policy.class.name
and topic.
are used at the same time.management
.policy.class.name
Internally, an adapter implementation of TopicManagementPolicy
will be used when CreateTopicPolicy
and AlterConfigPolicy
are configured, so policy use sites won't be unnecessarily complicated.
If, in the future, AdminClient.alterConfigs()
/AlterConfigsRequest
is changed to support changing broker configs a separate policy interface can be applied to such changes.
Compatibility, Deprecation, and Migration Plan
Existing users will have to reimplement their policies in terms of the new TopicManagementPolicy
interface, and reconfigure their brokers accordingly. Since the TopicManagementPolicy
contains a superset of the existing information used by the deprecated policies such reimplementation should be trivial.
;
}
/**
* Validate the given request to delete records from a topic
* and throw a <code>PolicyViolationException</code> with a suitable error
* message if the request does not satisfy this policy.
*
* The given {@code clusterState} can be used to discover the current state of the topic to have records deleted.
*
* Clients will receive the POLICY_VIOLATION error code along with the exception's message.
* Note that validation failure only affects the relevant topic,
* other topics in the request will still be processed.
*
* @param requestMetadata the request parameters for the provided topic.
* @param clusterState the current state of the cluster
* @throws PolicyViolationException if the request parameters do not satisfy this policy.
*/
void validateDeleteRecords(DeleteRecordsRequest requestMetadata, ClusterState clusterState) throws PolicyViolationException;
}
/**
* Represents the state of a broker
*/
interface BrokerState {
/**
* The broker config.
*/
Map<String,String> configs();
}
interface RequestedBrokerState extends BrokerState {
/**
* The topic config as it will be if the request is successful.
* This is effectively the same as the value of {@code configs}
* after the following computation:
* <pre><code>
* Map<String, String> configs = currentState.configs();
* configs.putAll(requestedState.requestedConfigs();
* </code></pre>
*/
@Override
Map<String,String> configs();
/**
* The broker configs present in the request.
*/
Map<String,String> requestedConfigs();
}
/**
* A policy that is enforced on broker alteration.
*
* An implementation of this policy can be configured on a broker via the
* {@code broker.management.policy.class.name} broker config.
* When this is configured the named class will be instantiated reflectively
* using its nullary constructor and will then pass the broker configs to
* its <code>configure()</code> method. During broker shutdown, the
* <code>close()</code> method will be invoked so that resources can be
* released (if necessary).
*
* TODO: Fully define the lifecycle since the policy is configured by broker config which changes, so a means of reconfiguration is required.
*/
interface BrokerManagementPolicy extends Configurable, AutoCloseable {
static interface AbstractRequestMetadata {
/**
* The id of the broker the action is being performed upon.
* This is always the same as the id of the broker in which the
* broker management policy is executing.
*/
public int brokerId();
/**
* The principal making the request.
*/
public KafkaPrincipal principal();
}
static interface AlterBrokerRequest extends AbstractRequestMetadata {
/**
* The requested state of the broker to be altered.
*/
public RequestedBrokerState requestedState();
}
/**
* Validate the given request to alter a broker
* and throw a <code>PolicyViolationException</code> with a suitable error
* message if the request does not satisfy this policy.
*
* Clients will receive the POLICY_VIOLATION error code along with the exception's message.
* Note that validation failure only affects the relevant broker,
* other topics in the request will still be processed.
*
* @param requestMetadata the request parameters for the provided broker.
* @param clusterState the current state of the cluster
* @throws PolicyViolationException if the request parameters do not satisfy this policy.
*/
void validateAlterBroker(AlterBrokerRequest requestMetadata,
ClusterState clusterState)
throws PolicyViolationException;
} |
The TopicManagementPolicy
will be applied:
- On topic creation, i.e. when processing a
CreateTopicsRequest
- On topic modification
- Change in topic config, ie. when processing
AlterConfigsRequest
, for topic configs (this change done as part of this KIP). - Adding partitions to topics, i.e. when processing a
CreatePartitionsRequest
(see KIP-195, but this change done as part of this KIP) - Reassigning partitions to brokers, and/or changing the replication factor when processing
ReassignPartitionsRequest
(as part of KIP-179)
- Change in topic config, ie. when processing
- On topic deletion, i.e. when processing a
DeleteTopicsRequest
(this change done as part of this KIP). - On message deletion, i.e. when processing a
DeleteRecordsRequest
(this change done as part of this KIP).
The BrokerManagementPolicy
will be applied:
- On broker startup
- This is to ensure that brokers start in a valid state; without this it would be possible for a later alter broker request to be denied even though the request itself was not the cause of the policy violation.
- On broker modification, ie. when processing a
AlterConfigsRequest
for broker configs.
Deprecate existing policies
The existing CreateTopicPolicy
and AlterConfigPolicy
will be deprecated, but will continue to be applied when they are configured.
Using create.topic.policy.class.name
or alter.config.policy.class.name
will result in an deprecation warning in the broker logs.
It will be a configuration-time error if both create.topic.policy.class.name
and topic.management.policy.class.name
are used at the same time, or both alter.config.policy.class.name
and topic.
are used at the same time.management
.policy.class.name
Internally, an adapter implementation of TopicManagementPolicy
will be used when CreateTopicPolicy
and AlterConfigPolicy
are configured, so policy use sites won't be unnecessarily complicated.
If, in the future, AdminClient.alterConfigs()
/AlterConfigsRequest
is changed to support changing broker configs a separate policy interface can be applied to such changes.
Add new versions of DeleteTopicsRequest
and DeleteTopicsResponse
The DELETE_TOPICS
protocol have a 3rd version added (version 2). The DeleteTopicsRequest
will get a validate_only
flag. When this is set the request will be validated for correctness, including that it satisfies the TopicManagementPolicy.validateDeleteTopic()
method, but the topic won't actually be deleted.
No Format |
---|
DeleteTopics Request (Version: 2) => [topics] timeout validate_only
topics => STRING
timeout => INT32
validate_only => BOOLEAN |
The DeleteTopicsResponse
will get the ability to include error messages in addition to error codes:
No Format |
---|
DeleteTopics Response (Version: 2) => throttle_time_ms [topic_error_codes]
throttle_time_ms => INT32
topic_error_codes => topic error_code error_message
topic => STRING
error_code => INT16
error_message => NULLABLE_STRING |
Old versions of the DeleteTopicsResponse
will use a UNEXPECTED_SERVER_ERROR error_code
instead of POLICY_VIOLATION so as to not break clients.
The documentation for AdminClient.deleteTopics()
will be updated mention the possibility of PolicyViolationException
from the DeleteTopicsResult
methods.
Add new versions of DeleteRecordsRequest
and DeleteRecordsResponse
The DELETE_RECORDS
protocol have a 2nd version added (version 1). The DeleteRecordsRequest
will get a validate_only
flag. When this is set the request will be validated for correctness, including that it satisfies the TopicManagementPolicy.validateDeleteRecords()
method, but no records will be deleted.
No Format |
---|
DeleteRecords Request (Version: 1) => [topics] timeout validate_only
topics => topic [partitions]
topic => STRING
partitions => partition offset
partition => INT32
offset => INT64
timeout => INT32
validate_only => BOOLEAN |
The DeleteRecordsResponse
will get the ability to include error messages in addition to error codes:
No Format |
---|
DeleteRecords Response (Version: 0) => throttle_time_ms [topics]
throttle_time_ms => INT32
topics => topic [partitions]
topic => STRING
partitions => partition low_watermark error_code error_message
partition => INT32
low_watermark => INT64
error_code => INT16
error_message => NULLABLE_STRING |
Existing versions of the DeleteRecordsResponse
will use a UNEXPECTED_SERVER_ERROR error_code
instead of POLICY_VIOLATION so as to not break clients.
The documentation for AdminClient.deleteRecords()
(being added by KIP-204) will be updated mention the possibility of PolicyViolationException
from the DeleteRecordsResult
methods.
Compatibility, Deprecation, and Migration Plan
Existing users will have to reimplement their policies in terms of the new TopicManagementPolicy
interface, and reconfigure their brokers accordingly. Since the TopicManagementPolicy
contains a superset of the existing information used by the deprecated policies such reimplementation should be trivial.
The deprecated policy interfaces and configuration keys will be removed in a future Kafka version. If this KIP is accepted for Kafka 1.1.0 this removal could happen in Kafka 2.0.0 or a later major release.
This KIP proposes to retrospectively apply policies to two APIs (delete topics and delete records) exposed via the network protocol. Existing clients might not be expecting a POLICY_VIOLATION error code in the responses. To mitigate this, only the new version of these network protocols will actually return POLICY_VIOLATION. If a client is using an old version of either of these protocols, the policy violation will be returned an UNEXPECTED_SERVER_ERROR, and a message logged to explain that the error is in fact a policy violationThe deprecated policy interfaces and configuration keys will be removed in a future Kafka version. If this KIP is accepted for Kafka 1.1.0 this removal could happen in Kafka 2.0.0 or a later major release.
Rejected Alternatives
The objectives of this KIP could be achieved without deprecating the existing policy classes, but that:
...
Having separate policy interfaces for creation/modification and deletion and retaining the existing single-method-per-policy-interface design was considered, but rejected because it was a half way house between having multiple policies and having a single policy.