...

There is an obvious performance penalty compared with the zero-copy mechanism used by existing consumer fetches; however, the value of unblocking new use cases motivates us to work out more optimizations in the long run. In the near term, we could consider caching the filter results of one batch of data, so that when other fetch requests fall into this range we can serve the in-memory result instead of doing a repetitive fetch. For example, if a fetch request asks for the range [40, 50] but only [40, 44] satisfies its hash range, the result for [45, 49] will be saved as message keys and made available for the next fetch request instead of being evicted immediately. We also recommend a config to control how much memory we would like to spend on this feature, called max.bytes.key.range.results. When the cache has sufficient space and a Fetch request is waiting for new data, producer results could be written directly into the cache and filtered there, in order to further save disk read time.
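As a rough sketch, the caching behavior described above could look like the class below; the class name, method names, and the simplified memory accounting are hypothetical illustrations, not part of this proposal:

Code Block
languagejava
titleKeyRangeFilterCache.java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch only: illustrates a bounded cache of key range filter results.
public class KeyRangeFilterCache {
    private final long maxBytes;   // memory budget, e.g. max.bytes.key.range.results
    private long usedBytes = 0;
    // Keys of records that did not match the requesting consumer's hash range,
    // indexed by a range identifier such as "topic-partition:startOffset-endOffset".
    private final Map<String, List<byte[]>> leftoverKeys = new LinkedHashMap<>();

    public KeyRangeFilterCache(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Keep the leftover keys of a filtered batch if the memory budget allows.
    public synchronized void maybeCache(String range, List<byte[]> keys) {
        long size = keys.stream().mapToLong(k -> k.length).sum();
        if (usedBytes + size <= maxBytes) {
            leftoverKeys.put(range, keys);
            usedBytes += size;
        }
    }

    // Serve a later fetch from memory; the entry is evicted as soon as it is handed out.
    public synchronized List<byte[]> takeIfPresent(String range) {
        List<byte[]> keys = leftoverKeys.remove(range);
        if (keys != null) {
            usedBytes -= keys.stream().mapToLong(k -> k.length).sum();
        }
        return keys;
    }
}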

Users could also opt to load either only the message key or the entire message into main memory, depending on their business needs, since message value sizes can vary hugely. This option would be introduced as a topic level config. Either way, once a cached result is sent out, it is evicted immediately to leave room for current or future key range filter results.

With key based filtering + IC offset support, users are already capable of sharing partition data in a standalone mode. By leveraging cloud operation technology such as Kubernetes, a user could generate a set of pod configurations that define which consumer subscribes to which topic partition range, and do an easy hot swap when certain nodes go down. We shall discuss the consumer API changes to support this feature in the public interface section.
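For illustration, a deployment tool could carve the key hash space into even ranges, one per pod. The helper below is hypothetical (not part of this proposal) and simply splits a non-negative long hash space into N contiguous [start, end) ranges:

Code Block
languagejava
titleHashRangeSplitter.java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for generating standalone-mode pod configurations:
// splits the key hash space into N contiguous [start, end) ranges, one per consumer pod.
public class HashRangeSplitter {

    public static List<long[]> split(int numConsumers) {
        List<long[]> ranges = new ArrayList<>();
        long step = Long.MAX_VALUE / numConsumers;
        for (int i = 0; i < numConsumers; i++) {
            long start = i * step;
            long end = (i == numConsumers - 1) ? Long.MAX_VALUE : start + step;
            ranges.add(new long[]{start, end});
        }
        return ranges;
    }
}

Each pod i would then be configured to fetch only the keys whose hash falls into ranges.get(i); when a node goes down, its range can be handed to a replacement pod without touching the others.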


Thinking in an even longer term, this work could be generalized to broker side filtering. A client could specify (1) key ranges, (2) blacklist keys, (3) whitelist keys, (4) max/min record size, etc., to further reduce the network cost, with a penalty on throughput and latency.
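To make that concrete, such filtering criteria could be modeled as a predicate over the record key and size. The class below is purely illustrative and not part of this proposal:

Code Block
languagejava
titleFetchFilter.java
import java.util.Set;

// Illustrative only: one possible shape for generic broker-side filtering criteria.
public class FetchFilter {
    private final long hashRangeStart;        // inclusive
    private final long hashRangeEnd;          // exclusive
    private final Set<String> blacklistKeys;  // never return these keys
    private final Set<String> whitelistKeys;  // always return these keys
    private final int minRecordSize;
    private final int maxRecordSize;

    public FetchFilter(long hashRangeStart, long hashRangeEnd,
                       Set<String> blacklistKeys, Set<String> whitelistKeys,
                       int minRecordSize, int maxRecordSize) {
        this.hashRangeStart = hashRangeStart;
        this.hashRangeEnd = hashRangeEnd;
        this.blacklistKeys = blacklistKeys;
        this.whitelistKeys = whitelistKeys;
        this.minRecordSize = minRecordSize;
        this.maxRecordSize = maxRecordSize;
    }

    // Decide whether a record should be returned to the requesting consumer.
    public boolean accept(String key, long keyHash, int recordSize) {
        if (blacklistKeys.contains(key)) return false;
        if (whitelistKeys.contains(key)) return true;
        return keyHash >= hashRangeStart && keyHash < hashRangeEnd
                && recordSize >= minRecordSize && recordSize <= maxRecordSize;
    }
}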

Rebalance support for concurrent assignment 

Multiple restrictions

To satisfy different administration requirements to turn off this feature at any time, we are placing several configurations on:

...

Similarly, for other assignment strategies such as the range assignor, we always attempt to do a `reverse-assign` when consumers outnumber any topic's total partitions.

This means that for 5 consumers subscribing to 2 topics with 2 and 3 partitions respectively, the final assignment will look like this (from the partition perspective):

t1p0: c1, c2, c3
t1p1: c4, c5
-----
t2p0: c1, c2
t2p1: c3, c4
t2p2: c5


This step unblocks the potential to allow dynamic consumer group scaling beyond the partition level cap.
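For illustration, the `reverse-assign` step for a single topic could be sketched as follows; the helper below is hypothetical and not the actual assignor implementation:

Code Block
languagejava
titleReverseAssignSketch.java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: when consumers outnumber a topic's partitions, split the consumer
// list into contiguous chunks, one chunk per partition, giving the first
// (consumers % partitions) partitions one extra consumer.
public class ReverseAssignSketch {

    public static Map<String, List<String>> reverseAssign(List<String> partitions,
                                                          List<String> consumers) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        int numPartitions = partitions.size();
        int perPartition = consumers.size() / numPartitions;
        int remainder = consumers.size() % numPartitions;
        int consumerIndex = 0;
        for (int p = 0; p < numPartitions; p++) {
            int count = perPartition + (p < remainder ? 1 : 0);
            assignment.put(partitions.get(p),
                    new ArrayList<>(consumers.subList(consumerIndex, consumerIndex + count)));
            consumerIndex += count;
        }
        return assignment;
    }
}

Running this with consumers [c1..c5] against t2's partitions [t2p0, t2p1, t2p2] reproduces the t2 assignment shown above.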

...

To recognize whether a consumer is allowed to subscribe in key-share mode, we shall add a new boolean field to Subscription to indicate that:

Code Block
languagejava
titleSubscription.java
public class Subscription {
  ...
  public boolean allowKeyShare();
}

To leverage key based filtering in a group subscription mode, we will also add callbacks to the ConsumerRebalanceListener to populate key range assignments if necessary:

...

Code Block
languagejava
titleListConsumerGroupOffsetsResult.java
public class ListConsumerGroupOffsetsResult {
    ...
    final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future;
    ...
}

public class OffsetAndMetadata {

    public long offset();

    public List<Tuple<Long, Long>> rangeOffsets(); // NEW

    public String metadata();

    public Optional<Integer> leaderEpoch();
}
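As a usage illustration, a tool could inspect a group's committed individual ranges as below. This is a sketch against the proposed API: the group id and bootstrap address are placeholders, and rangeOffsets() does not exist in current clients.

Code Block
languagejava
titleInspectRangeOffsets.java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Sketch: print the committed offsets and the proposed individual offset ranges of a group.
public class InspectRangeOffsets {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            Map<TopicPartition, OffsetAndMetadata> offsets =
                admin.listConsumerGroupOffsets("my-group")
                     .partitionsToOffsetAndMetadata()
                     .get();
            offsets.forEach((tp, meta) ->
                // rangeOffsets() is the new field proposed above.
                System.out.println(tp + " -> offset=" + meta.offset()
                        + ", individually committed ranges=" + meta.rangeOffsets()));
        }
    }
}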


...

Exceptions

We are going to define a series of exceptions which are thrown when (1) an access control restriction is hit, or (2) an inconsistency such as a commit behind the LSO is detected:

Code Block
languagejava
titleErrors.java
INDIVIDUAL_COMMIT_NOT_ALLOWED(88, "The broker hosting the coordinator doesn't allow individual commit", IndividualCommitNotAllowedException::new),
TOPIC_RANGE_FETCH_NOT_ALLOWED(89, "The topic is not allowed to perform range fetch", TopicRangeFetchNotAllowedException::new),
CONSUMER_RANGE_FETCH_NOT_ACCEPTED(90, "The consumer is not allowed to do the range fetch", ConsumerRangeFetchNotAccepted::new),
INDIVIDUAL_COMMIT_TOO_OLD(91, "The individual commit failed due to attempting to commit an entry behind the LSO", IndividualCommitTooOldException::new); // exception type assumed, following the pattern above

New Configurations

Broker configs

accept.individual.commit

Determine whether the group coordinator allows individual commit.

Default: true

max.bytes.key.range.results

The maximum memory allowed to hold key-range-partitioned records in memory for serving the next batch. Note that this is not the memory limit for processing one batch of data during the range check.

Default: 209715200 bytes

individual.commit.log.segment.bytes

The segment size for the individual commit topic.

Default: 10485760 bytes

Topic config

accept.range.fetch

Determine whether the topic is allowed to perform key range based fetch. Setting it to false is essentially the user's call to ensure that any subscribing application obeys partition level ordering.

Default: false

reserve.message.value.range.fetch

Determine whether, when caching the last fetch request's result, we should also keep the message value in main memory as an optimization.

Default: false
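For example, once brokers support them, the topic level switches above could be set with the admin client. The config names are the ones proposed in this KIP; the topic name and bootstrap address are placeholders:

Code Block
languagejava
titleEnableRangeFetch.java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

// Sketch: enable key range fetch (and optionally value caching) on a topic
// using the configs proposed in this KIP.
public class EnableRangeFetch {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "shared-input-topic");
            Map<ConfigResource, Collection<AlterConfigOp>> updates = Collections.singletonMap(
                topic,
                Arrays.asList(
                    new AlterConfigOp(new ConfigEntry("accept.range.fetch", "true"),
                                      AlterConfigOp.OpType.SET),
                    new AlterConfigOp(new ConfigEntry("reserve.message.value.range.fetch", "true"),
                                      AlterConfigOp.OpType.SET)));
            admin.incrementalAlterConfigs(updates).all().get();
        }
    }
}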


...

Consumer configs

...

allow.individual.commit

Determine whether this consumer will participate in a shared consumption mode with other consumers.

Default: true

max.num.ranges.to.commit

Commit the progress if the accumulated offset ranges are beyond this number.

Default: 10,000
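On the consumer side, opting into shared consumption would then only require the proposed client config. The snippet below is a sketch: the config name is the one listed above (current clients do not recognize it), and the bootstrap address and group id are placeholders.

Code Block
languagejava
titleSharedConsumerExample.java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Sketch: a consumer that opts into shared (key range) consumption via the proposed config.
public class SharedConsumerExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "shared-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Proposed in this KIP: opt into individual commit / key-share assignments.
        props.put("allow.individual.commit", "true");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Subscribe and poll as usual; the group assignor may now hand out key range shares.
        }
    }
}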

Related Work

Pulsar has officially supported the key-share feature since 2.4, which allows multiple consumers to share the data of the same partition.

The blockers for us to implement a similar feature are:

  1. Our consumer model is pull based, which incurs random reads if we let consumers ask for specific keyed records. Sequential reads are the key performance lever for Kafka; without them we could not use the zero-copy path and would have to copy data through memory.
  2. Our brokers don't know anything about the data distribution, as the payload is encoded and opaque to them. In practice, we could not afford to let consumers fetch with raw keys.
  3. The consumer coordinator is centralized, while the keys are distributed across different partitions. In Pulsar, the offset data is co-located with the actual partitions; broadcasting such state changes in Kafka would be hard and error-prone.

Compatibility, Deprecation, and Migration Plan

To upgrade an existing consumer use case to individual commit + key based filtering + rebalance support, the user needs to:

  1. Upgrade the broker to the latest version; by default, individual commit is turned on.
  2. Allow the input topic to be shared by turning on the range fetch config.
  3. Upgrade the consumer client to allow range sharing.
  4. Rolling bounce the group; the consumer group should then be able to share consumption of the topic.

Feature by feature, the migration path is:

  1. Individual Commit. Although it is of little use on its own, Kafka users or developers could start evaluating its performance with some customized applications.
    1. The broker has to be upgraded first; until then, all commit requests with a hash range configured shall fail.
    2. Upgrade the client version.
  2. Key based filtering. Note that if individual commit is already launched by the time key based filtering is ready, users could already use assign mode to co-process the same partition.
    1. Upgrade the broker first.
    2. Upgrade the client.
  3. Rebalance support. The config `allow.individual.commit` controls whether a consumer will choose to participate in a key-sharing assignment. If it is not turned on, the leader consumer shall not consider making that member share partitions with others.

As transactional support and cooperative fetch need more design discussion, we will not put them on the upgrade path for now.

Failure Recovery and Test Plan

We discuss the failure scenarios alongside the test plan to better justify the value.

  1. Individual Commit failure. The failure could result from:
    1. Broker restriction. If this is the case, the client will immediately fail with a restriction exception.
    2. IC log not found or corrupted. As the __consumer_offsets topic needs to be co-partitioned with the IC log, we need to load a replica from the ISR onto the current coordinator and force a leader election to nominate the new log as leader.
    3. Commit too old: the IC offset is smaller than the current LSO. In this case, the broker replies with the current LSO and the client adjusts its position to catch up with the latest.
    4. With at-least-once semantics, a duplicate commit is not a serious problem. Brokers should not respond with a separate exception, but will log a warning for the user to debug later.
  2. Range Fetch failure. The failure could result from:
    1. Topic restriction. The client will immediately fail with a restriction exception.
    2. Other common fetch request errors, such as topic not found, shall be handled in the same way as today.
  3. Rebalance support failure. The failure could result from:
    1. Consumer restriction: a consumer will reject an assignment with range fetch if it is not allowed to perform it.
    2. Assignment overlap: system tests should verify that no overlapping assignments are produced.
    3. Other common rebalance failures.



Test Plan

...


Rejected Alternatives

There are a couple of alternatives to this proposal.

...