...

The quota values are of type Double, which presents a complication in that the RPC protocol doesn't support floating point values. To accommodate this, an RPC protocol message type 'double' will be added, which will serialize doubles according to the IEEE 754 floating-point "double format" bit layout. In Java, this can be done via Double.doubleToRawLongBits() and back via Double.longBitsToDouble(), which should be easily translatable to other languages, so portability is not a concern. In a future KIP (KIP-568), it will be proposed to add 'double' values natively to Kafka's RPC message type set.
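The conversion described above can be sketched in a few lines of Java. This is a minimal illustration rather than the KIP's implementation: the class name DoubleBits and its encode/decode helpers are hypothetical, while Double.doubleToRawLongBits() and Double.longBitsToDouble() are the standard JDK calls named in the text.

```java
final class DoubleBits {
    private DoubleBits() {}

    // Hypothetical helper: return the raw 64-bit IEEE 754 bit pattern of the value.
    static long encode(double value) {
        return Double.doubleToRawLongBits(value);
    }

    // Hypothetical helper: rebuild the double from its 64-bit pattern.
    static double decode(long bits) {
        return Double.longBitsToDouble(bits);
    }

    public static void main(String[] args) {
        double quota = 1048576.5; // illustrative quota value
        long bits = encode(quota);
        System.out.printf("bits=0x%016X decoded=%s%n", bits, decode(bits));
    }
}
```

Because only the 64-bit pattern crosses the wire, any client language that can reinterpret a 64-bit integer as an IEEE 754 double can decode the value.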

Types Rationale

While there are two defined entity types in AK, a server-side plugin mechanism allows for further expansion. Likewise, as use cases evolve, finer-grained quota control may be necessary. Therefore, entity types should not be statically bound to publicly defined constants; instead, the API should support flexible entity types by interpreting them as a String identifier. For any entity type that the broker doesn't understand, an IllegalArgumentException should be thrown back to the client.
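A minimal sketch of this rationale follows, assuming the set of understood entity types is supplied at construction time (for example, by the built-in types plus whatever a plugin registers). The class EntityTypeCheck and its validate method are hypothetical names used only for illustration; "user" and "client-id" in the usage note are assumed to be the two built-in types.

```java
import java.util.Set;

final class EntityTypeCheck {
    // The entity types this (hypothetical) broker component understands.
    private final Set<String> knownTypes;

    EntityTypeCheck(Set<String> knownTypes) {
        this.knownTypes = Set.copyOf(knownTypes);
    }

    // Returns the entity type unchanged if it is understood; otherwise rejects it.
    String validate(String entityType) {
        if (entityType == null || !knownTypes.contains(entityType)) {
            throw new IllegalArgumentException("Unknown quota entity type: " + entityType);
        }
        return entityType;
    }
}
```

For example, a check built with Set.of("user", "client-id") accepts "user" and throws IllegalArgumentException for "topic". Because the types are plain strings, adding a plugin-defined type only means extending the set, not changing a public constant.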

...

Code Block
{
  "apiKey": 48,
  "type": "request",
  "name": "DescribeClientQuotasRequest",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "Filter", "type": "[]QuotaFilterData", "versions": "0+",
      "about": "Filters to apply to quota entities.", "fields": [
      { "name": "EntityType", "type": "string", "versions": "0+",
        "about": "The entity type that the filter applies to." },
      { "name": "Match", "type": "string", "versions": "0+",
        "about": "The string to match against." }
    ]}
  ]
}

{
  "apiKey": 48,
  "type": "response",
  "name": "DescribeClientQuotasResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Entry", "type": "[]EntryData", "versions": "0+",
      "about": "A result entry.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or `0` if the quota description succeeded." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The error message, or `null` if the quota description succeeded." },
      { "name": "Entity", "type": "[]QuotaEntityData", "versions": "0+",
        "about": "The quota entity description.", "fields": [
        { "name": "EntityType", "type": "string", "versions": "0+",
          "about": "The entity type." },
        { "name": "EntityName", "type": "string", "versions": "0+",
          "about": "The entity name." }
      ]},
      { "name": "Type", "type": "string", "versions": "0+",
        "about": "The quota type." },
      { "name": "Value", "type": "double", "versions": "0+",
        "about": "The quota value." }
    ]}
  ]
}


ResolveClientQuotas:

Code Block
{
  "apiKey": 49,
  "type": "request",
  "name": "ResolveClientQuotasRequest",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "Entity", "type": "[]QuotaEntityData", "versions": "0+",
      "about": "The quota entity description.", "fields": [
      { "name": "EntityType", "type": "string", "versions": "0+",
        "about": "The entity type." },
      { "name": "EntityName", "type": "string", "versions": "0+",
        "about": "The entity name." }
    ]}
  ]
}

{
  "apiKey": 49,
  "type": "response",
  "name": "ResolveClientQuotasResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Entry", "type": "[]QuotaEntryData", "versions": "0+",
      "about": "Resolved quota entries.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or `0` if the resolved quota description succeeded." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The error message, or `null` if the resolved quota description succeeded." },
      { "name": "QuotaEntity", "type": "[]QuotaEntity", "versions": "0+",
        "about": "Resolved quota entries.", "fields": [
        { "name": "EntityType", "type": "string", "versions": "0+",
          "about": "The entity type." },
        { "name": "EntityName", "type": "string", "versions": "0+",
          "about": "The entity name." }
      ]},
      { "name": "QuotaValues", "type": "[]QuotaValueData", "versions": "0+",
        "about": "Quota configuration values.", "fields": [
        { "name": "Type", "type": "string", "versions": "0+",
          "about": "The quota type." },
        { "name": "Entry", "type": "[]ValueEntryData", "versions": "0+",
          "about": "Quota value entries.", "fields": [
          { "name": "QuotaEntity", "type": "[]ValueQuotaEntity", "versions": "0+",
            "about": "Resolved quota entries.", "fields": [
            { "name": "EntityType", "type": "string", "versions": "0+",
              "about": "The entity type." },
            { "name": "EntityName", "type": "string", "versions": "0+",
              "about": "The entity name." }
          ]},
          { "name": "Value", "type": "double", "versions": "0+",
            "about": "The quota configuration value, in serialized format." }
        ]}
      ]}
    ]}
  ]
}


AlterClientQuotas:

Code Block
{
  "apiKey": 50,
  "type": "request",
  "name": "AlterClientQuotasRequest",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "Entry", "type": "[]EntryData", "versions": "0+",
      "about": "The quota configuration entries to alter.", "fields": [
      { "name": "QuotaEntity", "type": "[]QuotaEntity", "versions": "0+",
        "about": "The quota entity to alter.", "fields": [
        { "name": "EntityType", "type": "string", "versions": "0+",
          "about": "The entity type." },
        { "name": "EntityName", "type": "string", "versions": "0+",
          "about": "The name of the entity." }
      ]},
      { "name": "Op", "type": "[]OpData", "versions": "0+",
        "about": "An individual quota configuration entry to alter.", "fields": [
        { "name": "Type", "type": "string", "versions": "0+",
          "about": "The quota type." },
        { "name": "Value", "type": "double", "versions": "0+",
          "about": "The value to set, in serialized format, otherwise ignored if the value is to be removed." },
        { "name": "Remove", "type": "bool", "versions": "0+",
          "about": "Whether the quota configuration value should be removed, otherwise set." }
      ]}
    ]},
    { "name": "ValidateOnly", "type": "bool", "versions": "0+",
      "about": "Whether the alteration should be validated, but not performed." }
  ]
}

{
  "apiKey": 50,
  "type": "response",
  "name": "AlterClientQuotasResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Entry", "type": "[]EntryData", "versions": "0+",
      "about": "The quota configuration entries to alter.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or `0` if the quota alteration succeeded." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The error message, or `null` if the quota alteration succeeded." },
      { "name": "QuotaEntity", "type": "[]QuotaEntity", "versions": "0+",
        "about": "The quota entity to alter.", "fields": [
        { "name": "EntityType", "type": "string", "versions": "0+",
          "about": "The entity type." },
        { "name": "EntityName", "type": "string", "versions": "0+",
          "about": "The name of the entity." }
      ]}
    ]}
  ]
}
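
The set-or-remove semantics of the "Op" entries and the "ValidateOnly" flag in AlterClientQuotasRequest can be sketched as follows. This is an illustration under stated assumptions, not broker code: the class QuotaOps, its nested Op type, and the in-memory map are hypothetical, and the quota type names in the usage note are only example strings.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class QuotaOps {
    // Hypothetical in-memory mirror of one "Op" entry from the request schema.
    static final class Op {
        final String type;
        final double value;
        final boolean remove;

        Op(String type, double value, boolean remove) {
            this.type = type;
            this.value = value;
            this.remove = remove;
        }
    }

    // Returns the quota configuration in effect after the request is handled.
    // The input map is never mutated; with validateOnly set, the ops are still
    // evaluated but the current configuration is returned unchanged.
    static Map<String, Double> apply(Map<String, Double> current, List<Op> ops, boolean validateOnly) {
        Map<String, Double> updated = new HashMap<>(current);
        for (Op op : ops) {
            if (op.remove) {
                updated.remove(op.type); // the value is ignored on removal
            } else {
                updated.put(op.type, op.value);
            }
        }
        return validateOnly ? current : updated;
    }
}
```

For instance, applying a set op for "producer_byte_rate" and a remove op for "consumer_byte_rate" to a map holding both yields a map with only the new producer value, while the same call with validateOnly set leaves the configuration as it was.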


Kafka RPC 'double' support:

Note that, while the ByteBuffer natively supports serializing a Double, the format in which the value is serialized is not strongly specified, so the preference is to explicitly ensure a standard representation of double-precision 64-bit IEEE 754 format. This is achieved in Java using Double.doubleToRawLongBits() and Double.longBitsToDouble() and should be easily portable to other languages.

Code Block
clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
    /**
     * Read a double-precision 64-bit format IEEE 754 value.
     *
     * @param buffer The buffer to read from
     * @return The double value read
     */
    public static double readDouble(ByteBuffer buffer) {
        return Double.longBitsToDouble(buffer.getLong());
    }

    /**
     * Write the given double into the buffer in the double-precision 64-bit IEEE 754 format.
     *
     * @param value The value to write
     * @param buffer The buffer to write to
     */
    public static void writeDouble(double value, ByteBuffer buffer) {
        buffer.putLong(Double.doubleToRawLongBits(value));
    }
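
As a self-contained illustration of the two helpers above, the following sketch copies their bodies into a hypothetical class, DoubleWireDemo, and adds two hypothetical convenience methods (encode and decode) so the round trip can be run without the Kafka code base. It assumes a freshly allocated ByteBuffer, which uses big-endian byte order by default.

```java
import java.nio.ByteBuffer;

final class DoubleWireDemo {
    private DoubleWireDemo() {}

    // Same body as the proposed ByteUtils.writeDouble, repeated here for self-containment.
    static void writeDouble(double value, ByteBuffer buffer) {
        buffer.putLong(Double.doubleToRawLongBits(value));
    }

    // Same body as the proposed ByteUtils.readDouble.
    static double readDouble(ByteBuffer buffer) {
        return Double.longBitsToDouble(buffer.getLong());
    }

    // Hypothetical helper: serialize one double into its eight wire bytes.
    static byte[] encode(double value) {
        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
        writeDouble(value, buffer);
        return buffer.array();
    }

    // Hypothetical helper: read one double back from eight wire bytes.
    static double decode(byte[] bytes) {
        return readDouble(ByteBuffer.wrap(bytes));
    }
}
```

Encoding 1.0 this way produces the bytes 3F F0 00 00 00 00 00 00, which is the IEEE 754 bit pattern 0x3FF0000000000000 in network byte order.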

The protocol type definition:

Code Block
clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
    public static final DocumentedType DOUBLE = new DocumentedType() {
        @Override
        public void write(ByteBuffer buffer, Object o) {
            ByteUtils.writeDouble((Double) o, buffer);
        }

        @Override
        public Object read(ByteBuffer buffer) {
            return ByteUtils.readDouble(buffer);
        }

        @Override
        public int sizeOf(Object o) {
            return 8;
        }

        @Override
        public String typeName() {
            return "DOUBLE";
        }

        @Override
        public Double validate(Object item) {
            if (item instanceof Double)
                return (Double) item;
            else
                throw new SchemaException(item + " is not a Double.");
        }

        @Override
        public String documentation() {
            return "Represents a double-precision 64-bit format IEEE 754 value. " +
                    "The values are encoded using eight bytes in network byte order (big-endian).";
        }
    };


In generator/src/main/java/org/apache/kafka/message/MessageGenerator.java, the following operations will be used (code omitted for brevity):

Code Block
generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
Hash code: Double.hashCode(value)

Empty value: (double) 0

Parsing a default value string: Double.parseDouble(defaultValue)
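
The three operations listed above can be exercised in isolation as follows. The wrapper class DoubleFieldOps and its method names are hypothetical and do not reflect the generator's actual structure; falling back to the empty value when no default string is given is likewise an assumption made for this sketch.

```java
final class DoubleFieldOps {
    private DoubleFieldOps() {}

    // Hash code contribution of a double field.
    static int hash(double value) {
        return Double.hashCode(value);
    }

    // Value used when a double field is empty.
    static double emptyValue() {
        return (double) 0;
    }

    // Parse a default value string from a message definition.
    // Assumption: an empty string means "no default", so the empty value is used.
    static double parseDefault(String defaultValue) {
        return defaultValue.isEmpty() ? emptyValue() : Double.parseDouble(defaultValue);
    }
}
```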

Compatibility, Deprecation, and Migration Plan

All changes would be forward-compatible, and no migration plan is necessary. It's outside the scope of this KIP to deprecate any functionality.

...