Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Kafka RPC protocol has its own serialization format for binary data.  This format is also used for messages on disk, and for metadata such as consumer offsets.  In this strongly typed format, each message has a predefined schema which both senders and receivers must understand in order for communication to take place.

In order to support evolving the protocol over time, messages have a 15-bit version number.  Each distinct version of a message has a distinct schema.  So, for example, the schema for version 3 of FetchRequest may contain a different set of fields than the schema for version 2.

While this versioning scheme allows us to change the message schemas over time, there are many scenarios that it doesn't support well.  One scenario that isn't well-supported is when we have data that should be sent in some contexts, but not others.  For example, when a MetadataRequest is made with IncludeClusterAuthorizedOperations set to true, we need to include the authorized operations in the response.  However, even when IncludeClusterAuthorizedOperations is set to false, we still must waste bandwidth sending the authorized operations fields in the response.  The assumption is that when a new version adds a field, that field will always be used going forward.  This leads to cases where new features that are included in the request or response impose a CPU and network cost even if you don't use them.  Even when a field is set to null, it can take up a fairly large amount of space when serialized.  For example, a null array takes 4 bytes to serialize.

Another scenario that we don't support is attaching an extra field to a message in a manner that is orthogonal to the normal versioning scheme.  For example, we might want to attach a trace ID, a "forwarded-by" field, or a "user-agent" field.  It wouldn't make sense to add all these fields to the message schema on the off chance that someone might use them.  In order to support these scenarios, we would like to add the concept of "tagged fields" to the Kafka protocol.

Finally, sometimes, we want to add extra information to a message without requiring a version bump for everyone who has to read that message.  This is particularly important for metadata like consumer offsets.

Proposed Changes

Tagged Fields

We propose to add tagged fields to the Kafka serialization format.  Each tagged field will be identified by its 15-bit numeric tag.

Tagged fields are always optional.  When they are not present in a message, they do not take up any space.

A new tagged field can be added to an existing protocol version without bumping the protocol version.  If the receiver does not expect a particular tagged field, it will simply skip over the field without deserializing it.

More Efficient Serialization for Variable-Length Objects

Kafka RPC supports variable length strings, byte buffers, and arrays.  In each of these cases, the object is serialized as a fixed-length size, followed by the data.

Since these objects are usually small, this serialization format is not very efficient.  For example, most arrays do not have more than 100 elements.  However, arrays are prefixed by a four-byte length that could theoretically denote a size up to 2 billion. 

Instead, we should use a variable-length integer that can take between 1 and 5 bytes, depending on the length.  In the common case when the array is small, using variable-length sizes will let us save three bytes per array, three bytes per byte buffer, and one byte per string.
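To make the size savings concrete, here is a minimal sketch of a variable-length size prefix.  It assumes the common base-128 encoding (7 data bits per byte, with the high bit marking "more bytes follow"); the text above does not specify the exact encoding, and the class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical sketch: not part of the Kafka code base.
class VarintSizeSketch {
    // Encodes value as an unsigned base-128 varint (assumed encoding):
    // 7 data bits per byte, high bit set on every byte except the last.
    static byte[] encodeUnsignedVarint(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
        return out.toByteArray();
    }

    // Decodes a single varint that starts at the beginning of the array.
    static int decodeUnsignedVarint(byte[] bytes) {
        int value = 0;
        int shift = 0;
        for (byte b : bytes) {
            value |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break;
            }
            shift += 7;
        }
        return value;
    }

    public static void main(String[] args) {
        int[] lengths = {0, 100, 127, 128, 16384, Integer.MAX_VALUE};
        for (int length : lengths) {
            System.out.println(length + " -> "
                + encodeUnsignedVarint(length).length + " byte(s)");
        }
    }
}
```

Under this assumed encoding, any size below 128 fits in one byte, which is where the three-byte saving over a fixed four-byte array length comes from.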

Flexible Versions


Public Interfaces

JSON Schemas

...

flexibleVersions

Each Kafka RPC will have a new top-level version field named "flexibleVersions".  This field will contain a version range such as "1+", etc.  All of the message versions in this range will support tagged fields.

As part of this KIP, we will create a new version of all the existing RPCs.  This new version will support tagged fields.

Message versions outside this range will not support tagged fields.  Note that adding support for tagged fields to an RPC requires bumping the protocol version number.

Specifying Tagged Fields

Tagged fields can appear at the top level of a message, or inside any structure.

Each tagged field has a 16-bit tag number.  Any value from 0 to 65535, inclusive, is a legal tag.  This number must be unique within the context it appears in.  (Note that different arrays are considered different contexts for this purpose.)  For example, we could use the tag number "2" both at the top level and within a particular substructure without creating ambiguity, since the contexts are separate.

Unlike mandatory fields, tagged fields do not specify a version range or a nullable version range.  Instead, tagged fields are nullable if "nullable" is set to true.  Just like with mandatory fields, only certain types of field can be nullable (array, string, bytes, etc.).  Ints and booleans cannot be nullable.

Tagged fields can have any type, except that they cannot be arrays.  Note that the restriction against having tagged arrays is just to simplify serialization.  We can relax this restriction in the future without changing the protocol on the wire.

Here is an example of a message spec which has tagged fields at both the top level and the array level:

Code Block
languagejs
{
  "apiKey": 9000,
  "type": "response",
  "name": "FooResponse",
  "validVersions": "0-9",
  "flexibleVersions": "9+",
  "optionalFields": [
      { "name": "UserAgent", "type": "string", "nullable": true, "tag": 1,
        "about": "The user-agent that sent this request." }
  ],
  "fields": [
    { "name": "Foos", "type": "[]Foo", "versions": "0+",
      "about": "Each foo.", "optionalFields": [
        { "name": "Bar", "type": "string", "nullable": false, "tag": 1,
          "default": "hello world", "about": "The bar associated with this foo, if any." }
      ], "fields": [
        { "name": "Baz", "type": "int16", "versions": "0+",
          "about": "The baz associated with this foo." },
  ...
  ]
}

Schema Class

We will add a new constructor to the org.apache.kafka.common.protocol.types.Schema class which will support tagged fields.

Code Block
languagejava
    /**
     * Construct the schema with a given list of its field values
     *
     * @param taggedFields     The tagged fields for this schema.
     * @param fields           The mandatory fields of this schema.
     *
     * @throws SchemaException If the given list has duplicate fields
     */
    public Schema(Map<Short, Field> taggedFields, Field... fields);

When this overload is used, even if the provided map is empty, we will allow the given structure to contain tagged fields.  If a different constructor is used, tagged fields will not be supported in the given structure.  In a flexible version, the overload which supports tagged fields must be used for all Schema objects.

Tagged Fields and Version Compatibility

A tagged field can be retroactively added to an existing message version without breaking compatibility, provided, of course, that the version in question was a "flexible version."  We cannot add any tagged fields to an inflexible version, and we cannot retroactively change an inflexible version to a flexible one.

Tag numbers must never be reused, nor can we alter the format of a tagged field.  This includes changing a nullable field to a non-nullable one, or vice versa.  When you create the tagged field, you must decide if it will be nullable or not, and stick with that decision forever.

A field can be specified as tagged in some versions and non-tagged in others.  The main use-case for this is to gracefully migrate fields which were previously mandatory to tagged fields, where appropriate.

For convenience, if a field is specified as having a tag, we will assume by default that the tag can appear in all flexible versions.  Therefore, it isn't usually required to specify "versions" or "taggedVersions."  If "taggedVersions" does appear, then it must be a subset of "versions," which must also be specified.

Proposed Changes

Serializing

...

Tagged Fields

In a flexible schema version, each structure begins with a "tagged fields buffer."  This buffer contains all of the tagged fields within the structure.  

The tagged fields buffer starts with a one-byte header with the constant value 0x80.  Then, a series of tagged fields follows.  The fields must appear in ascending order, from the lowest-valued tag to the highest-valued tag.

If the tag buffer is empty, no bytes at all are written; there is zero overhead.  We can tell if the tag buffer is present or not by reading the first two bytes.  If the first byte we read is not 0x80, then the tag buffer is not present, and we should move on to deserializing the first mandatory field.


is 0x80 and the second byte is not 0x80, then the tag 




Of course, when the tag buffer is empty, the first thing we read will be whatever comes next in the byte stream.  Therefore, there must be no ambiguity between the next field and the 0x80 byte that starts a tag buffer.


Buffer Start Byte

The tag buffer begins with a literal 0x80 byte.  Then, there is a sequence of tags.  Each tag is serialized in three parts.

Length / HasNext Field

The length field contains both the length, and a bit indicating whether this is the last tag.  These two pieces of information are combined together into a single number, via the following expression: (length << 2) | (hasNext << 1) | 0x1, and then serialized as a varint.
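The expression above can be illustrated with a small sketch.  The class and method names are hypothetical, and the varint serialization step is left out; only the packing and unpacking of the combined number is shown.

```java
// Hypothetical sketch: not part of the Kafka code base.
class LengthHasNextSketch {
    // Combines the length and the hasNext bit exactly as in the expression
    // (length << 2) | (hasNext << 1) | 0x1.  The length must fit in 30 bits
    // so that the left shift does not overflow an int.
    static int pack(int length, boolean hasNext) {
        return (length << 2) | ((hasNext ? 1 : 0) << 1) | 0x1;
    }

    // Recovers the length by discarding the two low bits.
    static int length(int packed) {
        return packed >>> 2;
    }

    // Recovers the hasNext flag from bit 1.
    static boolean hasNext(int packed) {
        return ((packed >>> 1) & 0x1) == 1;
    }

    public static void main(String[] args) {
        int packed = pack(5, true);
        System.out.println("packed=" + packed
            + " length=" + length(packed)
            + " hasNext=" + hasNext(packed));
    }
}
```

Because the lowest bit is always 1, the combined number is never zero, even for a zero-length final tag.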



Code Block
languagetext
titleThe header byte
NTTTTBBB
|  |  |
|  |  +--- B: The number of bytes to use for the tag / length field, minus 1.
|  |        So 000 -> 1 byte, 001 -> 2 bytes, ... 111 -> 8 bytes
|  |
|  +------ T: The number of bits to use for the tag.  This cannot be 0.
|
+--------- N: The next bit.  0 = this is the last entry.  1 = there are more entries.

N: 1 if there is a next entry
T: The number of bits used for the tag (cannot be 0)
B: The number of bytes to use for the tag / length field, minus 1
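As a rough illustration of the NTTTTBBB layout, the following sketch packs and unpacks the header byte.  It assumes that N is the most significant bit and that the fields are laid out left to right as drawn; the class and method names are hypothetical.

```java
// Hypothetical sketch: not part of the Kafka code base.
class HeaderByteSketch {
    // Packs the three fields into one byte laid out as NTTTTBBB
    // (N assumed to be the most significant bit).
    static int pack(boolean hasNext, int tagBits, int numBytes) {
        if (tagBits < 1 || tagBits > 15) {
            throw new IllegalArgumentException("T must be between 1 and 15");
        }
        if (numBytes < 1 || numBytes > 8) {
            throw new IllegalArgumentException("byte count must be between 1 and 8");
        }
        // B stores the byte count minus 1, so 000 -> 1 byte and 111 -> 8 bytes.
        return ((hasNext ? 1 : 0) << 7) | (tagBits << 3) | (numBytes - 1);
    }

    static boolean hasNext(int header) {
        return (header & 0x80) != 0;
    }

    static int tagBits(int header) {
        return (header >>> 3) & 0x0F;
    }

    static int numBytes(int header) {
        return (header & 0x07) + 1;
    }

    public static void main(String[] args) {
        int header = pack(true, 4, 2);
        System.out.printf("header=0x%02X next=%b tagBits=%d bytes=%d%n",
            header, hasNext(header), tagBits(header), numBytes(header));
    }
}
```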

We do not want tagged fields to take up any space on the wire when they are not in use.  Therefore, if there are no tagged fields within the structure, the tagged fields buffer is serialized as zero-length output.


In order to support that,



An "optional field buffer" contains a sequence of optional fields. 

Each entry in the buffer contains a field length, followed by a two-byte tag, followed by the field itself.

...

The sequence of optional fields is terminated by an entry with a field length of 0.  The terminating entry will not contain a tag or value.
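The entry layout and the zero-length terminator can be sketched as follows.  Several details are assumptions that the text does not state: the field length is written as an unsigned base-128 varint, it counts the two tag bytes plus the value (so that a real entry never has length 0 and cannot be mistaken for the terminator), and the tag is big-endian.  All names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical sketch: not part of the Kafka code base.
class OptionalFieldBufferSketch {
    static void writeUnsignedVarint(ByteArrayOutputStream out, int value) {
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    static int readUnsignedVarint(ByteBuffer in) {
        int value = 0;
        int shift = 0;
        while (true) {
            byte b = in.get();
            value |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return value;
            }
            shift += 7;
        }
    }

    // Writes each entry as: length (assumed: tag + value bytes), two-byte tag, value.
    static byte[] write(SortedMap<Integer, byte[]> fields) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (Map.Entry<Integer, byte[]> entry : fields.entrySet()) {
            int tag = entry.getKey();
            byte[] value = entry.getValue();
            writeUnsignedVarint(out, 2 + value.length);
            out.write((tag >>> 8) & 0xFF);
            out.write(tag & 0xFF);
            out.write(value, 0, value.length);
        }
        out.write(0); // terminating entry: length 0, no tag, no value
        return out.toByteArray();
    }

    // Reads entries until the terminator, skipping any tag the reader does not know.
    static Map<Integer, byte[]> read(ByteBuffer in, Set<Integer> knownTags) {
        Map<Integer, byte[]> result = new TreeMap<>();
        while (true) {
            int entryLength = readUnsignedVarint(in);
            if (entryLength == 0) {
                break;
            }
            int tag = in.getShort() & 0xFFFF;
            int valueLength = entryLength - 2;
            if (knownTags.contains(tag)) {
                byte[] value = new byte[valueLength];
                in.get(value);
                result.put(tag, value);
            } else {
                in.position(in.position() + valueLength); // skip without deserializing
            }
        }
        return result;
    }

    public static void main(String[] args) {
        SortedMap<Integer, byte[]> fields = new TreeMap<>();
        fields.put(1, "kafka".getBytes(StandardCharsets.UTF_8));
        fields.put(7, new byte[] {42});
        byte[] wire = write(fields);
        Map<Integer, byte[]> known = read(ByteBuffer.wrap(wire), Collections.singleton(1));
        System.out.println("wire bytes: " + wire.length + ", known fields: " + known.keySet());
    }
}
```

Because every entry carries its own length, a reader can step over an unknown tag without understanding its type, which is the property the length prefix is meant to provide.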

Requests and Responses

All requests and responses will begin with an optional field buffer.  If there are no optional fields, this will only be a single zero byte.

Structures

All structures will begin with an optional field buffer.  This will normally only be a single byte, unless there are optional fields present.

Compatibility, Deprecation, and Migration Plan

As mentioned earlier, existing request versions will not be changed to support optional fields.  However, new versions will have this support going forward.

In general, adding or removing an optional field is always a compatible operation, provided that we don't reuse a tag that was used for something else in a previous release.  Changing the type or nullability of an existing optional field is also an incompatible change.

Rejected Alternatives

Optional Field Buffer Serialization Alternatives

  • We could serialize optional fields as a tag and a type, rather than a tag and a length.  However, this would prevent us from adding new types in the future, since the old deserializers would not understand them.
  • We could allow the serialization of arrays of objects.  However, this would require a two-pass serialization rather than a single-pass serialization.  The first pass would have to cache the lengths of all the optional object arrays.  We might support this eventually, but for now, it doesn't seem necessary.  We can add it later in a compatible fashion if we decide to.

Make all Fields Optional

Rather than supporting both mandatory and optional fields, we could make all fields optional.  For fields which we always expect to use, however, this would take more space when serialized.  There are also situations where it is useful for the recipient to know which features the sender supports, and the mandatory field mechanism handles these situations well.