
Table of Contents

Status

Current state: Draft

...

With key based hashing, brokers will load topic partition data into main memory and do the partitioning work based on the hashed key range. When a FetchRequest comes in with a specific key range, the broker will reply only with the records that meet the request's needs. This workflow could be implemented as generic server-side filtering, and KIP-283 is already a good example, where we also load data into memory for down-conversion.
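To make the filtering step concrete, below is a minimal sketch of broker-side key range matching. The KeyRangeFilter class, the stand-in hash function, and the record layout are illustrative assumptions only; a real implementation would reuse the producer partitioner's hash (e.g. murmur2) so that producers and brokers agree on the key space.

Code Block
languagejava
titleKeyRangeFilter.java
import java.util.List;
import java.util.stream.Collectors;

// Illustrative sketch (not part of this proposal's wire format): keep only
// the records whose hashed key falls inside the requested [start, end] range.
public class KeyRangeFilter {

    record FetchedRecord(byte[] key, byte[] value, long offset) {}

    // Map a key into a stable bucket in [0, rangeSize). A simple stand-in
    // for the murmur2 hash used by the default producer partitioner.
    static long hashKey(byte[] key, long rangeSize) {
        long h = 0;
        for (byte b : key)
            h = 31 * h + (b & 0xff);
        return (h & Long.MAX_VALUE) % rangeSize;
    }

    // Filter a fetched batch down to the records matching the key range.
    static List<FetchedRecord> filter(List<FetchedRecord> batch,
                                      long start, long end, long rangeSize) {
        return batch.stream()
                    .filter(r -> {
                        long h = hashKey(r.key(), rangeSize);
                        return h >= start && h <= end;
                    })
                    .collect(Collectors.toList());
    }
}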

There is an obvious performance penalty compared with the zero-copy mechanism used by existing consumer fetches, but the value of unblocking new use cases motivates us to pursue further optimizations in the long run. In the near term, we could cache the filter results for a batch of data so that when other fetch requests fall into the same range, we can serve the in-memory result instead of repeating the fetch. For example, if a fetch request asks for range [40, 50] but only [40, 44] satisfies its hash range, the result for [45, 49] will be saved as message keys and kept available for the next fetch request instead of being evicted immediately. We also recommend a config, max.bytes.key.range.hashing, to control how much memory we would like to spend on this feature.

The user could also opt to load either only the message key or the entire message into main memory, depending on their understanding of the business needs, since message value sizes can vary hugely. This option would be introduced as a topic level config. Either way, once a cached result is sent out, it is evicted immediately to leave room for current or future key range filter results. A sketch of this caching behavior follows below.
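The class below sketches the caching idea under stated assumptions: it enforces a byte budget in the spirit of max.bytes.key.range.hashing, stores either keys only or whole messages depending on a flag standing in for the proposed topic level config, and evicts each cached result as soon as it is served once. All names other than the config are hypothetical.

Code Block
languagejava
titleKeyRangeFilterCache.java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the proposed filter-result cache.
public class KeyRangeFilterCache {

    private final long maxBytes;      // budget, in the spirit of max.bytes.key.range.hashing
    private final boolean keyOnly;    // stand-in for the topic level "keys vs. whole messages" config
    private long usedBytes = 0;
    private final Map<String, byte[]> cache = new HashMap<>();

    public KeyRangeFilterCache(long maxBytes, boolean keyOnly) {
        this.maxBytes = maxBytes;
        this.keyOnly = keyOnly;
    }

    // Cache a filtered record under a (partition, offset-range) key,
    // respecting the byte budget rather than growing unbounded.
    public void put(String rangeKey, byte[] key, byte[] value) {
        byte[] payload = keyOnly ? key : concat(key, value);
        if (usedBytes + payload.length > maxBytes)
            return; // over budget: skip caching
        cache.put(rangeKey, payload);
        usedBytes += payload.length;
    }

    // Serve a cached result and evict it immediately, as described above.
    public byte[] serve(String rangeKey) {
        byte[] payload = cache.remove(rangeKey);
        if (payload != null)
            usedBytes -= payload.length;
        return payload;
    }

    private static byte[] concat(byte[] a, byte[] b) {
        byte[] out = new byte[a.length + b.length];
        System.arraycopy(a, 0, out, 0, a.length);
        System.arraycopy(b, 0, out, a.length, b.length);
        return out;
    }
}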

With key based filtering plus IC offset support, the user is already capable of sharing partition data in standalone mode. By leveraging cloud operation technology such as Kubernetes, the user could generate a set of pod configurations that define which consumer subscribes to which topic partition range, and easily hot-swap consumers when certain nodes go down. We shall discuss the consumer API changes to support this feature in the public interface section.

...

Code Block
OffsetFetchResponse => ThrottleTimeMs Topics ErrorCode
  ThrottleTimeMs => int32
  Topics         => List<OffsetFetchResponseTopic>
  ErrorCode      => int16

OffsetFetchResponseTopic => Name Partitions
  Name           => string
  Partitions     => List<OffsetFetchResponsePartition>

OffsetFetchResponsePartition => PartitionIndex CommittedOffset CommittedLeaderEpoch Metadata ErrorCode CommittedRangeOffsets
  PartitionIndex => int32
  CommittedOffset => int64
  CommittedLeaderEpoch => int32
  Metadata => string
  ErrorCode => int16
  CommittedRangeOffsets => List<Tuple<int64, int64>> // NEW

To leverage the key based filtering in a standalone mode, we also define a new consumer API:

Code Block
languagejava
titleConsumer.java
public interface Consumer {
	...

	/**
	 * Manually assign a list of partitions with specific key ranges to this consumer. If a partition maps to an empty list,
	 * that means a full ownership of the partition.
	 */
	void assign(Map<TopicPartition, List<Tuple<int64, int64>>> partitionWithKeyRanges);
}
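A hypothetical usage fragment for this API in standalone mode; the topic name, range boundaries, and the Java spelling of the proposal's Tuple type are illustrative:

Code Block
languagejava
titleStandaloneRangeAssignment.java
// Hypothetical standalone-mode usage: each consumer instance, e.g. one
// Kubernetes pod, is handed its own slice of the key hash space.
Map<TopicPartition, List<Tuple<Long, Long>>> assignment = new HashMap<>();
assignment.put(new TopicPartition("orders", 0),
               Collections.singletonList(new Tuple<>(0L, 4999L)));
consumer.assign(assignment); // the new key-range-aware assign overload

// Fetches now only return records whose key hash falls within [0, 4999];
// a sibling pod assigned [5000, 9999] shares the rest of the partition.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));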

To leverage the key based filtering in a group subscription mode, we will also add callbacks to the ConsumerRebalanceListener to populate key range assignments if necessary:

Code Block
languagejava
titleConsumerRebalanceListener.java
public interface ConsumerRebalanceListener {

	@Deprecated
	void onPartitionsRevoked(Collection<TopicPartition> partitions);

	@Deprecated
	void onPartitionsAssigned(Collection<TopicPartition> partitions);

	@Deprecated
	default void onPartitionsLost(Collection<TopicPartition> partitions) {
		onPartitionsRevoked(partitions);
	}

	/**
	 * Note that if a partition maps to an empty list, that means a full ownership of the partition.
	 */
	void onPartitionWithKeyRangesAssigned(Map<TopicPartition, List<Tuple<int64, int64>>> partitionWithKeyRanges); // NEW

	void onPartitionWithKeyRangesRevoked(Map<TopicPartition, List<Tuple<int64, int64>>> partitionWithKeyRanges); // NEW

	void onPartitionWithKeyRangesLost(Map<TopicPartition, List<Tuple<int64, int64>>> partitionWithKeyRanges); // NEW
}
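As an illustrative sketch of how an application might use the new callbacks (the class name and bookkeeping here are assumptions, not part of the proposal):

Code Block
languagejava
titleRangeTrackingListener.java
import java.util.*;
import org.apache.kafka.common.TopicPartition;

// Hypothetical listener: track which key ranges this consumer owns so that
// per-range progress can be flushed or dropped across rebalances.
public class RangeTrackingListener implements ConsumerRebalanceListener {

    private final Map<TopicPartition, List<Tuple<Long, Long>>> ownedRanges = new HashMap<>();

    @Override
    public void onPartitionWithKeyRangesAssigned(
            Map<TopicPartition, List<Tuple<Long, Long>>> partitionWithKeyRanges) {
        ownedRanges.putAll(partitionWithKeyRanges);
    }

    @Override
    public void onPartitionWithKeyRangesRevoked(
            Map<TopicPartition, List<Tuple<Long, Long>>> partitionWithKeyRanges) {
        // Flush in-flight work for the revoked ranges before giving them up.
        partitionWithKeyRanges.keySet().forEach(ownedRanges::remove);
    }

    @Override
    public void onPartitionWithKeyRangesLost(
            Map<TopicPartition, List<Tuple<Long, Long>>> partitionWithKeyRanges) {
        // Ranges were lost without a clean revocation; drop local state only.
        partitionWithKeyRanges.keySet().forEach(ownedRanges::remove);
    }

    // Deprecated whole-partition callbacks kept as no-ops for compatibility.
    @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
    @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
}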

For better visibility, we shall include the range offsets in the ListConsumerGroupOffsetsResult:

Code Block
languagejava
titleListConsumerGroupOffsetsResult.java
public class ListConsumerGroupOffsetsResult {
	...
    final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future;
	...
}

public class OffsetAndMetadata {

    public long offset();
 
	public List<Tuple<Long, Long>> rangeOffsets(); // NEW

    public String metadata();

    public Optional<Integer> leaderEpoch();
}
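A short admin-side fragment, assuming the new rangeOffsets() accessor; the group id and output handling are illustrative:

Code Block
languagejava
titleInspectRangeOffsets.java
// Illustrative: inspect committed range offsets for a group via the admin client.
ListConsumerGroupOffsetsResult result = adminClient.listConsumerGroupOffsets("my-group");
Map<TopicPartition, OffsetAndMetadata> offsets = result.partitionsToOffsetAndMetadata().get();

offsets.forEach((tp, oam) -> {
    System.out.println(tp + " committed offset: " + oam.offset());
    // NEW in this proposal: per-key-range committed offsets.
    for (Tuple<Long, Long> range : oam.rangeOffsets())
        System.out.println("  range offsets: " + range);
});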

New Configurations

Broker configs

...