...

To determine the hash range before each consumer generation stabilizes, the leader will evaluate the situation during the group assignment phase and decide whether to trigger the sharing mechanism. For the very first version, an intuitive criterion is to compare the number of topic partitions against the number of consumers that are opting into the key sharing mode. For example, if the number of consumers m is greater than the number of partitions n, we would do the assignment based off partitions instead of consumers. If 5 consumers subscribe to 1 topic with 3 partitions, the final assignment would be:

...

The new assignment comes from the fact that partitions now play a reverse mapping role to consumers. From the partition perspective, the assignment looks like a round robin assignment based off partitions:

tp1: M1, M4
tp2: M2, M5
tp3: M3
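
For concreteness, here is a small sketch of this partition-perspective round robin (illustrative only; the class and method names are not part of the proposal) that reproduces the mapping above:

Code Block
languagejava
titleReverseRoundRobinSketch.java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReverseRoundRobinSketch {

    // Illustrative only: when consumers outnumber partitions, walk the consumers
    // round robin over the partitions so every consumer lands on some partition.
    public static Map<String, List<String>> assign(List<String> partitions, List<String> consumers) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String tp : partitions) {
            assignment.put(tp, new ArrayList<>());
        }
        for (int i = 0; i < consumers.size(); i++) {
            String tp = partitions.get(i % partitions.size());
            assignment.get(tp).add(consumers.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 3 partitions, 5 consumers -> {tp1=[M1, M4], tp2=[M2, M5], tp3=[M3]}
        System.out.println(assign(
            Arrays.asList("tp1", "tp2", "tp3"),
            Arrays.asList("M1", "M2", "M3", "M4", "M5")));
    }
}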

The assigned ranges could then be used by each consumer to make its fetches more targeted. We don't plan to introduce a separate assignment strategy; instead we would like to roll out this logic as a plugin to the existing assignors. As of today the ConsumerPartitionAssignor interface looks like below:

Code Block
languagejava
titleConsumerPartitionAssignor.java
public interface ConsumerPartitionAssignor {
  Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                           Map<String, Subscription> subscriptions);
}


Take the round robin assignor as an example.
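
The exact plugin wiring is left open here; as a rough sketch only (the class, method, and parameter names below are assumptions, not anything defined by this KIP or by Kafka), a key-sharing hook could take the base assignor's output and rewrite it only when more opted-in consumers than partitions are present:

Code Block
languagejava
titleKeySharingPlugin.java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch: delegates to an existing assignor's result and only
// rewrites it when the opted-in consumers outnumber the partitions.
public class KeySharingPlugin {

    public Map<String, List<TopicPartition>> maybeShare(Map<String, List<TopicPartition>> baseAssignment,
                                                        List<TopicPartition> allPartitions,
                                                        List<String> sharingConsumers) {
        if (sharingConsumers.size() <= allPartitions.size()) {
            return baseAssignment;  // normal path: keep the existing assignor's output untouched
        }
        Map<String, List<TopicPartition>> shared = new HashMap<>();
        for (int i = 0; i < sharingConsumers.size(); i++) {
            // Reverse mapping: walk the consumers round robin over the partitions,
            // so several consumers may end up sharing the same partition.
            TopicPartition tp = allPartitions.get(i % allPartitions.size());
            shared.computeIfAbsent(sharingConsumers.get(i), c -> new ArrayList<>()).add(tp);
        }
        return shared;
    }
}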

Restriction Enforcement

To satisfy different administration requirements, we are placing several configurations at three levels:

  1. Broker level, to determine whether to allow individual commit (IC) requests,
  2. Topic level, to determine whether this topic is allowed to be cooperatively processed,
  3. Consumer level, to determine whether this consumer opts in to share processing with other consumers


Merging these restrictions with the algorithm above, the leader would only trigger the sharing mechanism when all three levels opt in.
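
A minimal sketch of that combined check (the helper and its parameters are hypothetical, shown only to make the gating explicit):

Code Block
languagejava
titleKeySharingGate.java
// Illustrative sketch only: the config names follow this KIP's proposal, but the
// helper below is hypothetical and not part of any Kafka API.
public class KeySharingGate {

    public static boolean keySharingAllowed(boolean brokerAcceptsIndividualCommit,  // accept.individual.commit
                                            boolean topicAcceptsRangeFetch,         // accept.range.fetch
                                            boolean consumerAllowsIndividualCommit, // allow.individual.commit
                                            int numSharingConsumers,
                                            int numPartitions) {
        // All three levels must opt in, and sharing only triggers when the opted-in
        // consumers outnumber the partitions (the first-version criterion above).
        return brokerAcceptsIndividualCommit
                && topicAcceptsRangeFetch
                && consumerAllowsIndividualCommit
                && numSharingConsumers > numPartitions;
    }
}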

The assigned ranges could then be used by each consumer to make its fetches more targeted. This step unblocks the potential for dynamic consumer group scaling beyond the partition-count cap.
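
For illustration only: in the proposal the range filtering happens on the broker during the fetch, but the effect of an assigned hash range can be pictured with a client-side predicate such as the one below (the class name and the bucketing scheme are assumptions, not part of the KIP):

Code Block
languagejava
titleKeyRangeFilter.java
import java.util.function.Predicate;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Utils;

// Purely illustrative: demonstrates what "only records whose keys hash into my
// assigned range" means; the actual proposal applies this on the broker side.
public class KeyRangeFilter {

    // Keep a record only if its key hashes into the bucket range [rangeStart, rangeEnd).
    public static Predicate<ConsumerRecord<byte[], byte[]>> inRange(int rangeStart, int rangeEnd, int totalBuckets) {
        return record -> {
            int bucket = Utils.toPositive(Utils.murmur2(record.key())) % totalBuckets;
            return bucket >= rangeStart && bucket < rangeEnd;
        };
    }
}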

...

accept.individual.commit

Determines whether the group coordinator allows individual commits.

Default: true.

max.bytes.key.range.results

The maximum amount of memory allowed for holding partitioned records in memory to serve the next batch. Note that this is not the memory limit for handling the range check of a single batch of data.

Default: 209715200 bytes.

individual.commit.log.segment.bytes

The segment size for the individual commit topic.

Default: 10485760 bytes.

Topic config

accept.range.fetch

Determines whether the topic allows key range based fetches. Setting it to false is essentially an administrative call to ensure that any subscribing application obeys partition-level ordering.

Default: true.

reserve.message.value.range.fetch

Determines whether, when caching the result of the last fetch request, the message values should also be kept in main memory as an optimization.

Default: false.

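Assuming the topic config above ships under this exact name (it is a proposal and is not recognized by released brokers), an operator could toggle it with the standard Admin API; the topic name "orders" is made up for the example:

Code Block
languagejava
titleRangeFetchConfigExample.java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class RangeFetchConfigExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // "accept.range.fetch" is the topic config proposed above; "orders" is a hypothetical topic.
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "orders");
            AlterConfigOp disableRangeFetch = new AlterConfigOp(
                    new ConfigEntry("accept.range.fetch", "false"), AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(
                    Collections.singletonMap(topic, Collections.singletonList(disableRangeFetch)))
                 .all().get();
        }
    }
}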

Consumer configs

allow.individual.commit

Determines whether this consumer will participate in shared consumption mode with other consumers.

Default: true.

max.num.ranges.to.commit

Commit the progress once the number of accumulated offset ranges exceeds this value.

Default: 10,000.
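
Assuming the consumer configs above ship under these exact names (they are proposals; released clients would log and ignore them as unknown), opting a consumer into shared consumption could look like:

Code Block
languagejava
titleSharingConsumerExample.java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class SharingConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "shared-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // Proposed configs from this KIP; released Kafka clients treat them as unknown and ignore them.
        props.put("allow.individual.commit", "true");
        props.put("max.num.ranges.to.commit", "10000");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // subscribe() and poll() as usual once the feature is available.
        }
    }
}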

...