...

As mentioned in the previous section, we should enforce that any new record types introduced in the future are always deleted when a group is reverted to a generic group, even though this is not strictly necessary for correctness.

Public Interfaces

Reading new fields

...

To use this, topicId must be changed to a tagged field as part of KIP-848 so that the downgraded coordinator can skip it. We also need to change the logic in GroupMetadataManager.readOffsetMessageValue() so that, when the record version is unknown, the value is deserialized with the highest known version instead of throwing an IllegalStateException. This is possible because tagged fields are appended to the end of the serialized record, so tagged fields that are unknown to the reader are safely ignored during deserialization (this behavior was verified with a unit test). The alternative is to ignore records with unknown versions, which would work, but the committed offsets would be lost after the coordinator is downgraded. Once the OffsetCommitValue record becomes flexible, new tagged fields can be introduced without a version bump, as guaranteed by KIP-482.
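The fallback described above can be illustrated with a minimal, self-contained sketch. It does not use Kafka's generated message classes or its real wire format (which encodes tags and sizes as unsigned varints); the class OffsetValueReaderSketch, its one-byte tag and size encoding, and the helper newerRecordExample() are all assumptions made for illustration only. The sketch shows a reader that clamps an unknown version to the highest version it knows and skips tagged fields it does not recognize.

```java
import java.nio.ByteBuffer;
import java.util.UUID;

// Hypothetical sketch: a toy record layout, NOT Kafka's actual serialization.
// Layout: version (int16) | offset (int64) | tagged-field count (int8)
//         | repeated { tag (int8), size (int8), payload (size bytes) }
final class OffsetValueReaderSketch {
    static final short HIGHEST_KNOWN_VERSION = 4; // first flexible version in this KIP
    static final int TOPIC_ID_TAG = 0;            // the only tag this reader understands

    static final class Decoded {
        final short versionUsed;
        final long offset;
        final UUID topicId; // null when the tagged field is absent

        Decoded(short versionUsed, long offset, UUID topicId) {
            this.versionUsed = versionUsed;
            this.offset = offset;
            this.topicId = topicId;
        }
    }

    // Instead of throwing on an unknown (newer) version, read the record
    // with the highest version this coordinator knows about.
    static short resolveReadVersion(short recordVersion) {
        return (short) Math.min(recordVersion, HIGHEST_KNOWN_VERSION);
    }

    static Decoded read(ByteBuffer buf) {
        short readVersion = resolveReadVersion(buf.getShort());
        long offset = buf.getLong();
        UUID topicId = null;
        int taggedCount = buf.get();
        for (int i = 0; i < taggedCount; i++) {
            int tag = buf.get();
            int size = buf.get();
            if (tag == TOPIC_ID_TAG && size == 16) {
                topicId = new UUID(buf.getLong(), buf.getLong());
            } else {
                // Unknown tag: its size is known, so the payload can be skipped safely.
                buf.position(buf.position() + size);
            }
        }
        return new Decoded(readVersion, offset, topicId);
    }

    // Builds a record as a hypothetical newer writer (version 5) might,
    // carrying tag 0 (topicId) plus a tag 1 that this reader has never seen.
    static ByteBuffer newerRecordExample() {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.putShort((short) 5);
        buf.putLong(42L);
        buf.put((byte) 2);                                   // two tagged fields
        buf.put((byte) 0).put((byte) 16).putLong(1L).putLong(2L);
        buf.put((byte) 1).put((byte) 3).put(new byte[] {9, 9, 9});
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        Decoded d = read(newerRecordExample());
        System.out.println("version=" + d.versionUsed + " offset=" + d.offset
                + " topicId=" + d.topicId);
    }
}
```

The important property is that every tagged field carries its own size, which is what lets an older reader step over fields it cannot interpret rather than failing or dropping the whole record.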

Restrictions and Guidelines

We want to apply the use of tagged fields to all __consumer_offsets record types, not just the OffsetCommitValue record: all new fields must be tagged fields for every __consumer_offsets record type. If the user downgrades from V2 to V1, where both versions use the new protocol, the record types still need to be readable by the V1 coordinator. All Value record types introduced in KIP-848 (e.g., ConsumerGroupMemberMetadataValue) are already defined with flexible versions, so they remain viable for downgrade if new fields are added in future versions.

There are also inherent constraints with using tagged fields. From KIP-482:

  1. A tag number can never be reused.
  2. The format of a tagged field can never be modified.
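As a rough illustration of these two constraints, the following sketch tracks which tag numbers have ever been assigned and rejects both reuse of a tag for a different field and a change to an existing tag's type. The class TagRulesSketch and its register method are hypothetical and are not part of Kafka or KIP-482; a real project would more likely enforce this through schema review or a build-time check.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the two tagged-field rules; not a Kafka API.
final class TagRulesSketch {
    // tag number -> "fieldName:type" as first registered. Entries are never
    // removed, so a tag stays reserved even after its field is dropped.
    private final Map<Integer, String> assigned = new HashMap<>();

    void register(int tag, String fieldName, String type) {
        String signature = fieldName + ":" + type;
        String existing = assigned.putIfAbsent(tag, signature);
        if (existing != null && !existing.equals(signature)) {
            // Either the tag is being reused for another field (rule 1)
            // or the format of the same field is being changed (rule 2).
            throw new IllegalStateException(
                "tag " + tag + " is already assigned to " + existing);
        }
    }

    public static void main(String[] args) {
        TagRulesSketch rules = new TagRulesSketch();
        rules.register(0, "topicId", "uuid");
        rules.register(0, "topicId", "uuid"); // same definition: accepted
        try {
            rules.register(0, "topicId", "string"); // format change: rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```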

Once OffsetCommitValue is bumped to a flexible version, we can safely assume that the downgraded coordinator can read any record with an unknown version using the latest known version and ignore unknown tagged fields. However, once we introduce tagged fields to OffsetCommitValue, we can never downgrade the cluster to a version older than the first flexible version (4). This is acceptable if we use feature flag versioning and introduce a flexible OffsetCommitValue record for the first group coordinator version. If we stick with IBP/MV, on the other hand, there isn't a clear solution.

OffsetCommitValue.json

This KIP introduces the tag and taggedVersions properties to the topicId field.

{
  "type": "data",
  "name": "OffsetCommitValue",
  "validVersions": "0-4",
  "flexibleVersions": "4+",
  "fields": [
    { "name": "offset", "type": "int64", "versions": "0+" },
    { "name": "leaderEpoch", "type": "int32", "versions": "3+", "default": -1, "ignorable": true },
    { "name": "metadata", "type": "string", "versions": "0+" },
    { "name": "commitTimestamp", "type": "int64", "versions": "0+" },
    { "name": "expireTimestamp", "type": "int64", "versions": "1", "default": -1, "ignorable": true },
    // KIP-848 Adds TopicId field. KIP-915 adds tag and taggedVersions
    { "name": "topicId", "type": "uuid", "tag": 0, "taggedVersions": "4+", "versions": "4", "ignorable": true }
  ]
}

Restrictions and Guidelines

Using tagged fields comes with some restrictions: any new field added to the OffsetCommitValue record must always be a tagged field. This doesn't apply to other record types because they are assumed to be deleted. New record types will always be deleted during the group conversion, as mentioned above. Tagged fields can be removed, but doing so requires a version bump.


Compatibility, Deprecation, and Migration Plan

...

  1. Start 3 consumers that use the new protocol and commit offsets on a 3-broker cluster.
  2. Save the group metadata, then downgrade all consumers. Verify via group count metrics.
  3. Confirm that the group metadata, including committed offsets, is identical, then downgrade all brokers.
  4. Confirm that the group metadata, including committed offsets, is identical.
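The "confirm ... is identical" checks in the steps above could be automated along the following lines. This is only a sketch under stated assumptions: the class OffsetSnapshotDiffSketch is hypothetical, and it assumes the committed offsets have already been collected into a map keyed by a "topic-partition" string with non-null values. How the snapshot is obtained (for example, through an admin client or a CLI tool) is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for the downgrade test plan; not part of Kafka.
final class OffsetSnapshotDiffSketch {
    // Returns one entry per key whose committed offset differs between the
    // snapshots, including keys present in only one of them. An empty
    // result means the two snapshots are identical.
    static Map<String, String> diff(Map<String, Long> before, Map<String, Long> after) {
        Map<String, String> mismatches = new TreeMap<>();
        for (Map.Entry<String, Long> e : before.entrySet()) {
            Long now = after.get(e.getKey());
            if (!e.getValue().equals(now)) {
                mismatches.put(e.getKey(), e.getValue() + " -> " + now);
            }
        }
        for (Map.Entry<String, Long> e : after.entrySet()) {
            if (!before.containsKey(e.getKey())) {
                mismatches.put(e.getKey(), "null -> " + e.getValue());
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        Map<String, Long> before = new HashMap<>();
        before.put("orders-0", 120L);
        before.put("orders-1", 98L);
        Map<String, Long> after = new HashMap<>(before);
        after.remove("orders-1"); // simulate an offset lost during downgrade
        System.out.println(diff(before, after));
    }
}
```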

Rejected Alternatives

Rejected Alternative: upgraded coordinator deletes new and downgrades existing record types

Instead of the downgraded coordinator deleting the new record types when loading the partition, we can have the new coordinator delete the new record types before shutting down. This is possible with the KIP-584 (feature flag) versioning approach: the operator downgrades the coordinator version, which triggers coordinators to perform deletions for the new record types. We can ensure that all partitions will be compacted even if a broker is down, since the partitions will have migrated to an online broker. Once coordinators append tombstones for the new record types, they can explicitly trigger compaction. This adds cleanup time to downgrades. More importantly, coordinators need to downgrade OffsetCommitValue records so that the downgraded coordinator can load committed offsets. This means coordinators need to rewrite all offset commits in the old format, including transactional offset commits.

Rewriting transactional offset commits complicates the downgrade path:

  • If a transactional offset commit is in progress, we need to abort it before reformatting, but we don't have a mechanism in place to trigger a server-side abort. Furthermore, we would need to add logic so that the coordinator is notified when a transaction is aborted and can then proceed with the rewrite.
  • Producer's perspective: we would either have to make the rewrite completely invisible to the producer or have the producer retry after aborting the transaction from the server side. Both paths are complex and require additional investigation.
  • Definition of a rewrite: should we consider translating the transaction start time / deadline when rewriting?

The benefit of this approach is that future record types are deleted. The proposed approach of ignoring new records only works because the coordinator deletes new record types when a group is converted from new to old. However, we may introduce new record types that are not deleted during this conversion (this KIP proposes to enforce that new record types are always deleted during group conversion). Another benefit is that there are no strict requirements for the OffsetCommitValue record: we would not be limited to adding tagged fields (which this KIP requires), since these records would be rewritten anyway. Having the upgraded coordinator explicitly rewrite new record types and downgrade them is future-proof.