Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Altering quotas only works on a config-centric manner, and therefore doesn't need distinguishing. For a given entity match, the administrator should be able to specify which quotas apply, or alternatively remove existing quotas so they no longer match. This is AlterClientQuotas.

The quota values are of type Double, which presents a complication in that the RPC protocol doesn't support floating point values. To account for this, Double values will be serialized via Double.doubleToLongBits() and deserialized via Double.longBitsToDouble(). Java strongly defines the byte format, so there's no issue about compatibility. 

Types Rationale

While there's two defined entity types in AK, a server-side plugin mechanism allows for further expansion. Likewise, as use cases evolve, finer-grained quota control may be necessary. Therefore, entity types should not be statically bound to publicly defined constants, and instead the API should support flexible entity types by interpreting them as a String identifier. Any entity types that the broker doesn't understand should throw an IllegalArgumentException back to the client.

...

Code Block
languagejava
public class DescribeClientQuotasOptions extends AbstractOptions<DescribeClientQuotasOptions> {
    // Empty.
}

/**
 * The result of the {@link Admin#DescribeClientQuotas(Collection<QuotaFilter>, DescribeClientQuotasOptions)} call.
 */
public class DescribeClientQuotasResult {

    /**
     * Maps an entity to its configured quota value(s). Note if no value is defined for a quota
     * type for that entity's config, then it is not included in the resulting value map.
     *
     * @param entities the collection of entities that matched the filter
     */
    public DescribeClientQuotasResult(KafkaFuture<Map<QuotaEntity, Map<String, Long>>>Double>>> entities);

    /**
     * Returns a map from quota entity to a future which can be used to check the status of the operation.
     */
    public KafkaFuture<Map<QuotaEntity, Map<String, Long>>>Double>>> entities();
}

public interface Admin extends AutoCloseable {
    ...

    /**
     * Describes all entities matching all provided filters (logical AND) that have at least one
     * quota value defined.
     *
     * @param filters filters to apply to matching entities
     * @param options the options to use
     * @return result containing all matching entities
     */
    DescribeClientQuotasResult describeClientQuotas(Collection<QuotaFilter> filters, DescribeClientQuotasOptions options);
}

...

Code Block
languagejava
public class ResolveClientQuotasOptions extends AbstractOptions<ResolveClientQuotasOptions> {
    // Empty.
}

/**
 * The result of the {@link Admin#ResolveClientQuotas(Collection<QuotaEntity>, ResolveClientQuotasOptions)} call.
 */
public class ResolveClientQuotasResult {
    /**
     * Information about a specific quota configuration entry.
     */
    public class Entry {
        /**
         * @param source the entity source for the value
         * @param value the non-null value
         */
        public Entry(QuotaEntity source, LongDouble value);
    }

    /**
     * Information about the value for a quota type.
     *
     * NOTE: We maintain a `Value` class because additional information may be added, e.g.,
     *       a list of overridden entries.
     */
    public class Value {
        /**
         * @param entry the quota entry
         */
        public Value(Entry entry);
    }

    /**
     * Maps a collection of entities to their resolved quota values.
     *
     * @param config the quota configuration for the requested entities
     */
    public ResolveClientQuotasResult(Map<QuotaEntity, KafkaFuture<Map<String, Value>>> config);

    /**
     * Returns a map from quota entity to a future which can be used to check the status of the operation.
     */
    public Map<QuotaEntity, KafkaFuture<Map<String, Value>>> config();

    /**
     * Returns a future which succeeds only if all quota descriptions succeed.
     */
    public KafkaFuture<Void> all();
}

public interface Admin extends AutoCloseable {
    ...

    /**
     * Describes the resolved quotas for the provided entities.
     *
     * @param entities the entities to describe the resolved quotas for
     * @param options the options to use
     * @return the resolved quotas for the entities
     */
    ResolveClientQuotasResult resolveClientQuotas(Collection<QuotaEntity> entities, ResolveClientQuotasOptions options);
}

...

Code Block
languagejava
titleAlterQuotas
public class AlterClientQuotasEntry {
    public class Op {
        /**
         * @param key the quota type to alter
         * @param value if set then the existing value is updated,
         *              otherwise if null, the existing value is cleared
         */
        public Op(String key, LongDouble value);
    }

    /**
     * @param entity the entity whose config will be modified
     * @param ops the alteration to perform - if value is set, then the existing value is updated,
     *            otherwise if null, the existing value is cleared
     */
    public AlterClientQuotasEntry(QuotaEntity entity, Collection<Op> ops);
}

public class AlterClientQuotasOptions extends AbstractOptions<AlterClientQuotasOptions> {
    /**
     * Sets whether the request should be validated without altering the configs.
     */
    public AlterClientQuotasOptions validateOnly(boolean validateOnly);
}

/**
 * The result of the {@link Admin#AlterClientQuotas(Collection<AlterClientQuotasEntry>, AlterClientQuotasOptions)} call.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
public class AlterClientQuotasResult {
    public AlterClientQuotasResult(Map<QuotaEntity, KafkaFuture<Void>> futures);

    /**
     * Returns a map from quota entity to a future which can be used to check the status of the operation.
     */
    public Map<QuotaEntity, KafkaFuture<Void>> values();

    /**
     * Returns a future which succeeds only if all quota alterations succeed.
     */
    public KafkaFuture<Void> all();
}

public interface Admin extends AutoCloseable {
    ...

    /**
     * Alters the quotas as specified for the entries.
     *
     * @param alterations the alterations to perform
     * @return the result of the alterations
     */
    AlterClientQuotasResult alterClientQuotas(Collection<AlterClientQuotasEntry> entries, AlterClientQuotasOptions options);
}

...