...

  • The consumers in a share group cooperatively consume records with partitions that may be assigned to multiple consumers

  • The number of consumers in a share group can exceed the number of partitions

  • Records are acknowledged on an individual basis, although the system is optimized to work in batches for improved efficiency

  • Delivery attempts to consumers in a share group are counted to enable automated handling of unprocessable records

...

When a consumer in a share-group fetches records, it receives available records from any of the topic-partitions that match its subscriptions. Records are acquired for delivery to this consumer with a time-limited acquisition lock. While a record is acquired, it is not available for another consumer. By default, the lock duration is 30s, but it can also be controlled using the group.share.record.lock.duration.ms configuration parameter. The idea is that the lock is automatically released once the lock duration has elapsed, and then the record is available to be given to another consumer. The consumer which holds the lock can deal with it in the following ways:

...
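The acquisition lock and the delivery count can be pictured with a small in-memory model of one share-partition. This is a minimal sketch, not the broker's implementation: the class and method names are hypothetical, time is passed in explicitly instead of being read from a clock, and the acknowledgement types reuse the 0:Accept, 1:Release, 2:Reject values from ShareAcknowledgeRequest. It also assumes that a record whose lock is released or times out is archived, instead of being made available again, once its delivery count reaches group.share.delivery.count.limit.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Optional


class State(Enum):
    AVAILABLE = "Available"
    ACQUIRED = "Acquired"
    ACKNOWLEDGED = "Acknowledged"
    ARCHIVED = "Archived"


class AckType(Enum):
    ACCEPT = 0
    RELEASE = 1
    REJECT = 2


@dataclass
class InFlightRecord:
    offset: int
    state: State = State.AVAILABLE
    delivery_count: int = 0
    owner: Optional[str] = None
    lock_expiry_ms: Optional[int] = None


class SharePartitionModel:
    """Hypothetical, single-threaded model of acquisition locks on one share-partition."""

    def __init__(self, lock_duration_ms: int = 30000, delivery_count_limit: int = 5):
        self.lock_duration_ms = lock_duration_ms
        self.delivery_count_limit = delivery_count_limit
        self.records: Dict[int, InFlightRecord] = {}

    def acquire(self, offset: int, consumer: str, now_ms: int) -> bool:
        rec = self.records.setdefault(offset, InFlightRecord(offset))
        self._expire_lock(rec, now_ms)
        if rec.state is not State.AVAILABLE:
            return False  # locked by another consumer, or already finished
        rec.state = State.ACQUIRED
        rec.owner = consumer
        rec.delivery_count += 1  # every acquisition counts as a delivery attempt
        rec.lock_expiry_ms = now_ms + self.lock_duration_ms
        return True

    def acknowledge(self, offset: int, consumer: str, ack: AckType, now_ms: int) -> bool:
        rec = self.records[offset]
        self._expire_lock(rec, now_ms)
        if rec.state is not State.ACQUIRED or rec.owner != consumer:
            return False  # this consumer no longer holds the lock
        if ack is AckType.ACCEPT:
            self._settle(rec, State.ACKNOWLEDGED)
        elif ack is AckType.REJECT:
            self._settle(rec, State.ARCHIVED)
        else:
            self._release(rec)
        return True

    def _expire_lock(self, rec: InFlightRecord, now_ms: int) -> None:
        # The lock is released automatically once its duration has elapsed.
        if rec.state is State.ACQUIRED and now_ms >= rec.lock_expiry_ms:
            self._release(rec)

    def _release(self, rec: InFlightRecord) -> None:
        # Assumption: a record out of delivery attempts is archived, not redelivered.
        if rec.delivery_count >= self.delivery_count_limit:
            self._settle(rec, State.ARCHIVED)
        else:
            self._settle(rec, State.AVAILABLE)

    @staticmethod
    def _settle(rec: InFlightRecord, state: State) -> None:
        rec.state = state
        rec.owner = None
        rec.lock_expiry_ms = None
```

For example, with lock_duration_ms=1000, a record acquired by one consumer at time 0 cannot be acquired by a second consumer at time 500 but can be at time 1000; from then on, an acknowledgement from the first consumer is refused because it no longer holds the lock.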

There are some concepts being introduced to Kafka to support share groups.

The group coordinator is now responsible for coordination of share groups as well as consumer groups. The responsibility for being group coordinator for the cluster’s share groups is distributed among the brokers, exactly as for consumer groups. For share groups, the group coordinator has the following responsibilities:

  • It maintains the list of share-group members.

  • It manages the topic-partition assignments for the share-group members using a server-side partition assignor. An initial, trivial implementation would be to give each member the list of all topic-partitions which match its subscriptions and then use the pull-based protocol to fetch records from all partitions. A more sophisticated implementation could use topic-partition load and lag metrics to distribute partitions among the consumers as a kind of autonomous, self-balancing partition assignment, steering more consumers to busier partitions, for example. Alternatively, a push-based fetching scheme could be used.
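The initial, trivial assignor described above fits in a few lines. This is a sketch under stated assumptions: the function name and the dictionary-based inputs are hypothetical and do not correspond to the server-side assignor interface; it only shows that, unlike a consumer-group assignor, the same topic-partition can be handed to several members.

```python
from typing import Dict, List, Set, Tuple

TopicPartition = Tuple[str, int]


def trivial_share_assignment(
    subscriptions: Dict[str, Set[str]],
    partition_counts: Dict[str, int],
) -> Dict[str, List[TopicPartition]]:
    """Give every member all partitions of every topic it subscribes to.

    Hypothetical helper: unlike a consumer-group assignor, the same
    topic-partition may appear in several members' assignments.
    """
    assignment: Dict[str, List[TopicPartition]] = {}
    for member, topics in subscriptions.items():
        assignment[member] = sorted(
            (topic, partition)
            for topic in topics
            if topic in partition_counts  # ignore topics that do not exist
            for partition in range(partition_counts[topic])
        )
    return assignment
```

With this assignor, every member subscribed to a topic fetches from all of its partitions, and it is the acquisition locks, not the assignment, that keep two members from processing the same record at the same time.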

...

  • EMPTY - When a share group is created or the last member leaves the group, the share group is EMPTY.
  • STABLE - When a share group has active members, the share group is STABLE.
  • DEAD - When the share group remains EMPTY for a configured period, the group coordinator transitions it to DEAD to delete it.
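The three states and their transitions form a small state machine. A minimal sketch, assuming the coordinator periodically checks how long a group has been EMPTY; the class, its methods, and the empty_retention_ms parameter are hypothetical, since the name of the configured period is not given here.

```python
from enum import Enum
from typing import Optional, Set


class ShareGroupState(Enum):
    EMPTY = "Empty"
    STABLE = "Stable"
    DEAD = "Dead"


class ShareGroupLifecycle:
    """Hypothetical model of the EMPTY/STABLE/DEAD transitions of one share group."""

    def __init__(self, now_ms: int, empty_retention_ms: int):
        # A newly created share group has no members, so it starts EMPTY.
        self.state = ShareGroupState.EMPTY
        self.members: Set[str] = set()
        self.empty_since_ms: Optional[int] = now_ms
        self.empty_retention_ms = empty_retention_ms  # hypothetical name for the configured period

    def join(self, member_id: str) -> None:
        if self.state is ShareGroupState.DEAD:
            raise RuntimeError("the share group has been deleted")
        self.members.add(member_id)
        self.state = ShareGroupState.STABLE
        self.empty_since_ms = None

    def leave(self, member_id: str, now_ms: int) -> None:
        self.members.discard(member_id)
        if not self.members and self.state is ShareGroupState.STABLE:
            # The last member left, so the group becomes EMPTY again.
            self.state = ShareGroupState.EMPTY
            self.empty_since_ms = now_ms

    def tick(self, now_ms: int) -> None:
        # After staying EMPTY for the configured period, the group is marked DEAD.
        if (self.state is ShareGroupState.EMPTY
                and now_ms - self.empty_since_ms >= self.empty_retention_ms):
            self.state = ShareGroupState.DEAD
```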

Persistence and fail-over

For a share group, the group coordinator only persists a single record which essentially reserves the group's ID as a share group in the namespace of groups.

...

When the SPSO advances because of the LSO moving, any in-flight records that the SPSO moves past logically move into Archived state. The exception is that records which are already Acquired for delivery to consumers can be acknowledged with any AcknowledgeType, at which point they logically transition into Archived state too; there's no need to throw an exception for a consumer which has just processed a record which is about to become Archived.
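The two rules in this paragraph can be sketched together. This assumes a share-partition's in-flight records are held as a map from offset to state, and the function names are hypothetical; the point is only that records behind the new SPSO are archived unless they are still Acquired, and that an Acquired record behind the SPSO accepts any acknowledgement type.

```python
from typing import Dict

# Hypothetical sketch: record states are plain strings keyed by offset.
ACK_TRANSITIONS = {"accept": "Acknowledged", "release": "Available", "reject": "Archived"}


def advance_spso(states: Dict[int, str], new_spso: int) -> None:
    """Archive in-flight records that the SPSO has moved past, except Acquired ones."""
    for offset, state in states.items():
        if offset < new_spso and state != "Acquired":
            states[offset] = "Archived"


def acknowledge(states: Dict[int, str], spso: int, offset: int, ack_type: str) -> str:
    if states.get(offset) != "Acquired":
        raise ValueError(f"offset {offset} is not Acquired")
    if offset < spso:
        # Behind the SPSO: any acknowledgement type succeeds and the record is archived.
        states[offset] = "Archived"
    else:
        states[offset] = ACK_TRANSITIONS[ack_type]
    return states[offset]
```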

Note that because the share groups are all consuming from the same log, the retention behavior for a topic applies to all of the share groups consuming from that topic.

...

Cooperative consumption is inherently record-based, but the expectation is that batching is used to maximise performance. For example, in the case where all records in a batch are processed successfully:

  • When a consumer fetches records, the share-partition leader prefers to return complete record batches.

  • In the usual and optimal case, all of the records in a batch will be in Available state and can all be moved to Acquired state with the same acquisition lock time-out.

  • When the consumer has processed the fetched records, it can acknowledge delivery of all of the records as a single batch, transitioning them all into Acknowledged state.
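The batching optimization amounts to tracking state per range of offsets instead of per record, and splitting a range only when its records diverge. A minimal sketch, assuming states are kept as (first offset, last offset, state) tuples; this representation and the function names are illustrative, not the share-partition leader's actual data structures.

```python
from itertools import groupby
from typing import Dict, List, Tuple

# (first offset, last offset inclusive, state): hypothetical range representation.
Range = Tuple[int, int, str]


def acquire_batch(first: int, last: int) -> Range:
    # Optimal case: every record in the batch is Available, so the whole batch
    # moves to Acquired as one unit with a single acquisition lock time-out.
    return (first, last, "Acquired")


def acknowledge_batch(batch: Range, outcomes: Dict[int, str]) -> List[Range]:
    """Apply per-record outcomes; offsets not listed in outcomes become Acknowledged.

    Consecutive records that end in the same state are kept as one range, so a
    fully successful batch is still tracked as a single entry.
    """
    first, last, state = batch
    if state != "Acquired":
        raise ValueError("only an Acquired batch can be acknowledged")
    per_record = [(off, outcomes.get(off, "Acknowledged")) for off in range(first, last + 1)]
    ranges: List[Range] = []
    for new_state, run in groupby(per_record, key=lambda item: item[1]):
        offsets = [off for off, _ in run]
        ranges.append((offsets[0], offsets[-1], new_state))
    return ranges
```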

...

Configuration

Description

Values
group.share.enable

Whether to enable share groups on the broker.

Default false while the feature is being developed. Will become true in a future release.
group.share.delivery.count.limit

The maximum number of delivery attempts for a record delivered to a share group.

Default 5, minimum 2, maximum 10
group.share.record.lock.duration.ms

Share-group record acquisition lock duration in milliseconds.

Default 30000 (30 seconds), minimum 1000 (1 second), maximum 60000 (60 seconds)
group.share.record.lock.duration.max.ms

Share-group record acquisition lock maximum duration in milliseconds.

Default 60000 (60 seconds), minimum 1000 (1 second), maximum 3600000 (1 hour)
group.share.record.lock.partition.limit

Share-group record lock limit per share-partition.

Default 200, minimum 100, maximum 10000
group.share.session.timeout.ms 

The timeout to detect client failures when using the group protocol.

Default 45000 (45 seconds)
group.share.min.session.timeout.ms 

The minimum session timeout.

Default 45000 (45 seconds)
group.share.max.session.timeout.ms 

The maximum session timeout.

Default 60000 (60 seconds)
group.share.heartbeat.interval.ms 

The heartbeat interval given to the members.

Default 5000 (5 seconds)
group.share.min.heartbeat.interval.ms 

The minimum heartbeat interval.

Default 5000 (5 seconds)
group.share.max.heartbeat.interval.ms 

The maximum heartbeat interval.

Default 15000 (15 seconds)
group.share.max.size 

The maximum number of consumers that a single share group can accommodate.

Default 200
group.share.assignors 

The server-side assignors as a list of full class names. In the initial delivery, only the first one in the list is used.

A list of class names. Default "org.apache.server.group.share.SimpleAssignor"
max.share.session.cache.slots 

The maximum number of share sessions that the broker will maintain.

Default 1000
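The entries with explicit limits lend themselves to a simple validation step. A minimal sketch, assuming broker properties arrive as strings; the helper name is hypothetical, and the cross-check between the default and maximum lock durations is an assumption, not something the table states.

```python
from typing import Dict, Mapping, NamedTuple


class Bounds(NamedTuple):
    default: int
    minimum: int
    maximum: int


# Defaults and limits transcribed from the broker configuration table above.
SHARE_GROUP_BOUNDS: Dict[str, Bounds] = {
    "group.share.delivery.count.limit": Bounds(5, 2, 10),
    "group.share.record.lock.duration.ms": Bounds(30000, 1000, 60000),
    "group.share.record.lock.duration.max.ms": Bounds(60000, 1000, 3600000),
    "group.share.record.lock.partition.limit": Bounds(200, 100, 10000),
}


def resolve_share_group_configs(props: Mapping[str, str]) -> Dict[str, int]:
    """Apply defaults and reject out-of-range values (hypothetical helper)."""
    resolved: Dict[str, int] = {}
    for name, bounds in SHARE_GROUP_BOUNDS.items():
        value = int(props[name]) if name in props else bounds.default
        if not bounds.minimum <= value <= bounds.maximum:
            raise ValueError(f"{name}={value} is outside [{bounds.minimum}, {bounds.maximum}]")
        resolved[name] = value
    # Assumption: the default lock duration may not exceed the maximum lock duration.
    if (resolved["group.share.record.lock.duration.ms"]
            > resolved["group.share.record.lock.duration.max.ms"]):
        raise ValueError(
            "group.share.record.lock.duration.ms exceeds group.share.record.lock.duration.max.ms")
    return resolved
```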

Group configuration

The following dynamic group configuration properties are added. These are properties for which it would be problematic for consumers in the same share group to behave differently, as could happen if the properties were specified in the consumer clients themselves.

Configuration

Description

Values
group.share.isolation.level 

Controls how to read records written transactionally. If set to "read_committed", the share group will only deliver transactional records which have been committed. If set to "read_uncommitted", the share group will return all messages, even transactional messages which have been aborted. Non-transactional records will be returned unconditionally in either mode.

Valid values "read_committed" and "read_uncommitted" (default)

group.share.auto.offset.reset 

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server:

  • "earliest" : automatically reset the offset to the earliest offset

  • "latest" : automatically reset the offset to the latest offset

Valid values "latest" (default) and "earliest"
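The two group configurations together decide where a new share group starts reading and how far it can read. A minimal sketch, assuming the same semantics as an ordinary Kafka consumer: read_committed cannot read past the last stable offset, read_uncommitted can read up to the high watermark, and "latest" resolves to whichever of the two applies. Filtering out aborted transactional records is not modeled, and the function names are hypothetical.

```python
ISOLATION_LEVELS = ("read_committed", "read_uncommitted")
RESET_POLICIES = ("earliest", "latest")


def fetch_upper_bound(high_watermark: int, last_stable_offset: int,
                      isolation_level: str = "read_uncommitted") -> int:
    """Exclusive end of the offset range a share group may deliver from."""
    if isolation_level not in ISOLATION_LEVELS:
        raise ValueError(f"invalid group.share.isolation.level: {isolation_level}")
    # read_committed stops at the last stable offset, so open transactions are not exposed.
    return last_stable_offset if isolation_level == "read_committed" else high_watermark


def initial_offset(log_start_offset: int, high_watermark: int, last_stable_offset: int,
                   auto_offset_reset: str = "latest",
                   isolation_level: str = "read_uncommitted") -> int:
    """Offset a share group starts from when there is no initial offset yet."""
    if auto_offset_reset not in RESET_POLICIES:
        raise ValueError(f"invalid group.share.auto.offset.reset: {auto_offset_reset}")
    if auto_offset_reset == "earliest":
        return log_start_offset
    return fetch_upper_bound(high_watermark, last_stable_offset, isolation_level)
```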

Consumer configuration

The following new configuration properties are added for the consumer.

Configuration

Description

Values
group.share.record.lock.duration.ms 

Record acquisition lock duration in milliseconds.

null, which uses the cluster configuration group.share.record.lock.duration.ms, minimum 1000, maximum limited by the cluster configuration group.share.record.lock.duration.max.ms
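How a consumer's setting combines with the cluster settings can be sketched as a small resolution function. This is a hypothetical helper, not a client API; the defaults mirror the broker table, and it assumes a value above the cluster maximum is rejected, since the text only says the maximum is "limited by" group.share.record.lock.duration.max.ms and does not say whether an excessive value is rejected or capped.

```python
from typing import Optional

CONSUMER_MIN_LOCK_DURATION_MS = 1000  # minimum from the consumer configuration table


def effective_lock_duration_ms(consumer_value_ms: Optional[int],
                               cluster_default_ms: int = 30000,
                               cluster_max_ms: int = 60000) -> int:
    """Resolve the lock duration a consumer's records are acquired with.

    Hypothetical helper. Assumption: a value above the cluster maximum is
    rejected rather than capped.
    """
    if consumer_value_ms is None:
        # null: fall back to the cluster's group.share.record.lock.duration.ms
        return cluster_default_ms
    if consumer_value_ms < CONSUMER_MIN_LOCK_DURATION_MS:
        raise ValueError(f"lock duration {consumer_value_ms} ms is below the minimum")
    if consumer_value_ms > cluster_max_ms:
        raise ValueError(
            f"lock duration {consumer_value_ms} ms exceeds the cluster maximum {cluster_max_ms} ms")
    return consumer_value_ms
```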

Consumer configuration

The existing consumer configurations apply to share groups with the following exceptions:

...

{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareAcknowledgeRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "SessionId", "type": "int32", "versions": "0+",
      "about": "The share session ID." },
    { "name": "SessionEpoch", "type": "int32", "versions": "0+",
      "about": "The share session epoch, which is used for ordering requests in a session." },
    { "name": "Topics", "type": "[]AcknowledgeTopic", "versions": "0+",
      "about": "The topics containing records to acknowledge.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]AcknowledgePartition", "versions": "0+",
        "about": "The partitions containing records to acknowledge.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "StartOffset", "type": "int64", "versions": "0+",
            "about": "Start offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "GapOffsets", "type": "[]int64", "versions": "0+",
            "about": "Array of offsets in this range which do not correspond to records."},
          { "name": "AcknowledgeType", "type": "int8", "versions": "0+", "default": "0",
            "about": "The type of acknowledgement - 0:Accept,1:Release,2:Reject."}
        ]}
      ]}
    ]}
  ]
}
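On the client side, per-record acknowledgements have to be packed into AcknowledgementBatches. A minimal sketch, assuming the client holds a map from offset to AcknowledgeType and knows which offsets in the fetched range have no record (for example, offsets removed by compaction); the function is hypothetical, and the dictionary keys simply reuse the schema's field names for readability.

```python
from typing import Dict, Iterable, List

ACCEPT, RELEASE, REJECT = 0, 1, 2  # AcknowledgeType values: 0:Accept, 1:Release, 2:Reject


def build_acknowledgement_batches(acks: Dict[int, int], gaps: Iterable[int] = ()) -> List[dict]:
    """Coalesce per-offset acknowledgements into AcknowledgementBatch-like entries.

    Consecutive offsets with the same AcknowledgeType share one batch; a gap
    offset directly after a batch extends it and is listed in GapOffsets.
    """
    gap_set = set(gaps)
    if gap_set & set(acks):
        raise ValueError("an offset cannot be both acknowledged and a gap")
    batches: List[dict] = []
    current = None
    for offset in sorted(set(acks) | gap_set):
        contiguous = current is not None and offset == current["LastOffset"] + 1
        if offset in gap_set:
            if contiguous:
                # No record exists here: bridge it so the batch stays one range.
                current["LastOffset"] = offset
                current["GapOffsets"].append(offset)
            continue
        ack_type = acks[offset]
        if contiguous and ack_type == current["AcknowledgeType"]:
            current["LastOffset"] = offset
        else:
            current = {"StartOffset": offset, "LastOffset": offset,
                       "GapOffsets": [], "AcknowledgeType": ack_type}
            batches.append(current)
    return batches
```

Coalescing like this keeps the common case, a fully processed fetch with every record accepted, down to a single AcknowledgementBatch per partition.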

Response schema

...

The following new broker metrics should be added:

Metric Name

Type

Group

Tags

Description

JMX Bean

group-count

Gauge

group-coordinator-metrics

protocol: share

The total number of share groups managed by the group coordinator.

kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share 

rebalance (rebalance-rate and rebalance-count)

Meter

group-coordinator-metrics

protocol: share

The rate and total count of share group rebalances.

kafka.server:type=group-coordinator-metrics,name=rebalance-rate,protocol=share 


kafka.server:type=group-coordinator-metrics,name=rebalance-count,protocol=share 

num-partitions

Gauge

group-coordinator-metrics

protocol: share

The number of share partitions managed by the group coordinator.

kafka.server:type=group-coordinator-metrics,name=num-partitions,protocol=share 

group-count

Gauge

group-coordinator-metrics

protocol: share

state: {empty|stable|dead}

The number of share groups in each state.

kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share,state={empty|stable|dead}

offset-commit (offset-commit-rate and offset-commit-count)

Meter

group-coordinator-metrics

protocol: share

The total number of committed offsets for share groups.

kafka.server:type=group-coordinator-metrics,name=offset-commit-rate,protocol=share 


kafka.server:type=group-coordinator-metrics,name=offset-commit-count,protocol=share 

record-acknowledgement (record-acknowledgement-rate and record-acknowledgement-count)

Meter

group-coordinator-metrics

protocol: share

ack-type:{accept,release,reject} 


The number of records acknowledged per acknowledgement type.

kafka.server:type=group-coordinator-metrics,name=record-acknowledgement-rate,protocol=share,ack-type={accept,release,reject} 


kafka.server:type=group-coordinator-metrics,name=record-acknowledgement-count,protocol=share,ack-type={accept,release,reject} 
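The table follows a regular naming pattern: each Meter row yields a -rate and a -count bean, and tag values are appended after protocol=share. A small sketch that derives the full list of bean names from the table; the helper names are hypothetical, and the output is only as accurate as the table it is transcribed from.

```python
from typing import List

BASE = "kafka.server:type=group-coordinator-metrics"


def share_metric_bean(name: str, **tags: str) -> str:
    # protocol=share comes first, then any extra tag (state or ack-type), as in the table.
    parts = [BASE, f"name={name}", "protocol=share"]
    parts += [f"{key.replace('_', '-')}={value}" for key, value in tags.items()]
    return ",".join(parts)


def all_share_beans() -> List[str]:
    beans = [share_metric_bean("group-count"), share_metric_bean("num-partitions")]
    beans += [share_metric_bean("group-count", state=s) for s in ("empty", "stable", "dead")]
    for meter in ("rebalance", "offset-commit"):
        # A Meter is exposed as two beans: <name>-rate and <name>-count.
        beans += [share_metric_bean(f"{meter}-{kind}") for kind in ("rate", "count")]
    beans += [share_metric_bean(f"record-acknowledgement-{kind}", ack_type=t)
              for kind in ("rate", "count") for t in ("accept", "release", "reject")]
    return beans
```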

Future Work

There are some obvious extensions to this idea which are not included in this KIP in order to keep the scope manageable.

...