...
Code Block | ||
---|---|---|
| ||
@InterfaceStability.Evolving
public class AlterConfigOp {
    public enum OpType {
        SET((byte) 0), DELETE((byte) 1), APPEND((byte) 2), SUBTRACT((byte) 3);
        // Lookup table from wire id to operation type.
        private static final Map<Byte, OpType> OP_TYPES = Collections.unmodifiableMap(
            Arrays.stream(values()).collect(Collectors.toMap(OpType::id, Function.identity()))
        );
        private final byte id;
        OpType(final byte id) {
            this.id = id;
        }
        public byte id() {
            return id;
        }
        // Returns null if the id does not match any operation type.
        public static OpType forId(final byte id) {
            return OP_TYPES.get(id);
        }
    }
    private final ConfigEntry configEntry;
    private final OpType opType;
    public AlterConfigOp(ConfigEntry configEntry, OpType operationType) {
        this.configEntry = configEntry;
        this.opType = operationType;
    }
    public ConfigEntry configEntry() {
        return configEntry;
    }
    public OpType opType() {
        return opType;
    }
}
// New AdminClient method.
public IncrementalAlterConfigsResult incrementalAlterConfigs(
    Map<ConfigResource, Collection<AlterConfigOp>> configs,
    final IncrementalAlterConfigsOptions options); |
Code Block | ||
---|---|---|
| ||
@InterfaceStability.Evolving
public class IncrementalAlterConfigsResult {
    private final Map<ConfigResource, KafkaFuture<Void>> futures;
    IncrementalAlterConfigsResult(Map<ConfigResource, KafkaFuture<Void>> futures) {
        this.futures = futures;
    }
    /**
     * Return a map from resources to futures which can be used to check the status of the operation on each resource.
     */
    public Map<ConfigResource, KafkaFuture<Void>> values() {
        return futures;
    }
    /**
     * Return a future which succeeds only if all the alter configs operations succeed.
     */
    public KafkaFuture<Void> all() {
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
    }
} |
Code Block | ||
---|---|---|
| ||
@InterfaceStability.Evolving
public class IncrementalAlterConfigsOptions extends AbstractOptions<IncrementalAlterConfigsOptions> {
    private boolean validateOnly = false;
    /**
     * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
     * AdminClient should be used.
     */
    public IncrementalAlterConfigsOptions timeoutMs(Integer timeoutMs) {
        this.timeoutMs = timeoutMs;
        return this;
    }
    /**
     * Return true if the request should be validated without altering the configs.
     */
    public boolean shouldValidateOnly() {
        return validateOnly;
    }
    /**
     * Set to true if the request should be validated without altering the configs.
     */
    public IncrementalAlterConfigsOptions validateOnly(boolean validateOnly) {
        this.validateOnly = validateOnly;
        return this;
    }
} |
Proposed Changes
The new IncrementalAlterConfigs API in AdminClient will take a collection of operations describing the configuration modifications. There are four types of operations.
- Set: set a configuration to a value. The value must not be null.
- Delete: delete a configuration key.
- Append: if a configuration key is a list of values, add to the list.
- Subtract: if a configuration key is a list of values, subtract from the list.
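The four operation types can be illustrated with a minimal sketch that applies them to an in-memory map. This is not the broker implementation: the class `ConfigOpSemanticsSketch`, its `apply` and `split` helpers, and the choice to model list-valued configs as comma-separated strings are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of the proposed API.
class ConfigOpSemanticsSketch {
    enum OpType { SET, DELETE, APPEND, SUBTRACT }

    // Applies one operation to a mutable config map.
    // Assumption: list-valued configs are stored as comma-separated strings.
    static void apply(Map<String, String> configs, OpType op, String key, String value) {
        switch (op) {
            case SET:
                if (value == null) {
                    throw new IllegalArgumentException("SET requires a non-null value");
                }
                configs.put(key, value);
                break;
            case DELETE:
                configs.remove(key);
                break;
            case APPEND: {
                // Add the given values to the end of the existing list.
                List<String> items = split(configs.get(key));
                items.addAll(split(value));
                configs.put(key, String.join(",", items));
                break;
            }
            case SUBTRACT: {
                // Remove every occurrence of the given values from the list.
                List<String> items = split(configs.get(key));
                items.removeAll(split(value));
                configs.put(key, String.join(",", items));
                break;
            }
        }
    }

    // Splits a comma-separated string into a mutable list; null or empty yields an empty list.
    static List<String> split(String csv) {
        List<String> out = new ArrayList<>();
        if (csv == null || csv.isEmpty()) {
            return out;
        }
        for (String s : csv.split(",")) {
            String t = s.trim();
            if (!t.isEmpty()) {
                out.add(t);
            }
        }
        return out;
    }
}
```

For example, starting from `cleanup.policy=delete`, an Append of `compact` would yield `delete,compact`, and a subsequent Subtract of `delete` would leave `compact`.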
...
We will use the existing AlterConfigsOptions and AlterConfigsResult API classes to pass the API options and to return the result of the modifications to each configuration resource.
...
Similar to AlterConfigsOptions, IncrementalAlterConfigsOptions will include a timeout value and a dry-run flag.
Similar to the existing alterConfigs API, we will keep the "transactionality" of updating several configs for the same ConfigResource at once. We guarantee that a single request never results in a partial update of the collection of configs for a ConfigResource. On a validation or update error, we will return the error for that ConfigResource.
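One way to picture the all-or-nothing guarantee and the dry-run flag is to stage every change on a copy and commit only if all operations validate. The sketch below is an assumption about how such behaviour could be structured, not the actual broker code: `AtomicConfigUpdateSketch`, its simplified `Op` type, and the `knownKeys` validation rule are all hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical illustration class; not part of the proposed API.
class AtomicConfigUpdateSketch {
    // Simplified stand-in for AlterConfigOp: a null value means "delete this key".
    static final class Op {
        final String key;
        final String value;
        Op(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Applies all ops for one resource, or none of them.
    // Returns an error message for the resource if any op fails validation.
    // Assumption: validation here just means the key is in knownKeys.
    static Optional<String> applyAll(Map<String, String> current, List<Op> ops,
                                     Set<String> knownKeys, boolean validateOnly) {
        Map<String, String> staged = new HashMap<>(current); // work on a copy
        for (Op op : ops) {
            if (!knownKeys.contains(op.key)) {
                // Abort before touching `current`, so no partial update is visible.
                return Optional.of("Unknown config: " + op.key);
            }
            if (op.value == null) {
                staged.remove(op.key);
            } else {
                staged.put(op.key, op.value);
            }
        }
        if (!validateOnly) {
            // Commit the staged result only after every op has validated.
            current.clear();
            current.putAll(staged);
        }
        return Optional.empty();
    }
}
```

With this structure, a request that mixes one valid and one invalid change for the same resource leaves the resource's configs exactly as they were, and a dry run reports the same errors without committing anything.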
Protocol APIs
There will be a new IncrementalAlterConfigsRequest.
Code Block | ||||
---|---|---|---|---|
| ||||
IncrementalAlterConfigsOp => INT8
  0: SET
  1: REMOVE
  2: APPEND
  3: SUBTRACT
IncrementalAlterConfigsRequest (Version: 0) => [resources] validate_only
  validate_only => BOOLEAN
  resources => resource_type resource_name [configs]
    resource_type => INT8
    resource_name => STRING
    configs => config_name config_op config_value
      config_name => STRING
      config_op => INT8
      config_value => NULLABLE_STRING |
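To make the field order concrete, the sketch below encodes the body of a request that carries a single resource with a single config entry. It assumes the usual Kafka protocol primitive encodings (big-endian integers, an INT32 element count for arrays, an INT16 length prefix for STRING, a length of -1 for a null NULLABLE_STRING, one byte for BOOLEAN) and omits the request header. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class; not part of the Kafka code base.
class IncrementalAlterConfigsEncodingSketch {
    // Encodes: [resources] validate_only, with exactly one resource and one config.
    static byte[] encode(byte resourceType, String resourceName, String configName,
                         byte configOp, String configValue, boolean validateOnly) {
        byte[] name = resourceName.getBytes(StandardCharsets.UTF_8);
        byte[] key = configName.getBytes(StandardCharsets.UTF_8);
        byte[] value = configValue == null ? null : configValue.getBytes(StandardCharsets.UTF_8);

        int size = 4                      // resources array count
                 + 1                      // resource_type
                 + 2 + name.length        // resource_name
                 + 4                      // configs array count
                 + 2 + key.length         // config_name
                 + 1                      // config_op
                 + 2 + (value == null ? 0 : value.length) // config_value
                 + 1;                     // validate_only
        ByteBuffer buf = ByteBuffer.allocate(size); // big-endian by default

        buf.putInt(1);                                  // one resource
        buf.put(resourceType);
        buf.putShort((short) name.length).put(name);
        buf.putInt(1);                                  // one config entry
        buf.putShort((short) key.length).put(key);
        buf.put(configOp);                              // 0..3, see IncrementalAlterConfigsOp
        if (value == null) {
            buf.putShort((short) -1);                   // null NULLABLE_STRING
        } else {
            buf.putShort((short) value.length).put(value);
        }
        buf.put((byte) (validateOnly ? 1 : 0));
        return buf.array();
    }
}
```

An operation with code 1 (REMOVE in the schema, DELETE in `AlterConfigOp.OpType`) would typically carry a null `config_value`, which is why that field is a NULLABLE_STRING.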
The IncrementalAlterConfigsResponse is the same as the AlterConfigsResponse.
...