...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-848 introduced a new group coordinator that is able to conduct incremental, cooperative rebalances led by the server. KIP-848 significantly changes how the group coordinator works and therefore is the primary motivator for this KIP. This KIP proposes to lay a generic foundation for both coordinators so that future KIPs can illustrate simple downgrade paths.

Today, we don't have a downgrade story for the transaction and group coordinators, and it becomes extremely difficult to downgrade once we add new fields to any of the existing record types used in their respective topics (__transaction_state and __consumer_offsets). When a new field is added, older versions cannot read these records, so the only way to downgrade is to reformat the records to older versions, which is a complicated process explored further in Rejected Alternatives.

Proposed Changes

Adding new record types or new fields is not backward compatible because older coordinators do not expect any change in the schema.

  • New record types: Future changes to both the __consumer_offsets and __transaction_state topics may introduce new record types that existing coordinators do not recognize. Today, the transaction coordinator fails because it tries to deserialize unknown record keys as TransactionLogKey, whereas the group coordinator throws an IllegalStateException when the record key is unknown.
  • New fields in existing record types: We may introduce new fields to existing Value records in both topics (TransactionLogValue, GroupMetadataValue, and OffsetCommitValue) and bump their versions. The existing implementations of both coordinators throw an exception when the record type is known but its version is not supported.

Ignore unknown record types

This KIP proposes to ignore unknown record keys, which allows a downgraded coordinator to continue loading the rest of the partition. Because we cannot write tombstones for unknown keys, these records will remain in the log until the coordinators are upgraded. This KIP nevertheless prioritizes the simplicity of ignoring them because we expect downgrades to be temporary.
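To make the skipping behavior concrete, here is a minimal Java sketch of a partition load over __consumer_offsets keys. It relies on the existing key format, where the first two bytes are a version that selects the record type (versions 0 and 1 are offset commits, version 2 is group metadata). The class UnknownKeyLoaderSketch and its methods are hypothetical and this is not Kafka's loader; key version 3 simply stands in for a record type introduced by a newer release.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Minimal sketch (not Kafka code): replay __consumer_offsets records during a
// partition load, skipping keys whose type is unknown instead of failing.
public class UnknownKeyLoaderSketch {

    // Hypothetical summary of a load: record types replayed and records skipped.
    public static final class LoadSummary {
        public final List<String> replayed = new ArrayList<>();
        public int skipped = 0;
    }

    public static LoadSummary load(List<ByteBuffer> keys) {
        LoadSummary summary = new LoadSummary();
        for (ByteBuffer key : keys) {
            // The first two bytes of a key are its version, which selects the record type.
            short keyVersion = key.duplicate().getShort();
            switch (keyVersion) {
                case 0:
                case 1:
                    summary.replayed.add("OffsetCommitKey");
                    break;
                case 2:
                    summary.replayed.add("GroupMetadataKey");
                    break;
                default:
                    // Record type written by a newer coordinator: ignore it and keep
                    // loading. No tombstone can be written, so it stays in the log.
                    summary.skipped++;
                    break;
            }
        }
        return summary;
    }

    // Builds a key buffer that holds only the 2-byte version prefix.
    public static ByteBuffer keyWithVersion(int version) {
        ByteBuffer buf = ByteBuffer.allocate(2);
        buf.putShort((short) version);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        List<ByteBuffer> keys = List.of(keyWithVersion(1), keyWithVersion(2), keyWithVersion(3));
        LoadSummary summary = load(keys);
        // Prints: replayed=[OffsetCommitKey, GroupMetadataKey] skipped=1
        System.out.println("replayed=" + summary.replayed + " skipped=" + summary.skipped);
    }
}
```

The design choice mirrors the proposal: the unknown record is counted and left alone rather than deleted, since the downgraded coordinator cannot produce a valid tombstone for a key format it does not understand.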

Bump non-flexible record types to flexible versions

We may introduce new fields to existing Value records in both topics (TransactionLogValue, GroupMetadataValue, and OffsetCommitValue) and bump their versions. The existing implementations of both coordinators throw an exception when the version is not supported, which prevents downgraded coordinators from loading partitions. In this KIP we propose to bump each of these records to a flexible version and to backport this change to earlier releases, mainly minor versions in 3.X. We will also apply it to 3.5 if the patch is merged to trunk before the code freeze date. Backporting to earlier releases is acceptable because the change is small and very low risk. One limitation is that we will be unable to downgrade to a version lower than the earliest version to which this change is applied.

We will rely on tagged fields (introduced in KIP-482), which allow fields to be added and removed without bumping the version. Once a version is flexible, handling new fields is straightforward because unknown tagged fields are automatically ignored during deserialization. We do not touch Key record types because keys are considered fixed, and optional fields in keys do not make much sense.
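The reason unknown tagged fields are harmless is visible in the KIP-482 wire layout: a tagged-field section is an unsigned varint count followed, for each field, by an unsigned varint tag, an unsigned varint size, and that many bytes. A reader can therefore skip any tag it does not know by its size alone. The Java sketch below is not Kafka's generated code (which keeps unknown fields as raw bytes rather than discarding them); TaggedFieldsSketch and the sample tags 0 and 5 are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch (not Kafka's generated code) of reading a KIP-482 tagged-field section:
// an unsigned varint count, then (tag, size, bytes) for each field.
public class TaggedFieldsSketch {

    public static final class Result {
        public final Map<Integer, byte[]> known = new HashMap<>();
        public final List<Integer> skippedTags = new ArrayList<>();
    }

    // Unsigned varint: 7 data bits per byte, high bit set on all but the last byte.
    public static int readUnsignedVarint(ByteBuffer buf) {
        int value = 0;
        for (int shift = 0; shift <= 28; shift += 7) {
            byte b = buf.get();
            value |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return value;
            }
        }
        throw new IllegalArgumentException("Varint is longer than 5 bytes");
    }

    public static void writeUnsignedVarint(int value, ByteBuffer buf) {
        while ((value & ~0x7F) != 0) {
            buf.put((byte) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        buf.put((byte) value);
    }

    // Keeps fields whose tags this reader knows and skips the payload of all others.
    public static Result readTaggedFields(ByteBuffer buf, Set<Integer> knownTags) {
        Result result = new Result();
        int count = readUnsignedVarint(buf);
        for (int i = 0; i < count; i++) {
            int tag = readUnsignedVarint(buf);
            int size = readUnsignedVarint(buf);
            if (knownTags.contains(tag)) {
                byte[] data = new byte[size];
                buf.get(data);
                result.known.put(tag, data);
            } else {
                // Field added by a newer writer: skip it without interpreting it.
                buf.position(buf.position() + size);
                result.skippedTags.add(tag);
            }
        }
        return result;
    }

    // Hypothetical section: tag 0 holds an int32 (42); tag 5 is unknown to the reader.
    public static ByteBuffer sampleSection() {
        ByteBuffer buf = ByteBuffer.allocate(32);
        writeUnsignedVarint(2, buf);
        writeUnsignedVarint(0, buf);
        writeUnsignedVarint(4, buf);
        buf.putInt(42);
        writeUnsignedVarint(5, buf);
        writeUnsignedVarint(3, buf);
        buf.put(new byte[] {1, 2, 3});
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        Result result = readTaggedFields(sampleSection(), Set.of(0));
        int value = ByteBuffer.wrap(result.known.get(0)).getInt();
        // Prints: tag0=42 skipped=[5]
        System.out.println("tag0=" + value + " skipped=" + result.skippedTags);
    }
}
```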

We propose to prevent version bumps for both keys and values, and to prevent non-tagged fields from being added to values; in other words, new fields added to existing Value records must always be tagged fields. It may be possible to modify SchemaGenerator to enforce these rules. Ideally, we should never bump the version for these records, but this may be difficult to enforce. As a safeguard, we propose to deserialize unknown versions using the highest supported version, and to add a comment to each schema advising against bumping the version.
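The safeguard can be illustrated by contrasting a strict version check, a simplified stand-in for today's behavior, with the proposed fallback, using GroupMetadataValue's valid versions 0-4. The class and method names below are hypothetical. The fallback is only correct when the newer version added nothing but tagged fields, which is exactly why the schemas advise against bumping the version.

```java
// Sketch (hypothetical names, not Kafka code) contrasting a strict version check
// with the proposed fallback for GroupMetadataValue, whose valid versions are 0-4.
public class VersionFallbackSketch {

    static final short LOWEST_SUPPORTED_VERSION = 0;
    static final short HIGHEST_SUPPORTED_VERSION = 4;

    // Strict behavior: any version outside the known range aborts the load.
    public static short strictVersion(short recordVersion) {
        if (recordVersion < LOWEST_SUPPORTED_VERSION || recordVersion > HIGHEST_SUPPORTED_VERSION) {
            throw new IllegalStateException("Unknown GroupMetadataValue version " + recordVersion);
        }
        return recordVersion;
    }

    // Proposed safeguard: decode a newer version with the highest known schema.
    // Correct only if the newer version added nothing but tagged fields, which
    // the flexible-version decoder skips.
    public static short fallbackVersion(short recordVersion) {
        if (recordVersion < LOWEST_SUPPORTED_VERSION) {
            throw new IllegalStateException("Unknown GroupMetadataValue version " + recordVersion);
        }
        return (short) Math.min(recordVersion, HIGHEST_SUPPORTED_VERSION);
    }

    public static void main(String[] args) {
        short fromNewerBroker = 5;
        // Prints: decode version 5 as 4
        System.out.println("decode version " + fromNewerBroker + " as " + fallbackVersion(fromNewerBroker));
        try {
            strictVersion(fromNewerBroker);
        } catch (IllegalStateException e) {
            System.out.println("strict check fails: " + e.getMessage());
        }
    }
}
```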

Public Interfaces

__consumer_offsets

GroupMetadataValue.json

Bump to flexible version

```json
// KIP-915 do not bump the version and only add/remove tagged fields
{
  "type": "data",
  "name": "GroupMetadataValue",
  "validVersions": "0-4",
  "flexibleVersions": "4+",
  "fields": [
    { "name": "protocolType", "versions": "0+", "type": "string"},
    { "name": "generation", "versions": "0+", "type": "int32" },
    { "name": "protocol", "versions": "0+", "type": "string", "nullableVersions": "0+" },
    { "name": "leader", "versions": "0+", "type": "string", "nullableVersions": "0+" },
    { "name": "currentStateTimestamp", "versions": "2+", "type": "int64", "default": -1, "ignorable": true},
    { "name": "members", "versions": "0+", "type": "[]MemberMetadata" }
  ],
  "commonStructs": [
    {
      "name": "MemberMetadata",
      "versions": "0-4",
      "fields": [
        { "name": "memberId", "versions": "0+", "type": "string" },
        { "name": "groupInstanceId", "versions": "3+", "type": "string", "default": "null", "nullableVersions": "3+", "ignorable": true},
        { "name": "clientId", "versions": "0+", "type": "string" },
        { "name": "clientHost", "versions": "0+", "type": "string" },
        { "name": "rebalanceTimeout", "versions": "1+", "type": "int32", "ignorable": true},
        { "name": "sessionTimeout", "versions": "0+", "type": "int32" },
        { "name": "subscription", "versions": "0+", "type": "bytes" },
        { "name": "assignment", "versions": "0+", "type": "bytes" }
      ]
    }
  ]
}
```

...

```json
// KIP-915 do not bump the version and only add/remove tagged fields
{
  "type": "data",
  "name": "OffsetCommitValue",
  "validVersions": "0-4",
  "flexibleVersions": "4+",
  "fields": [
    { "name": "offset", "type": "int64", "versions": "0+" },
    { "name": "leaderEpoch", "type": "int32", "versions": "3+", "default": -1, "ignorable": true},
    { "name": "metadata", "type": "string", "versions": "0+" },
    { "name": "commitTimestamp", "type": "int64", "versions": "0+" },
    { "name": "expireTimestamp", "type": "int64", "versions": "1", "default": -1, "ignorable": true}
  ]
}
```
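The "versions" strings in these schemas decide which fields exist at each record version, and "flexibleVersions" decides which versions carry tagged fields. The Java sketch below is not Kafka's schema generator; it parses the three forms used in the OffsetCommitValue schema ("N+" for N and above, "A-B" for an inclusive range, and a bare "N" for exactly one version) and lists the fields present at each version. FieldVersionsSketch and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch (not Kafka's schema generator) of how "versions" strings decide which
// OffsetCommitValue fields are serialized at a given record version.
public class FieldVersionsSketch {

    // Inclusive range parsed from "N+" (N and above), "A-B", or "N" (exactly N).
    public static boolean inRange(String spec, short version) {
        short lowest;
        short highest;
        if (spec.endsWith("+")) {
            lowest = Short.parseShort(spec.substring(0, spec.length() - 1));
            highest = Short.MAX_VALUE;
        } else if (spec.contains("-")) {
            int dash = spec.indexOf('-');
            lowest = Short.parseShort(spec.substring(0, dash));
            highest = Short.parseShort(spec.substring(dash + 1));
        } else {
            lowest = highest = Short.parseShort(spec);
        }
        return version >= lowest && version <= highest;
    }

    // Field name -> "versions" value, copied from the OffsetCommitValue schema.
    static final Map<String, String> FIELDS = new LinkedHashMap<>();
    static {
        FIELDS.put("offset", "0+");
        FIELDS.put("leaderEpoch", "3+");
        FIELDS.put("metadata", "0+");
        FIELDS.put("commitTimestamp", "0+");
        FIELDS.put("expireTimestamp", "1");
    }

    public static List<String> fieldsPresentIn(short version) {
        List<String> present = new ArrayList<>();
        for (Map.Entry<String, String> field : FIELDS.entrySet()) {
            if (inRange(field.getValue(), version)) {
                present.add(field.getKey());
            }
        }
        return present;
    }

    public static boolean isFlexible(short version) {
        return inRange("4+", version); // "flexibleVersions": "4+"
    }

    public static void main(String[] args) {
        for (short v = 0; v <= 4; v++) {
            System.out.println("v" + v + " flexible=" + isFlexible(v) + " " + fieldsPresentIn(v));
        }
    }
}
```

Running main shows that expireTimestamp is written only at version 1, leaderEpoch from version 3 onward, and that version 4 is the only flexible version, so it is the first version in which new tagged fields can be added without another bump.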

__transaction_state

TransactionLogValue.json

Bump to flexible version

```json
// KIP-915 do not bump the version and only add/remove tagged fields
{
  "type": "data",
  "name": "TransactionLogValue",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "ProducerId", "type": "int64", "versions": "0+",
      "about": "Producer id in use by the transactional id"},
    { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
      "about": "Epoch associated with the producer id"},
    { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+",
      "about": "Transaction timeout in milliseconds"},
    { "name": "TransactionStatus", "type": "int8", "versions": "0+",
      "about": "TransactionState the transaction is in"},
    { "name": "TransactionPartitions", "type": "[]PartitionsSchema", "versions": "0+", "nullableVersions": "0+",
      "about": "Set of partitions involved in the transaction", "fields": [
      { "name": "Topic", "type": "string", "versions": "0+"},
      { "name": "PartitionIds", "type": "[]int32", "versions": "0+"}]},
    { "name": "TransactionLastUpdateTimestampMs", "type": "int64", "versions": "0+",
      "about": "Time the transaction was last updated"},
    { "name": "TransactionStartTimestampMs", "type": "int64", "versions": "0+",
      "about": "Time the transaction was started"}
  ]
}
```

...