
Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-9927

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Kafka protocol already supports variable length encodings for integers. Specifically, KIP-482 added support for using an unsigned variable length integer encoding for the length of variable length data (strings, arrays, bytes) and for integer quantities in tagged fields. However, it is currently not possible to use a variable length encoding for regular fields with an integral (short, integer or long) type.

Varints encode two's complement signed ints or longs using a variable number of bytes such that "smaller numbers" require fewer bytes. The trade-off is that larger ints can require up to 5 or 9 bytes (as opposed to the 4 or 8 bytes that a fixed encoding would require).

For 32-bit integers encoded using a "signed" variable length encoding, the histogram of int value to number of encoded bytes looks like this:

  • ints in [-2147483648,-134217729] require 5 bytes
  • ints in [-134217728,-1048577] require 4 bytes
  • ints in [-1048576,-8193] require 3 bytes
  • ints in [-8192,-65] require 2 bytes
  • ints in [-64,63] require 1 byte
  • ints in [64,8191] require 2 bytes
  • ints in [8192,1048575] require 3 bytes
  • ints in [1048576,134217727] require 4 bytes
  • ints in [134217728,2147483647] require 5 bytes

This can be represented as a histogram (with a non-linear x axis):

#bytes
5 |--------                         --------
4 |        ------             ------
3 |              ----     ----
2 |                  -- --
1 |                    -
  +----------------------------------------- value
                       0
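The byte counts for the "signed" encoding can be reproduced with a few lines of Java. The following is a minimal sketch, assuming the ZigZag mapping that yields the symmetric ranges listed above; the class and method names are illustrative only and are not part of this proposal.

```java
// Illustrative sketch: names are hypothetical, not Kafka's API.
final class SignedVarintSize {

    /** ZigZag-maps a signed int so that values near zero become small unsigned values. */
    static int zigZag(int value) {
        return (value << 1) ^ (value >> 31);
    }

    /** Number of bytes the "signed" variable length encoding needs for the given int. */
    static int sizeOfVarint(int value) {
        int v = zigZag(value);
        int bytes = 1;
        // Each encoded byte carries 7 payload bits; keep going while more bits remain.
        while ((v & 0xFFFFFF80) != 0) {
            bytes++;
            v >>>= 7;
        }
        return bytes;
    }

    public static void main(String[] args) {
        int[] samples = {-65, -64, 0, 63, 64, 8191, 8192, Integer.MIN_VALUE, Integer.MAX_VALUE};
        for (int s : samples) {
            System.out.println(s + " -> " + sizeOfVarint(s) + " byte(s)");
        }
    }
}
```

Running the sketch over the boundary values shows where the step changes in the histogram fall.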

For 32-bit integers encoded using an "unsigned" variable length encoding, the histogram of int value to number of encoded bytes looks like this:

  • ints in [0,127] require 1 byte
  • ints in [128,16383] require 2 bytes
  • ints in [16384,2097151] require 3 bytes
  • ints in [2097152,268435455] require 4 bytes
  • ints in [268435456,4294967296) require 5 bytes

Or, as a histogram:

#bytes
5 |                         ----------------
4 |             ------------
3 |     --------
2 | ----
1 |-
  +----------------------------------------- value
  0
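To make the "unsigned" encoding concrete, here is a minimal sketch of writing and reading such a value with a java.nio.ByteBuffer. It assumes the usual layout of 7 payload bits per byte with the high bit used as a continuation flag; the class and method names are hypothetical and chosen only for this illustration.

```java
import java.nio.ByteBuffer;

// Illustrative sketch: names are hypothetical, not Kafka's API.
final class UnsignedVarintSketch {

    /** Writes the int, treated as an unsigned 32-bit quantity, using 1 to 5 bytes. */
    static void writeUnsignedVarint(int value, ByteBuffer buf) {
        while ((value & 0xFFFFFF80) != 0) {
            // Low 7 bits plus the continuation flag.
            buf.put((byte) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        buf.put((byte) value);
    }

    /** Reads a value previously written by writeUnsignedVarint. */
    static int readUnsignedVarint(ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        byte b;
        while (((b = buf.get()) & 0x80) != 0) {
            value |= (b & 0x7F) << shift;
            shift += 7;
            if (shift > 28) {
                throw new IllegalArgumentException("Varint is longer than 5 bytes");
            }
        }
        return value | (b << shift);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(5);
        writeUnsignedVarint(300, buf);
        System.out.println("300 encoded in " + buf.position() + " bytes");
        buf.flip();
        System.out.println("decoded: " + readUnsignedVarint(buf));
    }
}
```

Note that a negative Java int such as -1 is treated as a large unsigned value here and therefore needs all 5 bytes.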

Note

It is important to understand that the use of the term "signed" or "unsigned" here does not refer to the signed-ness of the Java int or long which is serialized (Java int and long are always signed), but rather to how the encoding is more efficient for a range of numbers which are, roughly, symmetric about zero, or a range whose lower bound is zero.

Numerous existing RPCs use Java shorts, ints or longs for quantities which have a histogram with a predictable shape:

  • Broker ids are usually numbered sequentially from 0, 1 or perhaps 1000, and are typically less than 16383.
  • "Replica ids" and "leader ids" are just broker ids.
  • Partition ids are numbered sequentially from 0, and would typically be less than 16383.
  • Error codes are numbered sequentially from -1.

Using variable length encodings for these quantities in Kafka protocol messages would make those messages smaller. For some RPCs the messages could be substantially smaller.

An example: MetadataResponse

Taking MetadataResponseData as an example, and looking just at the deeply nested MetadataResponsePartition, the current schema is:

{ "name": "ErrorCode", "type": "int16", "versions": "0+",
  "about": "The partition error, or 0 if there was no error." },
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
  "about": "The partition index." },
{ "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
  "about": "The ID of the leader broker." },
{ "name": "LeaderEpoch", "type": "int32", "versions": "7+", "default": "-1", "ignorable": true,
  "about": "The leader epoch of this partition." },
{ "name": "ReplicaNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
  "about": "The set of all nodes that host this partition." },
{ "name": "IsrNodes", "type": "[]int32", "versions": "0+",
  "about": "The set of nodes that are in sync with the leader for this partition." },
{ "name": "OfflineReplicas", "type": "[]int32", "versions": "5+", "ignorable": true,
  "about": "The set of offline replicas of this partition." }


The current fixed length encoding requires:

size = 2           // errorCode
     + 4           // partitionIndex
     + 4           // leaderId
     + 4           // leaderEpoch
     + 1           // size of replicaNodes (assuming best case)
     + 4 × replica // replicaNodes
     + 1           // size of isrNodes (assuming best case)
     + 4 × isr     // isrNodes
     + 1           // size of offlineReplicas (assuming best case)
     + 4 × offline // offlineReplicas
     = 17 + 4×(replica + isr + offline) in the best case

If the schema for MetadataResponsePartition used the unsigned variable length encoding for all fields then in the best case we get the formula:

size = 1           // errorCode
     + 1           // partitionIndex
     + 1           // leaderId
     + 1           // leaderEpoch
     + 1           // size of replicaNodes (assuming best case)
     + 1 × replica // replicaNodes
     + 1           // size of isrNodes (assuming best case)
     + 1 × isr     // isrNodes
     + 1           // size of offlineReplicas (assuming best case)
     + 1 × offline // offlineReplicas
     = 7 + replica + isr + offline in the best case
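The two formulas can be compared directly for a given partition. The sketch below simply evaluates them; the class and method names are hypothetical, and the numbers are per-partition best-case sizes derived from the formulas, not measurements.

```java
// Illustrative sketch: evaluates the two size formulas for one MetadataResponsePartition.
final class PartitionSizeEstimate {

    /** Fixed length encoding: 17 + 4 x (replica + isr + offline), in the best case. */
    static int fixedSize(int replica, int isr, int offline) {
        return 17 + 4 * (replica + isr + offline);
    }

    /** Variable length encoding, best case: 7 + replica + isr + offline. */
    static int bestCaseVarintSize(int replica, int isr, int offline) {
        return 7 + replica + isr + offline;
    }

    public static void main(String[] args) {
        // Example: 2 replicas, both in sync, none offline.
        System.out.println(fixedSize(2, 2, 0));          // 33
        System.out.println(bestCaseVarintSize(2, 2, 0)); // 11
    }
}
```

For a partition with two replicas that are both in sync, the best-case variable length size is a third of the fixed length size.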

More concretely, benchmarking a MetadataResponse (just the body, excluding the header) containing a single 100 partition topic replicated across two brokers suggests that:

  • Fixed length encoding is 3216 bytes, taking on average 56,458µs to serialize and 14416µs to deserialize.
  • In the best case, variable length encoding is 1170 bytes, taking on average 65226µs to serialize and 14713µs to deserialize.
  • In the worst case, variable length encoding is 4026 bytes, taking on average 81328µs to serialize and 14755µs to deserialize.

The worst case would occur if the cluster had brokers with ids greater than 134,217,727, topics with more than that many partitions, and error codes greater than 255.

Since most of the data in a typical MetadataResponse is partition data, such a change would make typical responses substantially smaller.

Scope

This KIP proposes a mechanism for allowing RPCs (including new versions of existing RPCs) to use varints.
It does not propose any changes to existing RPC messages to make use of the new encoding.
It is envisaged that RPCs will make use of this functionality as those RPCs get changed under other KIPs and guided by benchmarking about the costs and benefits.

Public Interfaces

This could be done in two ways, either by making the existing type property of fields support version-dependent types, or by introducing a separate encoding property.

Making FieldSpec's type version-dependent

The existing type property of fields would be allowed to be either the JSON String type or the JSON object type.
The interpretation of a String-typed property would be that the property has the named type in all versions of the property.
When type was an object the keys would be version ranges and the values would be the type of the property in messages within that range.
Support would be added for new types: varint16, varint32, varint64 and unsigned_varint16 etc.
The Java type of the property corresponding to the field spec would be the widest corresponding Java type.
This would allow, in addition to variable length encodings, 32-bit fields to evolve to 64-bit quantities between message versions.

Example

Focussing specifically on the LeaderId of the MetadataResponsePartition previously described:

{ "name": "LeaderId",
  "type": { 
    "0-9": "int32",
    "10+": "unsigned_varint32" 
  },
  "versions": "0+",
  "entityType": "brokerId",
  "about": "The ID of the leader broker."
}
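As an illustration of what a version-dependent type could mean on the wire, the following sketch writes and reads LeaderId differently depending on the message version, following the "0-9" and "10+" ranges of the example. It is a hand-written assumption about the behaviour, not the output of the message generator, and all names in it are hypothetical.

```java
import java.nio.ByteBuffer;

// Illustrative sketch: hand-written, not generated code; names are hypothetical.
final class LeaderIdCodecSketch {

    /** First version in which LeaderId uses unsigned_varint32 (from the example spec). */
    static final short FIRST_VARINT_VERSION = 10;

    static void writeLeaderId(ByteBuffer buf, int leaderId, short version) {
        if (version >= FIRST_VARINT_VERSION) {
            // Versions 10+: unsigned variable length encoding, 7 bits per byte.
            int v = leaderId;
            while ((v & 0xFFFFFF80) != 0) {
                buf.put((byte) ((v & 0x7F) | 0x80));
                v >>>= 7;
            }
            buf.put((byte) v);
        } else {
            // Versions 0-9: fixed 4-byte big-endian int32.
            buf.putInt(leaderId);
        }
    }

    static int readLeaderId(ByteBuffer buf, short version) {
        if (version < FIRST_VARINT_VERSION) {
            return buf.getInt();
        }
        int value = 0;
        int shift = 0;
        byte b;
        while (((b = buf.get()) & 0x80) != 0) {
            value |= (b & 0x7F) << shift;
            shift += 7;
            if (shift > 28) {
                throw new IllegalArgumentException("Varint is longer than 5 bytes");
            }
        }
        return value | (b << shift);
    }
}
```

The Java type of the field stays int in both branches; only the number of bytes on the wire differs between versions.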

Alternative: Adding a separate encoding property

Field specs in the protocol message JSON format will get support for a new encoding property, which will define, for each version of the field, how the value should be encoded. This approach makes encoding a first-class concept, separating the logical type of a field from how it is encoded on the wire. While it is more verbose it is potentially more flexible than conflating type and encoding within the `type` property, since it would be easy to add further named encodings in the future.

Example

{ "name": "LeaderId",
  "type": "int32",
  "versions": "0+",
  "entityType": "brokerId",
  "about": "The ID of the leader broker.",
  "encoding": {
    "0-9": "fixed32",
    "10+": "unsigned32"
}}
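One way to picture the encoding property is as a lookup from message version to encoding name. The sketch below resolves the version ranges used in the example ("0-9", "10+"); it is only an assumption about how such a lookup might behave, the real message generator has its own version handling, and every name here is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch: names are hypothetical and not part of the message generator.
final class EncodingLookupSketch {

    /** Returns true if the version falls in a range written as "N", "N-M" or "N+". */
    static boolean contains(String range, int version) {
        if (range.endsWith("+")) {
            return version >= Integer.parseInt(range.substring(0, range.length() - 1));
        }
        int dash = range.indexOf('-');
        if (dash < 0) {
            return version == Integer.parseInt(range);
        }
        int low = Integer.parseInt(range.substring(0, dash));
        int high = Integer.parseInt(range.substring(dash + 1));
        return version >= low && version <= high;
    }

    /** Finds the encoding declared for the given version, in declaration order. */
    static String encodingFor(Map<String, String> encodings, int version) {
        for (Map.Entry<String, String> e : encodings.entrySet()) {
            if (contains(e.getKey(), version)) {
                return e.getValue();
            }
        }
        throw new IllegalArgumentException("No encoding declared for version " + version);
    }

    public static void main(String[] args) {
        Map<String, String> leaderIdEncoding = new LinkedHashMap<>();
        leaderIdEncoding.put("0-9", "fixed32");
        leaderIdEncoding.put("10+", "unsigned32");
        System.out.println(encodingFor(leaderIdEncoding, 9));  // fixed32
        System.out.println(encodingFor(leaderIdEncoding, 10)); // unsigned32
    }
}
```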

Proposed Changes

TBC based on selection of the preferred alternative via discussion.

Compatibility, Deprecation, and Migration Plan

The proposal is backwards compatible: Clients using existing API versions will continue to use fixed-size encoding.

New versions of existing RPC messages will be able to use variable length encoding on a per-field basis.

Rejected Alternatives

  • Simply adding support for varint32 types would not, on its own, allow these types to be used for existing fields.

