You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 8 Next »

Status

Current state: Under Discussion

Discussion thread: here

JIRA: TBD

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-848 introduced a new group coordinator that is able to conduct incremental, cooperative rebalances led by the server. This KIP completes the backward compatible downgrade path for the new coordinator that was not covered in KIP-848.

KIP-848 highlights the upgrade process and how consumers can upgrade to the new protocol. To summarize, when the first consumer using the new protocol joins the group, the group coordinator persists the group members' metadata to the new protocol format. As other consumers upgrade, rejoin, and heartbeat, they will learn their assignments and the coordinator will attempt to incrementally drive the group to a Stable state. Conversely, when the last consumer using the new protocol leaves the group, the group coordinator will transition the group to the old protocol format. We will rely on this behavior to cleanup new record types during the downgrade process.

When do we expect to downgrade the group coordinator?

  • regression in the new group coordinator

    • regression in the rewrite of the old protocol

    • bug/performance issue in the new protocol, including future versions

  • bug in the new consumer

    • if there is no known issue with the new coordinator, we can only downgrade consumers in this case

The rest of the document discusses each step the operator takes to downgrade from a KIP-848 enabled coordinator and the rationalization behind it. Discussion on whether to use IBP/MV or a feature flag for the group coordinator is out of scope and will be proposed in a future KIP.

Proposed Changes

Downgrade Process

Consumers are downgraded first

Consumers should be downgraded first for mainly two reasons. First, consumers using the new protocol may be migrated to a downgraded group coordinator when a broker goes down. The coordinator won’t be able to understand the Heartbeat API which will cause consume downtime. This complicates the downgrade scenario because we need either a) backward compatible consumers or b) forward compatible group coordinators. Note that a) may be provided with KIP-848 but even with backward compatible consumers, consumers may flip-flop between using old vs. new protocol while partitions/groups are migrated during broker downgrades which is not favorable.

Second and more importantly, downgrading consumers first is useful because the group coordinator appends tombstones for new protocol records when a group transitions to the old protocol (when the last new protocol consumer leaves the group). Similarly it will append old protocol records to convert the group. As the group fully reverts to the old protocol the log cleaner will clean new protocol records in the background so we can expect all new record types to be eventually removed from the log assuming there are no bugs in this process. This makes it easier to reason about how a downgraded coordinator should handle these new record types.

The first downgraded consumer will startup and send join group requests to the coordinator with an empty member id which the coordinator will reject and request to rejoin (for JoinGroupRequest version < 4, this will trigger a rebalance). The coordinator is still using the new protocol, so it will compute the assignment and revoke/add partitions as necessary. When the last upgraded consumer leaves to downgrade, the coordinator will transition the group to the old protocol and append tombstones as mentioned above. This will trigger a rebalance on the existing downgraded consumers in the group.

Consumer Configs

There are 3 consumer configs added in KIP-848: group.protocol, group.remote.assignor, and group.local.assignors. Although it is not strictly necessary, the operator should remove the consumer configs when downgrading consumers.

Confirm whether all consumers are downgraded

The operator needs to verify that all consumers have been downgraded. This can be achieved with the new kafka-consumer-groups command’s --type argument introduced in KIP-848 that filters groups based on their type (either generic, consumer, or both: generic,consumer). Alternatively, KIP-848 provides a metric that counts the number of groups by type which the operator can use to confirm all groups are generic.

Broker Downgrade

Downgrading consumers may be sufficient if the operator can identify that there is a bug in the heartbeat handling logic on the consumer and not in the server. However, the new group coordinator may have bugs in the rewrite of the old protocol and/or there may be bugs on the server side as well. We can proceed to downgrading brokers once all consumers are downgraded.

Broker Configs

There are new static broker configs added in KIP-848 which will be unreadable by the downgraded coordinator. The operator should remove these static configs before restarting the brokers but it is not strictly necessary.

There are two additional dynamic broker configs introduced in KIP-848: group.consumer.session.timeout.ms and group.consumer.heartbeat.interval.ms. They will be stored in the metadata log as they are dynamic. These configs will be part of a new resource type GROUP which the existing quorum controller will throw an exception since the ConfigResource type is unknown. Therefore, we either need the controller ignore unknown resource types or have them cleaned up as part of the downgrade. For removing the configs, there are two options: 1) The controller or the coordinator cleans the dynamic configs. This is difficult to implement as dynamic configs do not have versions and likely will need a generic solution for all dynamic configs. And 2) have the operator remove these dynamic configs before downgrading. This introduces an additional manual step, but it is far less complex than the former option. This KIP proposes to take option 2) and make changes to the controller to ignore unknown resource types.

Broker shutdown

