...
There is obvious performance penalty compared with zero-copy mechanism for existing consumer fetching, however the value of unblocking new use cases motivates us to figure out more optimizations in the long run.
With key based filtering + IC offset support, user is already capable of doing standalone mode of partition data sharing. We shall discuss the consumer API changes to support this feature in the public interface section.
Rebalance support for concurrent assignment
To determine the hash range before each consumer generation stabilizes, in the group assignment phase the leader will evaluate the situation and decide whether to trigger the sharing mechanism. For the very first version, a very intuitive criteria is comparing the relative size of topic partitions vs number of consumers. For example if number of consumers m > number of partitions n, we would do the assignment based off partitions instead of they would be sharing part of partitions for consumption in a round robin fashion. For example, if 5 consumers subscribing to 1 topic with 3 partitions, the final assignment would be:
N = Long.MAX
M1: tp1[0, N/2 - 1],
M2: tp2[0, N/2 - 1],
M3: tp3,
M4: tp1[N/2, N]
M5: tp2[N/2, N]
The new assignment comes from the fact that partitions are playing a reverse mapping to consumers. So in partition perspective, our assignment looks like a round robin assignment based off partitions:
tp1: M1, M4
tp2: M2, M5
tp3: M3
In the near term, we could consider cache the filter results of one batch of data so that when other fetch requests fall into this range, we could serve in-memory result instead of doing a repetitive fetch. For example, if a fetch request coming in to ask for range [40-50], but only [40, 44] are satisfying its hash range. The result of [45, 49] will be saved as message keys, available for next fetch request instead of evicting immediately. We also recommend a config to control how much memory we would like to spend on this feature, see max.bytes.key.range.hashing. Also user could opt to choose either load the message key or the entire message into the main memory depending on the understanding of business needs since the variation of message value could be huge. This option would be introduced as a topic level config. Either way, once a cached result is being sent out, it will be evicted immediately to leave room for the current or future key range filter results.
With key based filtering + IC offset support, user is already capable of doing standalone mode of partition data sharing. We shall discuss the consumer API changes to support this feature in the public interface section.
Rebalance support for concurrent assignment
To determine the hash range before each consumer generation stabilizes, in the group assignment phase the leader will evaluate the situation and decide whether to trigger the sharing mechanism. For the very first version, a very intuitive criteria is comparing the relative size of topic partitions vs number of consumers. For example if number of consumers m > number of partitions n, we would do the assignment based off partitions instead of they would be sharing part of partitions for consumption in a round robin fashion. For example, if 5 consumers subscribing to 1 topic with 3 partitions, the final assignment would be:
N = Long.MAX
M1: tp1[0, N/2 - 1],
M2: tp2[0, N/2 - 1],
M3: tp3,
M4: tp1[N/2, N]
M5: tp2[N/2, N]
The new assignment comes from the fact that partitions are playing a reverse mapping to consumers. So in partition perspective, our assignment looks like a round robin assignment based off partitions:
tp1: M1, M4
tp2: M2, M5
tp3: M3
The assigned ranges could then be used by the consumer to make their fetch more specific. This step unblocks the potential to The assigned ranges could then be used by the consumer to make their fetch more specific. This step unblocks the potential to allow a dynamic consumer group scaling beyond partition level capping.
...
New Configurations
Broker configs
accept.individual.commit | Determine whether the group coordinator allows individual commit. Default: true. |
max.bytes.key.range.results | The maximum memory allowed to hold partitioned records in-memory for next batch serving. Note that this is not the memory limit for handling one batch of data range check. Default: 209715200 bytes. |
individual.commit.log.segment.bytes | The segment size for the individual commit topic. Default: 10485760 bytes. |
Topic config
accept.range.fetch | Determine whether the topic is allowed to perform key range based fetch. The reason to set it false could be more like a user's call such that any subscribing application must obey the partition level ordering |
. Default: true. |
reserve. |
message. |
value.range. |
fetch | Determine whether when caching last fetch request result, we should also keep the message value inside main memory for optimization. Default: |
false | |
individual.commit.log.segment.bytes | The segment size for the individual commit topic. Default: 10485760 bytes. |
Consumer configs
allow.individual.commit | Determine whether this consumer will participate in a shared consumption mode with other consumers. Default: false. |
...