...

While this versioning scheme allows us to change the message schemas over time, there are many scenarios that it doesn't support well.  One scenario that isn't well-supported is when we have data that should be sent in some contexts, but not others.  For example, when a MetadataRequest is made with IncludeClusterAuthorizedOperations set to true, we need to include the authorized operations in the response.  However, even when IncludeClusterAuthorizedOperations is set to false, we still must waste bandwidth sending a set of blank authorized operations fields in the response.  The problem is that the field is semantically optional in the message, but that can't be expressed in the type system for the Kafka RPC protocol.

Another scenario that we don't support is attaching an extra field to a message in a manner that is orthogonal to the normal versioning scheme.  For example, we might want to attach a trace ID, a "forwarded-by" field, or a "user-agent" field.  It wouldn't make sense to add all these fields to the message schema on the off chance that someone might use them.  In order to support these scenarios, we would like to add the concept of "tagged fields" to the Kafka protocol.

...

We propose to add tagged fields to the Kafka serialization format.  Each tagged field will be identified by its 31-bit numeric tag.

Tagged fields are always optional.  When they are not present in a message, they do not take up any space.

...

Since these objects are usually small, the size field almost always ends up taking up much more space than is needed.  For example, most arrays do not have more than 100 elements.  However, every array is currently prefixed by a four-byte length that could theoretically denote a size up to 2 billion.

Rather than using a fixed-length size, we should use a variable-length integer that can take between 1 and 5 bytes, depending on the length.  In the common case when the array is small, using variable-length sizes will let us save three bytes per array, three bytes per byte buffer, and one byte per string.
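To make the savings concrete, the following sketch counts how many bytes a length would occupy as an unsigned variable-length integer, assuming 7 payload bits per byte as in the Unsigned Varints encoding described in this document.  The class and method names are hypothetical and are not part of the proposal.

```java
// Hypothetical helper, not part of the Kafka code base: counts the bytes
// needed to store a length as an unsigned varint (7 payload bits per byte).
final class VarintSizeSketch {
    public static int sizeOfUnsignedVarint(int value) {
        int bytes = 1;
        // Treat the int as unsigned: >>> shifts in zeros instead of the sign bit.
        while ((value & 0xFFFFFF80) != 0) {
            value >>>= 7;
            bytes++;
        }
        return bytes;
    }

    public static void main(String[] args) {
        int[] lengths = {0, 100, 127, 128, 16384, Integer.MAX_VALUE};
        for (int length : lengths) {
            // A fixed-length array size always costs 4 bytes.
            System.out.printf("length=%d fixed=4 bytes varint=%d bytes%n",
                    length, sizeOfUnsignedVarint(length));
        }
    }
}
```

For a typical array of at most 100 elements the size fits in one byte, which is where the three bytes saved per array come from.  Only sizes that need more than 28 bits reach the 5-byte worst case.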

Flexible Versions

It would be tedious to update the JSON message specifications to add tagged fields to each structure.  Similarly, we don't want to manually annotate each string, buffer, or array that should now be serialized in a more efficient way.  Instead, we should simply have the concept of "flexible versions."  Any version of a message that is a "flexible version" has the changes described above.

Public Interfaces

JSON Schemas

flexibleVersions

The flexible versions will be described by a new top-level field in each request and response.  The format will be the same as that of existing version fields.  If the flexible versions are not specified, it is assumed that all versions are flexible.

Each Kafka RPC will have a new top-level version field named "flexibleVersions".  This field will contain a version range such as "1+", etc.  All of the message versions in this range will support tagged fields.  Message versions outside this range will not support tagged fields.  Note that adding support for tagged versions to an RPC requires bumping the protocol version number.
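As an illustration of how such a range could be interpreted, here is a minimal sketch that checks whether a message version falls inside a range string.  It assumes only the open-ended form ("1+"), the closed form ("0-9"), and a single version number; the class and method names are hypothetical, and this is not the parser used by the Kafka message generator.

```java
// Hypothetical sketch: decides whether a message version supports tagged
// fields, given a flexibleVersions range string.
final class FlexibleVersionsSketch {
    /** Returns true if version lies inside a range such as "9+", "0-9" or "3". */
    public static boolean contains(String range, short version) {
        String trimmed = range.trim();
        if (trimmed.endsWith("+")) {
            // Open-ended range: every version from the lower bound upwards.
            short lowest = Short.parseShort(trimmed.substring(0, trimmed.length() - 1));
            return version >= lowest;
        }
        int dash = trimmed.indexOf('-');
        if (dash < 0) {
            // Assumption: a bare number denotes exactly one version.
            return version == Short.parseShort(trimmed);
        }
        short lowest = Short.parseShort(trimmed.substring(0, dash));
        short highest = Short.parseShort(trimmed.substring(dash + 1));
        return version >= lowest && version <= highest;
    }

    public static void main(String[] args) {
        System.out.println(contains("9+", (short) 8));  // false: version 8 is not flexible
        System.out.println(contains("9+", (short) 9));  // true: version 9 supports tagged fields
    }
}
```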

...

Each optional field has a positive 31-bit tag number.  This number must be unique within the context it appears in.  For example, we could use the tag number "21" both at the top level and within a particular substructure without creating ambiguity, since the contexts are separate.

...

Code Block
languagejs
{
  "apiKey": 9000,
  "type": "response",
  "name": "FooResponse",
  "validVersions": "0-9",
  "flexibleVersions": "9+",
  "optionalFields": [
      { "name": "UserAgent", "type": "string", "tag": 10,
        "about": "The user-agent that sent this request." },
  ],
  "fields": [
    { "name": "Foos", "type": "[]Foo", "versions": "0+",
      "about": "Each foo.", "optionalFields": [
        { "name": "Bar", "type": "string", "tag": 10,
          "default": "hello world", "about": "The bar associated with this foo, if any." },
      ], "fields": [
        { "name": "Baz", "type": "int16", "versions": "0+",
          "about": "The baz associated with this foo." },
  ...
  ]
}

...

Type Classes

We will add a new constructor to the org.apache.kafka.common.protocol.types.Schema class which will support optional fields.

Code Block
languagejava
    /**
     * Construct the schema with a given list of its field values
     *
     * @param taggedFields     The tagged fields for this schema.
     * @param fields           The mandatory fields of this schema.
     *
     * @throws SchemaException If the given list have duplicate fields
     */
    public Schema(Map<Short, Field> taggedFields, Field... fields);

...

We will add several new subclasses of org.apache.kafka.common.protocol.types.Type and org.apache.kafka.common.protocol.types.Field.

| Type Class Name | Field Class Name | Description |
|---|---|---|
| CompactArrayOf | CompactArray | Represents an array whose length is expressed as a variable-length integer rather than a fixed 4-byte length. |
| COMPACT_STRING | CompactString | Represents a string whose length is expressed as a variable-length integer rather than a fixed 2-byte length. |
| COMPACT_NULLABLE_STRING | CompactNullableString | Represents a nullable string whose length is expressed as a variable-length integer rather than a fixed 2-byte length. |
| COMPACT_BYTES | CompactBytes | Represents a byte buffer whose length is expressed as a variable-length integer rather than a fixed 4-byte length. |
| COMPACT_NULLABLE_BYTES | CompactNullableBytes | Represents a nullable byte buffer whose length is expressed as a variable-length integer rather than a fixed 4-byte length. |
| TagSection | TagSection | Represents a section containing optional tagged fields. |

Tagged Fields and Version Compatibility

...

For convenience, if a field is specified as having a tag, we will assume by default that the tag can appear in all flexible versions.  Therefore, it isn't usually required to specify "versions" or "taggedVersions."  If "taggedVersions" does appear, then it must be a subset of "versions," which must also be specified.

Proposed Changes

...

Serialization

Tag Sections

In a flexible schema version, each structure begins with a tag section.  This section stores all of the tagged fields in the structure.

The tag section begins with a number of tagged fields, serialized as a variable-length integer.  If this number is 0, there are no tagged fields present.  In that case, the tag section takes up only one byte.

If the number of tagged fields is greater than zero, the tagged fields follow.  They are serialized in ascending order of their tag.  Each tagged field begins with a tag header, serialized as a variable-length integer.  After the tag header, the field data follows.

| The number of tagged fields | Tag Header 1 | Tag Data 1 | Tag Header 2 | Tag Data 2 | ... |
|---|---|---|---|---|---|
| UNSIGNED_VARINT | UNSIGNED_VARLONG | <field 1 type> | UNSIGNED_VARLONG | <field 2 type> | ... |

Tag Headers

The tag header is a 64-bit integer containing the 32-bit tag and the 32-bit field length of the tagged field.  The bits for these fields are interleaved: the even-indexed bits correspond to the length bits, and the odd-indexed bits correspond to the tag bits.

To give an example, let's say that the length was 4 and the tag was 5.  In binary, these numbers would be 0b100 and 0b101, respectively.  Then the end of the varlong would be:

| ... | T2 | L2 | T1 | L1 | T0 | L0 |
|---|---|---|---|---|---|---|
| ... | 1 | 1 | 0 | 0 | 1 | 0 |

The reason for interleaving the bits is that in the common case where both numbers are small, we want the varlong to take up as few bytes as possible.
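The interleaving can be sketched as follows.  This is a minimal illustration under the layout described above (length bit i at bit 2i, tag bit i at bit 2i + 1); the class and method names are hypothetical and the loop favors clarity over speed.

```java
// Hypothetical sketch of the interleaved tag header described above.
final class TagHeaderSketch {
    /** Places length bit i at even position 2*i and tag bit i at odd position 2*i + 1. */
    public static long interleave(int tag, int length) {
        long header = 0L;
        for (int i = 0; i < 32; i++) {
            long lengthBit = (length >>> i) & 1L;
            long tagBit = (tag >>> i) & 1L;
            header |= lengthBit << (2 * i);
            header |= tagBit << (2 * i + 1);
        }
        return header;
    }

    /** Recovers the tag by collecting the odd-indexed bits. */
    public static int tagOf(long header) {
        int tag = 0;
        for (int i = 0; i < 32; i++) {
            tag |= (int) ((header >>> (2 * i + 1)) & 1L) << i;
        }
        return tag;
    }

    /** Recovers the length by collecting the even-indexed bits. */
    public static int lengthOf(long header) {
        int length = 0;
        for (int i = 0; i < 32; i++) {
            length |= (int) ((header >>> (2 * i)) & 1L) << i;
        }
        return length;
    }

    public static void main(String[] args) {
        long header = interleave(5, 4);
        System.out.println(Long.toBinaryString(header)); // prints 110010
        System.out.println(tagOf(header) + " " + lengthOf(header)); // prints 5 4
    }
}
```

With a tag of 5 and a length of 4 the header is 50, which fits in a single varlong byte.  Had the two 32-bit values simply been concatenated, any non-zero value in the upper half would force the varlong to at least five bytes.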

Compact Arrays

A compact array contains a 32-bit unsigned varint, followed by the array elements.

| 32-bit length (plus one) | Element 0 | Element 1 | ... |
|---|---|---|---|
| UNSIGNED_VARINT | <array element type> | <array element type> | ... |

If the length field is 0, the array is null.  If the length field is 1, the length is 0.  If the length field is 2, the length is 1, etc.

Compact Bytes

A compact bytes field contains a 32-bit unsigned varint, followed by the bytes.

| 32-bit length (plus one) | Payload |
|---|---|
| UNSIGNED_VARINT | Bytes |

If the length field is 0, the bytes field is null.  If the length field is 1, the length is 0.  If the length field is 2, the length is 1, etc.

Compact String

A compact string field contains a 32-bit unsigned varint, followed by the string bytes.

| 32-bit length (plus one) | String |
|---|---|
| UNSIGNED_VARINT | Bytes |

If the length field is 0, the string field is null.  If the length field is 1, the length is 0.  If the length field is 2, the length is 1, etc.
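The "length plus one" convention is shared by compact arrays, compact bytes, and compact strings.  The sketch below shows it for strings only.  It assumes the string bytes are UTF-8, and the class and method names are hypothetical rather than taken from the Kafka code base.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the compact string layout: an unsigned varint
// holding (length + 1), followed by the string bytes. 0 means null.
final class CompactStringSketch {
    static void writeUnsignedVarint(int value, ByteArrayOutputStream out) {
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80); // low 7 bits, continuation bit set
            value >>>= 7;
        }
        out.write(value); // final group, continuation bit clear
    }

    public static byte[] encode(String s) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (s == null) {
            writeUnsignedVarint(0, out); // length field 0: the string is null
            return out.toByteArray();
        }
        // Assumption: strings are serialized as UTF-8.
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeUnsignedVarint(utf8.length + 1, out); // length field 1: empty string
        out.write(utf8, 0, utf8.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(encode(null).length);  // prints 1
        System.out.println(encode("").length);    // prints 1
        System.out.println(encode("foo").length); // prints 4
    }
}
```

Reserving 0 for null lets nullable and non-nullable fields share one layout without resorting to a negative length, which an unsigned varint cannot represent efficiently.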

Unsigned Varints

The UNSIGNED_VARINT type describes an unsigned variable length integer.

To serialize a number as a variable-length integer, you break it up into groups of 7 bits.  The lowest 7 bits are written out first, followed by the second-lowest, and so on.  Each time a group of 7 bits is written out, the high bit (bit 8) is cleared if this group is the last one, and set if it is not.

So, for example, let's say we were trying to serialize 300, which is 0b100101100 in binary.  This would be serialized as the following two-byte sequence:

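| Continuation bit | B6 | B5 | B4 | B3 | B2 | B1 | B0 | Continuation bit | B13 | B12 | B11 | B10 | B9 | B8 | B7 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1 | 0 | 1 | 0 | 1 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 |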

Unlike signed varints, unsigned varints do not use "zig-zag encoding."  So they cannot efficiently represent negative numbers.
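The encoding rule above can be sketched as a small encoder and decoder.  This is an illustrative implementation with hypothetical class and method names, not the one in the Kafka code base.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical sketch of UNSIGNED_VARINT encoding and decoding.
final class UnsignedVarintSketch {
    public static byte[] encode(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // While more than 7 bits remain, emit the low 7 bits with the high bit set.
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value); // last group: high bit cleared
        return out.toByteArray();
    }

    public static int decode(byte[] bytes) {
        int value = 0;
        int shift = 0;
        for (byte b : bytes) {
            value |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return value; // high bit clear: this was the last group
            }
            shift += 7;
            if (shift > 28) {
                throw new IllegalArgumentException("varint is longer than 5 bytes");
            }
        }
        throw new IllegalArgumentException("truncated varint");
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : encode(300)) {
            hex.append(String.format("%02x ", b & 0xFF));
        }
        System.out.println(hex.toString().trim()); // prints ac 02
        System.out.println(decode(encode(300)));   // prints 300
    }
}
```

The two bytes 0xAC 0x02 are the bit pattern 10101100 00000010 shown in the table.  A negative int is treated as a large unsigned value and always takes five bytes, which is why zig-zag encoding is needed to store negative numbers compactly.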

Unsigned Varlongs

The UNSIGNED_VARLONG type is exactly like the UNSIGNED_VARINT type, but it can hold 64 bits instead of just 32.

Requests and Responses

All requests and responses will begin with a tagged field buffer.  If there are no tagged fields, this will only be a single zero byte.

Structures

All structures will begin with an optional field buffer.  This will normally only be a single byte, unless there are optional fields present.

Compatibility, Deprecation, and Migration Plan

As mentioned earlier, existing request versions will not be changed to support optional fields.  However, new versions will have this support going forward.

In general, adding or removing a tagged field is always a compatible operation.  However, we do not want to reuse a tag that was used for something else in a previous release.  Changing the type or nullability of an existing optional field is also an incompatible change.

...