When a broker goes down, consumers will remain unaffected as they have already been downgraded and are using the old protocol. Their ownership moves to a different coordinator and the behavior is identical to what we do today, regardless of whether the migrated coordinator is downgraded or not.

Broker startup

Broker starts up with downgraded version. Preferred leadership will re-elect the downgraded broker as the coordinator for the group. The same logic applies from the broker shutdown. Nothing changes because coordinators were already using the old protocol. The main difference is the state of the log when the broker restarts: the downgraded broker needs to handle the new record types.

Identifying New Record Types

The __consumer_offsets topic is compacted and will be the destination for new record types in the new protocol. As such, we need the old coordinator to be able to read these new types and ignore/delete them. The old protocol uses record type version 0 through 2 included for keys.

KIP-848 highlights 6 new record types and one additional field to an existing record type (OffsetCommitValue) for the __consumer_offsets topic:

Group Metadata
  • ConsumerGroupMetadata

    • stores group id and epoch of the consumer group

  • ConsumerGroupMemberMetadata

    • holds member’s metadata such as group epoch, rack id, client id, subscribed topics, assignors, etc

  • ConsumerGroupPartitionMetadata

    • holds all the topics and their corresponding partition count for the given consumer group

Target Assignment
  • ConsumerGroupTargetAssignmentMetadata

    • group id and assignment epoch

  • ConsumerGroupTargetAssignmentMember

    • the targeted topic partitions that the coordinator will try to assign to a group member

Current Member Assignment
  • ConsumerGroupCurrentMemberAssignment

    • currently assigned topic partitions for a group member

Offsets (not new; extended)
  • OffsetCommitValue

    • introduces new topicId field

Reading new record types

This KIP proposes to make changes in the existing protocol so that the coordinator ignores record types that are unknown to the coordinator and tie that to the old protocol’s IBP/feature version. The existing implementation throws an IllegalStateException when the record key is unknown. As we expect the log cleaner to eventually clean up new record types in the log, we don’t have to manually add tombstones. Note that this is only true if consumers are downgraded first. We can add WARN level logs to indicate that the coordinator read an unknown record type. We can automate the cleanup by writing tombstones when the coordinator reads unrecognizable records. This may add duplicate work if tombstones were already added but not yet pruned by the log cleaner. Although we can force the log cleaner to run before downgrading this is not strictly necessary and will slow down the downgrade. This also acts as a safeguard for downgrades where consumers were not fully downgraded first.

Note that this approach is only applicable to the __consumer_offsets topic because the coordinator appends tombstones when members leave and groups become empty. This approach proposes a way to handle new record types that will be introduced to the __consumer_offsets topic in the future and not just in KIP-848. However, server logs may flood with WARN level logs if we don't proactively remove them so we propose to have the new group coordinator append tombstones for all newly introduced record types when a group is converted from consumer to generic as we do so in KIP-848.

Reading new fields in existing record types is discussed below in the Public Interfaces section.

Deleting new record types after group conversion

As mentioned in the previous section, we should enforce that new record types introduced in the future will always be deleted when a group is reverted to a generic group although it is not strictly necessary in terms of correctness. 

Public Interfaces

Reading new fields

The OffsetCommitValue record type is an existing one but KIP-848 introduces a new topicId field. The current implementation throws an IllegalStateException when the offset version is unknown. Ideally, we want the downgraded coordinator to be able to load the offset commits so that the commits are not lost which the taggedFields feature introduced in KIP-482 can help us. From KIP-482:

Tagged fields are always optional.  When they are not present in a message, they do not take up any space.

A new tagged field can be added to an existing protocol version without bumping the protocol version.  If the receiver does not expect a particular tagged field, it will simply skip over the field without deserializing it.

To use this, we would need to have topicId modified as a tagged field as part of KIP-848 so that the downgraded coordinator can skip it. Also, we need to change the logic in GroupMetadataManager.readOffsetMessageValue() so that if the version is unknown, we deserialize it with the highest known version instead of throwing an IllegalStateException. This is possible because tagged fields are appended to the end when they are serialized so that when they are deserialized tagged fields that are unknown to the reader are safely ignored (verified behavior through unit test). The alternative approach is to ignore unknown versions which would work but we would lose the committed offsets after the coordinator is downgraded. Once the OffsetCommitValue record becomes flexible, we can introduce new tagged fields without a version bump as it is guaranteed by KIP-482.

Restrictions and Guidelines

We want to apply the use of tagged fields to all __consumer_offsets record types and not just the OffsetCommitValue record. We require all new fields to be tagged fields for all __consumer_offsets record types. In the case the user downgrades from V2 to V1 where both versions use the new protocol, the record types still need to be readable by the V1 coordinator. All Value record types introduced in KIP-848 (i.e., ConsumerGroupMemberMetadataValue) are already accepted as flexible versions so they will be viable for downgrade if new fields are added in future versions.

