Status

Current state: Under Discussion

Discussion thread:

JIRA:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Traditionally, committed offsets in Kafka were expired based on a configurable retention time. In KIP-211, retention semantics were changed to take group status into account: as long as a group is still active, none of its offsets are expired. This addressed the problem of losing committed offsets for low-volume partitions which rarely have new data, but it introduced two new problems:

  1. The expiration timestamp of an offset was removed from the OffsetCommit protocol. Some users were relying on the ability to set this in order to force expiration of committed offsets which were no longer in use.
  2. When a consumer subscription changes, we are still left with the committed offsets of the previous subscription. These will never be cleaned up as long as the group remains active. We were aware of this problem in KIP-211, but the solution was not implemented because the coordinator is presently agnostic to join group metadata and we were unclear about the compatibility implications of changing that.

The impact of these problems is that it is difficult to rely on lag monitoring using committed offsets. If a group is no longer subscribed to a topic, the lag computed from its stale committed offsets will just keep growing. The stale offsets also prevent the broker from cleaning up unused state.

Public Interfaces

We will add a new API to delete committed offsets.

The request schema is documented below:

{
  "apiKey": X,
  "type": "request",
  "name": "OffsetDeleteRequest",
  "validVersions": "0",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The unique group identifier." },
    { "name": "Topics", "type": "[]OffsetDeleteRequestTopic", "versions": "0+",
      "about": "The topics to delete offsets for.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]OffsetDeleteRequestPartition", "versions": "0+",
        "about": "Each partition to delete offsets for.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." }
      ]}
    ]}
  ]
}

And here is the response schema:

{
  "apiKey": X,
  "type": "response",
  "name": "OffsetDeleteResponse",
  "validVersions": "0",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Topics", "type": "[]OffsetDeleteResponseTopic", "versions": "0+",
      "about": "The responses for each topic.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]OffsetDeleteResponsePartition", "versions": "0+",
        "about": "The responses for each partition in the topic.",  "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." }
      ]}
    ]}
  ]
}


This capability will be exposed through the admin client in the following API:

interface Admin {

  /** 
   * Delete committed offsets for a set of partitions in a consumer group. This will
   * succeed at the partition level only if the group is not actively subscribed
   * to the corresponding topic.
   */
  DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(
     String groupId,
     Set<TopicPartition> partitions,
     DeleteConsumerGroupOffsetsOptions options);
}

@InterfaceStability.Evolving
public class DeleteConsumerGroupOffsetsResult {
  public KafkaFuture<Void> partitionResult(TopicPartition partition);
  public KafkaFuture<Void> all();
}

@InterfaceStability.Evolving
public class DeleteConsumerGroupOffsetsOptions extends AbstractOptions<DeleteConsumerGroupOffsetsOptions> {
}
    


Proposed Changes

To fix the problem described in the motivation, we need to make the group coordinator aware of consumer subscription semantics. The rebalance protocol was designed to be generic so that it could handle use cases beyond the consumer. For example, it is also used by Kafka Connect. Different group implementations are distinguished by a "ProtocolType" field in the JoinGroup request. For the consumer, the protocol type is "consumer". The group coordinator only allows clients of one protocol type to exist in a group.

The schema of group metadata is specific to the protocol type. For the consumer, topic subscriptions are specified in the JoinGroup request using the following schema:

GroupMetadata => Version SubscribedTopics UserData
  Version => Int16
  SubscribedTopics => [String]
  UserData => Bytes

The risk of letting the coordinator parse this schema is that it will be harder to make changes to it in the future. For example, if we want to add a new field, we will have to be careful to upgrade the brokers before the clients. To address this problem, we propose to let the coordinator parse this data as version 0 only. Effectively, it will ignore the value of the version field and just look for the array of subscribed topics. Any trailing bytes will be ignored. Future protocol changes will therefore have to keep the topic list in its current position and encoding.
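To make the "parse as version 0 only" idea concrete, here is a minimal sketch of how the topic list could be read while ignoring both the version value and any trailing bytes. It assumes the standard Kafka wire encoding (big-endian, int32 array length, int16 string length); the class and method names are hypothetical and not taken from the Kafka code base.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; not an existing Kafka class.
final class SubscriptionPrefixParser {

    // Reads only the version 0 prefix: Version (int16), then SubscribedTopics.
    static List<String> parseSubscribedTopics(ByteBuffer metadata) {
        ByteBuffer buf = metadata.duplicate(); // do not disturb the caller's position
        buf.getShort();                        // Version is read but deliberately ignored
        int count = buf.getInt();              // array length, int32 (assumed encoding)
        List<String> topics = new ArrayList<>();
        for (int i = 0; i < count; i++) {      // a negative (null) length yields an empty list
            int length = buf.getShort();       // string length, int16 (assumed encoding)
            if (length < 0 || length > buf.remaining()) {
                throw new IllegalArgumentException("Malformed topic name at index " + i);
            }
            byte[] raw = new byte[length];
            buf.get(raw);
            topics.add(new String(raw, StandardCharsets.UTF_8));
        }
        // UserData and anything a later version appends are trailing bytes: never read.
        return topics;
    }

    // Demo helper: encodes a payload with an arbitrary version and trailing bytes.
    static ByteBuffer encode(short version, List<String> topics, byte[] trailing) {
        int size = 2 + 4 + trailing.length;
        for (String topic : topics) {
            size += 2 + topic.getBytes(StandardCharsets.UTF_8).length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putShort(version);
        buf.putInt(topics.size());
        for (String topic : topics) {
            byte[] raw = topic.getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) raw.length);
            buf.put(raw);
        }
        buf.put(trailing);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        // A "version 3" payload with unknown trailing data still yields the topic list.
        ByteBuffer payload = encode((short) 3, Arrays.asList("orders", "payments"),
                new byte[] {0, 0, 0, 1, 42});
        System.out.println(parseSubscribedTopics(payload));
    }
}
```

The parser never branches on the version, which is what allows clients to move to newer metadata versions ahead of the brokers, provided the topic list stays where it is.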

Expiration Semantics: With the ability to parse topic subscriptions, the coordinator will be able to maintain the set of topics that the group is currently interested in. Any committed offset for a partition of a topic that the group is not currently subscribed to is subject to expiration. As is the behavior today, when the group becomes empty, all offsets are subject to expiration.

Deletion Semantics: Any offset which is eligible for expiration may be deleted even if the group is still active. Usually we do not allow committed offset changes while a group is active because we do not have a mechanism to notify the group of the change. However, offsets which are awaiting expiration do not have this problem because they are not being actively consumed. Hence the deletion can be immediate even when the group is active.
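The expiration and deletion rules above can be sketched as a small in-memory model. This is only an illustration under stated assumptions: the class, the `Outcome` enum and its values are hypothetical (the proposal does not name the error codes), an empty group is modelled as an empty subscription set, and the retention-time check that governs actual expiration is left out.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of a group's committed offsets; not the coordinator's real data structure.
final class GroupOffsetsSketch {

    // Hypothetical per-partition outcomes, standing in for the ErrorCode in the response.
    enum Outcome { DELETED, STILL_SUBSCRIBED, NOT_FOUND }

    private final Map<String, Map<Integer, Long>> committed = new HashMap<>();
    private Set<String> subscribedTopics = Collections.emptySet();

    void commit(String topic, int partition, long offset) {
        committed.computeIfAbsent(topic, t -> new HashMap<>()).put(partition, offset);
    }

    // Called with the union of all members' subscriptions; an empty set models an empty group.
    void onRebalance(Set<String> unionOfMemberSubscriptions) {
        subscribedTopics = new HashSet<>(unionOfMemberSubscriptions);
    }

    // An offset is subject to expiration when no member subscribes to its topic.
    boolean eligibleForExpiration(String topic) {
        return !subscribedTopics.contains(topic);
    }

    // Deletion is allowed exactly when the offset is eligible for expiration.
    Outcome delete(String topic, int partition) {
        if (!eligibleForExpiration(topic)) {
            return Outcome.STILL_SUBSCRIBED;
        }
        Map<Integer, Long> partitions = committed.get(topic);
        if (partitions == null || partitions.remove(partition) == null) {
            return Outcome.NOT_FOUND;
        }
        if (partitions.isEmpty()) {
            committed.remove(topic);
        }
        return Outcome.DELETED;
    }

    public static void main(String[] args) {
        GroupOffsetsSketch group = new GroupOffsetsSketch();
        group.commit("old-topic", 0, 10L);
        group.commit("live-topic", 0, 20L);
        group.onRebalance(Collections.singleton("live-topic"));
        System.out.println(group.delete("old-topic", 0));  // no longer subscribed
        System.out.println(group.delete("live-topic", 0)); // still subscribed
    }
}
```

Because the check is per topic, a single delete request can succeed for some partitions and fail for others, which is why the response schema carries an error code per partition.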

Compatibility, Deprecation, and Migration Plan

This is a backwards compatible change. The major impact is how it affects the future evolution of the consumer group protocol. This is documented above.

Rejected Alternatives

None yet
