...

There is an obvious performance penalty compared with the zero-copy mechanism used for existing consumer fetching; however, the value of unblocking new use cases motivates us to pursue further optimizations in the long run.

In the near term, we could consider caching the filter results of one batch of data so that when other fetch requests fall into this range, we could serve the in-memory result instead of doing a repetitive fetch. For example, if a fetch request comes in asking for the range [40, 50] but only [40, 44] satisfies its hash range, the result for [45, 49] will be saved as message keys and kept available for the next fetch request instead of being evicted immediately. We also recommend a config to control how much memory we would like to spend on this feature; see max.bytes.key.range.results. The user could also opt to load either only the message key or the entire message into main memory, depending on their understanding of the business needs, since the size of message values could vary hugely. This option would be introduced as a topic level config. Either way, once a cached result has been sent out, it will be evicted immediately to leave room for current or future key range filter results.
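As a rough sketch of this caching idea (not the proposed broker implementation), the leftover keys of a filtered batch could be held in a small bounded structure keyed by the fetched offset range, with the bound taken from max.bytes.key.range.results and each entry evicted as soon as it is served. The KeyRangeResultCache class and its method names below are hypothetical, for illustration only.

import java.util.*;

// Hypothetical sketch only: a bounded cache for leftover key range filter results,
// keyed by the fetched offset range of a partition. Not the proposed broker code.
public class KeyRangeResultCache {
    private final long maxBytes;   // bound taken from max.bytes.key.range.results
    private long usedBytes = 0;
    private final LinkedHashMap<String, List<byte[]>> cache = new LinkedHashMap<>();

    public KeyRangeResultCache(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Store the keys of records that did not match the requesting consumer's hash range.
    public void put(String rangeKey, List<byte[]> keys) {
        take(rangeKey);                       // drop any stale entry for the same range
        long size = sizeOf(keys);
        Iterator<Map.Entry<String, List<byte[]>>> it = cache.entrySet().iterator();
        while (usedBytes + size > maxBytes && it.hasNext()) {   // evict oldest entries first
            usedBytes -= sizeOf(it.next().getValue());
            it.remove();
        }
        if (size <= maxBytes) {
            cache.put(rangeKey, keys);
            usedBytes += size;
        }
    }

    // Serve a cached result and evict it immediately, as described above.
    public List<byte[]> take(String rangeKey) {
        List<byte[]> keys = cache.remove(rangeKey);
        if (keys != null)
            usedBytes -= sizeOf(keys);
        return keys;
    }

    private static long sizeOf(List<byte[]> keys) {
        long total = 0;
        for (byte[] k : keys)
            total += k.length;
        return total;
    }
}

Whether only the key or the whole record would be retained in such a cache is governed by the topic level option described in the configuration section (reserve.message.key.value.range.fetch).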

With key based filtering + IC offset support, a user is already capable of running a standalone mode of partition data sharing. We shall discuss the consumer API changes to support this feature in the public interface section.

Rebalance support for concurrent assignment 

To determine the hash range before each consumer generation stabilizes, the leader will evaluate the situation in the group assignment phase and decide whether to trigger the sharing mechanism. For the very first version, a very intuitive criterion is to compare the number of topic partitions with the number of consumers. For example, if the number of consumers m is greater than the number of partitions n, the consumers would share part of the partitions for consumption in a round robin fashion. For example, if 5 consumers subscribe to 1 topic with 3 partitions, the final assignment would be:

N = Long.MAX
M1: tp1[0, N/2 - 1]
M2: tp2[0, N/2 - 1]
M3: tp3
M4: tp1[N/2, N]
M5: tp2[N/2, N]

The new assignment comes from the fact that partitions are reverse-mapped onto consumers. So from the partition perspective, the assignment looks like a round robin assignment over partitions:

tp1: M1, M4
tp2: M2, M5
tp3: M3
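A minimal sketch of how a leader-side assignor could derive this assignment is shown below: consumers are laid out round robin over the partitions, and the hash space of each partition is then split evenly among the consumers that share it. The RangeSharingAssignor class, the HashRange record and the assign method are illustrative names only, not the actual assignor interface.

import java.util.*;

// Illustrative only: produces the round robin, hash-range-splitting assignment
// described above (5 consumers over 3 partitions -> tp1: M1,M4  tp2: M2,M5  tp3: M3).
public class RangeSharingAssignor {

    // A hash range [start, end] assigned to one consumer for one partition.
    public record HashRange(String partition, long start, long end) {}

    public static Map<String, List<HashRange>> assign(List<String> consumers, List<String> partitions) {
        long max = Long.MAX_VALUE;
        // Step 1: round robin the consumers over the partitions (the "reverse mapping" view).
        Map<String, List<String>> byPartition = new LinkedHashMap<>();
        for (int i = 0; i < consumers.size(); i++)
            byPartition.computeIfAbsent(partitions.get(i % partitions.size()), p -> new ArrayList<>())
                       .add(consumers.get(i));

        // Step 2: split [0, Long.MAX_VALUE] evenly among the consumers sharing each partition.
        Map<String, List<HashRange>> assignment = new LinkedHashMap<>();
        byPartition.forEach((partition, members) -> {
            long step = max / members.size();
            for (int i = 0; i < members.size(); i++) {
                long start = i * step;
                long end = (i == members.size() - 1) ? max : (i + 1) * step - 1;
                assignment.computeIfAbsent(members.get(i), m -> new ArrayList<>())
                          .add(new HashRange(partition, start, end));
            }
        });
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assign(List.of("M1", "M2", "M3", "M4", "M5"),
                                  List.of("tp1", "tp2", "tp3")));
    }
}

Running the main method with the 5 consumers and 3 partitions above reproduces the assignment listed earlier: tp1 is split between M1 and M4, tp2 between M2 and M5, while tp3 is wholly owned by M3.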

The assigned ranges could then be used by the consumer to make their fetch more specific. This step unblocks the potential to allow dynamic consumer group scaling beyond partition-level capping.

...

New Configurations

Broker configs


accept.individual.commit

Determine whether the group coordinator allows individual commit.

Default: true.

max.bytes.key.range.results

The maximum memory allowed to hold partitioned records in memory for next batch serving. Note that this is not the memory limit for handling the range check of one batch of data.

Default: 209715200 bytes.

individual.commit.log.segment.bytes

The segment size for the individual commit topic.

Default: 10485760 bytes.

Topic config


accept.range.fetch

Determine whether the topic is allowed to perform key range based fetch. The reason to set it to false would be a deliberate user choice to ensure that any subscribing application must obey the partition level ordering.

Default: true.

reserve.message.key.value.range.fetch

Determine whether, when caching the last fetch request result, we should also keep the message value inside main memory for optimization.

Default: false.


Consumer configs

allow.individual.commit

Determine whether this consumer will participate in a shared consumption mode with other consumers.

Default: false.
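Purely as an illustration of where these settings would live, assuming the names and defaults listed above, a deployment opting into shared consumption might look like the following (values are examples, not recommendations):

# Broker side, e.g. server.properties
accept.individual.commit=true
max.bytes.key.range.results=209715200
individual.commit.log.segment.bytes=10485760

# Topic level, set per topic through the usual topic config mechanism
accept.range.fetch=true
reserve.message.key.value.range.fetch=false

# Consumer side
allow.individual.commit=true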

...