There are also inherent constraints with using tagged fields. From KIP-482:

  1. a tag number can never be reused
  2. the format of a taggedField can never be modified.

Once OffsetCommitValue is bumped to a flexible version, we can safely assume that the downgraded coordinator can downgrade any record type that are unknown to the latest known version and ignore unknown tagged fields. However, once we introduce tagged fields to OffsetCommitValue we can never downgrade the cluster to a version older than the first flexible version (4). Unfortunately there is no a clear solution at the moment. 

OffsetCommitValue.json

This KIP introduces the tag, taggedVersions properties to the topicId field.

{
  "type": "data",  
  "name": "OffsetCommitValue",  
  "validVersions": "0-4",  
  "flexibleVersions": "4+",  
  "fields": [
    { "name": "offset", "type": "int64", "versions": "0+" },    
    { "name": "leaderEpoch", "type": "int32", "versions": "3+", "default": -1, "ignorable": true},    
    { "name": "metadata", "type": "string", "versions": "0+" },    { "name": "commitTimestamp", "type": "int64", "versions": "0+" },    
    { "name": "expireTimestamp", "type": "int64", "versions": "1", "default": -1, "ignorable": true}
    // KIP-848 Adds TopicId field. KIP-915 adds tag and taggedVersions
    { "name": "topicId", "type": "uuid", "tag": 0, "taggedVersions": "4+", "versions": "4", "ignorable": true }
  ]
}

Compatibility, Deprecation, and Migration Plan

We need to have a couple things set in place for the current coordinator so that when the downgraded coordinator loads partitions there are no compatibility issues:

  1. ignore unknown record types: this needs to be added to the current coordinator. Otherwise, the downgraded coordinator will fail to load partitions.
  2. ignore unknown record fields (tagged fields): this also needs to be added. The existing coordinator is unable to read versions that are greater than the maximum supported version (throws IllegalStateException). Also, the current coordinator needs the version bump to 4 so that OffsetCommitValue is a flexible version. Otherwise, it will be unable to read future taggedFields.
  3. ignore unknown ConfigResource types for dynamic configs. Otherwise, the downgrade will fail as the controller will throw an exception.

Test Plan

Once KIP-848 is implemented, we should add two system tests: one for the upgrade process and one for the downgrade process. Below describes the downgrade test:

  1. start 3 consumers that use the new protocol and commit offsets on a 3 broker cluster
  2. Save group metadata then perform downgrade on all consumers. Verify via group count metrics
  3. Confirm group metadata including committed offsets is identical, then perform downgrade on all brokers.
  4. Confirm group metadata including committed offsets is identical.

Rejected Alternatives

Rejected Alternative: upgraded coordinator deletes new and downgrades existing record types

Instead of the downgraded coordinator deleting the new record types when loading the partition, we can have the new coordinator delete the new record types before shutting down. This is possible with KIP-584 (feature flag) versioning approach: the operator downgrades the coordinator version which triggers coordinators to perform deletions for the new record types. We can ensure that all partitions will be compacted even if a broker is down since the partitions will have migrated to an online broker. Once coordinators append tombstones for the new record types they can explicitly trigger compaction. This introduces additional time spent cleaning up during downgrades. More importantly, coordinators need to downgrade OffsetCommitValue records so that the downgraded coordinator can load committed offsets. This means coordinators need to rewrite all offset commits with the old format, including transactional offset commits. 

Rewriting transactional offset commits complicates the downgrade path:

  • If a transactional offset commit is in progress, we need to abort it before reformatting but we don't have a mechanism in place to trigger a server side abort. Furthermore, we will need to add logic so that the coordinator is notified when a transaction is aborted to proceed with the rewrite.
  • Producers perspective: we would either have to make the rewrite completely invisible to the producer or have the producer retry after aborting it from the server side. Both paths are complex and require additional investigation.
  • Definition of a rewrite: should we consider translating the transaction start time / deadline when rewriting?

The benefit of this approach is that future record types are deleted. The proposed approach to ignore new records only works because the coordinator deletes new record types when a group is converted from new to old. However, we may introduce new record types that are not deleted during this conversion (this KIP proposes to enforce that new record types are always deleted during group conversion). Another benefit is that there are no strict requirements for the OffsetCommitValue record. We don't have to only add taggedFields (which this KIP requires) since these records will be rewritten anyways. Having the upgraded coordinator explicitly rewrite new record types and downgrade is future proof and there are no version downgrade barriers like we do for the proposed design.

  • No labels