Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
titlemodifyConfigs
@InterfaceStability.Evolving
public class AlterConfigOp {

    public enum OpType {
        SET((byte) 0), DELETE((byte) 1), APPEND((byte) 2), SUBTRACT((byte) 3);

        private static final Map<Byte, OpType> OP_TYPES = Collections.unmodifiableMap(
                Arrays.stream(values()).collect(Collectors.toMap(OpType::id, Function.identity()))
        );

        private final byte id;

        OpType(final byte id) {
            this.id = id;
        }

        public byte id() {
            return id;
        }

        public static OpType forId(final byte id) {
            return OP_TYPES.get(id);
        }
    }

    private final ConfigEntry configEntry;
    private final OpType opType;

    public AlterConfigOp(ConfigEntry configEntry, OpType operationType) {
        this.configEntry = configEntry;
        this.opType =  operationType;
    }

    public ConfigEntry configEntry() {
        return configEntry;
    };

    public OpType opType() {
        return opType;
    };
}


public IncrementalAlterConfigsResultAlterConfigsResult incrementalAlterConfigs(
		Map<ConfigResource, Collection<AlterConfigOp>> configs,
        final IncrementalAlterConfigsOptions options);
Code Block
titleModifyConfigsResult
@InterfaceStability.Evolving
public class IncrementalAlterConfigsResult {
    private final Map<ConfigResource, KafkaFuture<Void>> futures;

    IncrementalAlterConfigsResult(Map<ConfigResource, KafkaFuture<Void>> futures) {
        this.futures = futures;
    }

    /**
     * Return a map from resources to futures which can be used to check the status of the operation on each resource.
     */
    public Map<ConfigResource, KafkaFuture<Void>> values() {
        return futures;
    }

    /**
     * Return a future which succeeds only if all the alter configs operations succeed.
     */
    public KafkaFuture<Void> all() {
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
    }
}
Code Block
titleDeserializer
@InterfaceStability.Evolving
public class IncrementalAlterConfigsOptions extends AbstractOptions<IncrementalAlterConfigsOptions> {
    private boolean validateOnly = false;

    /**
     * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
     * AdminClient should be used.
     */
    public IncrementalAlterConfigsOptions timeoutMs(Integer timeoutMs) {
        this.timeoutMs = timeoutMs;
        return this;
    }

    /**
     * Return true if the request should be validated without altering the configs.
     */
    public boolean shouldValidateOnly() {
        return validateOnly;
    }

    /**
     * Set to true if the request should be validated without altering the configs.
     */
    public IncrementalAlterConfigsOptions validateOnly(boolean validateOnly) {
        this.validateOnly = validateOnly;
        return this;
    }
}AlterConfigsOptions options);

Proposed Changes

The new IncrementalAlterConfigs API in AdminClient will take a collection of operations describing the configuration modifications.  There are four types of operations.

  • Set: set a configuration to a value.  The value must not be null.
  • Delete: delete a configuration key
  • Append: if a configuration key is a list of values, add to the list.
  • Subtract: if a configuration key is a list of values, subtract from the list.

...

We will use existing AlterConfigsOptionsAlterConfigsResult API classes to pass the  API config options and to return the result of a specific configuration resource modifications

...

Similar to AlterConfigsOptions, IncrementalAlterConfigsOptions will include a timeout value and a dry-run flag.

Similar to existing alterConfigs API, we will to keep the "transactionality" of updating several configs  for the same ConfigResource at once. We guarantee that we never do a partial update of a collection of configs
for a ConfigResource from a single request. On validation/update error, we will return the error for the ConfigResource.

Protocol APIs

There will be a new IncrementalAlterConfigsRequest Incremental AlterConfigsRequest.

Code Block
languagejava
titleModifyConfigsRequest
IncrementalAlterConfigsOp => INT8
0: SET
1: REMOVE
2: APPEND
3: SUBTRACT

IncrementalAlterConfigsRequest (Version: 0) => [resources] validate_only
 validate_only => BOOLEAN
 resources => resource_type resource_name [configs]
 resource_type => INT8
 resource_name => STRING
 configs => config_name config_op config_value
 config_name => STRING
 config_op => INT8
 config_value => NULLABLE_STRING

The IncrementalAlterConfigsResponse Incremental AlterConfigsResponse is the same as the AlterConfigsResponse.

...