...

When the offset range list is not empty, the broker will only handle the offset ranges and ignore the plain offset field. Optionally, the broker can check that the original offset field is set to -1 to make sure there is no data corruption in the request. The transactional offset commit request will likewise include the offset ranges.
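
The broker-side rule above could look roughly like the following sketch. The class, method, and field names are hypothetical and are not part of Kafka; treating range bounds as inclusive is also an assumption.

```java
import java.util.List;

// Hypothetical helper for illustration only; none of these names exist in Kafka.
final class OffsetCommitValidator {
    // Sentinel value the text suggests for the plain offset field when ranges are sent.
    static final long UNSET_OFFSET = -1L;

    // One committed offset range; treating both bounds as inclusive is an assumption.
    static final class OffsetRange {
        final long start;
        final long end;

        OffsetRange(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    // Returns true if the commit should be processed through its offset ranges,
    // false if the broker should fall back to the plain offset field.
    static boolean useOffsetRanges(long plainOffset, List<OffsetRange> ranges) {
        if (ranges == null || ranges.isEmpty()) {
            return false;
        }
        // Optional sanity check: a non-sentinel plain offset next to a non-empty
        // range list hints at a corrupted or malformed request.
        if (plainOffset != UNSET_OFFSET) {
            throw new IllegalArgumentException(
                "plain offset must be -1 when offset ranges are present, got " + plainOffset);
        }
        return true;
    }
}
```

Rejecting the request in the sanity check is one possible policy; a broker could also choose to log and ignore the plain offset instead.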

The fetch request will also be augmented to carry the key hash ranges. Using a list instead of a single hash range allows future extensibility.

Code Block
FetchRequest => MaxWaitTime ReplicaId MinBytes IsolationLevel FetchSessionId FetchSessionEpoch [Topics] [RemovedTopics] KeyRanges
  MaxWaitTime => int32
  ReplicaId => int32
  MinBytes => int32
  IsolationLevel => int8
  FetchSessionId => int32
  FetchSessionEpoch => int32
  Topics => TopicName Partitions
    TopicName => String
    Partitions => [Partition CurrentLeaderEpoch FetchOffset StartOffset MaxBytes]
      Partition => int32
      CurrentLeaderEpoch => int32
      FetchOffset => int64
      StartOffset => int64
      MaxBytes => int32
  RemovedTopics => RemovedTopicName [RemovedPartition]
    RemovedTopicName => String
    RemovedPartition => int32
  KeyRanges => List<Tuple<int64, int64>> // NEW
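
As a rough illustration of how the KeyRanges field might be applied, the sketch below checks whether a record's key hash falls into any requested range. The names are hypothetical, the hash is taken as an already computed int64, and two points are assumptions: bounds are inclusive, and an empty list means no filtering.

```java
import java.util.List;

// Hypothetical filter for illustration only; not an existing Kafka class.
final class KeyRangeFilter {
    // One (int64, int64) entry of KeyRanges; inclusive bounds are an assumption.
    static final class KeyRange {
        final long start;
        final long end;

        KeyRange(long start, long end) {
            if (start > end) {
                throw new IllegalArgumentException("start must not exceed end");
            }
            this.start = start;
            this.end = end;
        }

        boolean contains(long keyHash) {
            return start <= keyHash && keyHash <= end;
        }
    }

    // Returns true if a record with the given key hash should be returned.
    // An empty list is treated as "no filtering", which is an assumption here.
    static boolean matches(long keyHash, List<KeyRange> keyRanges) {
        if (keyRanges.isEmpty()) {
            return true;
        }
        for (KeyRange range : keyRanges) {
            if (range.contains(keyHash)) {
                return true;
            }
        }
        return false;
    }
}
```

Because the field is a list, a single request can cover several disjoint ranges, which is the extensibility the text refers to.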

...

To leverage key based filtering in standalone mode, we also define a new consumer API:

Code Block: Consumer.java
public interface Consumer {
  ...

  /**
   * Manually assign a list of partitions with specific key ranges to this consumer. If a partition maps to an empty list,
   * the consumer takes full ownership of that partition.
   */
  void assign(Map<TopicPartition, List<Tuple<Long, Long>>> partitionWithKeyRanges);
}
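
To make the intended usage concrete, the sketch below builds the argument two standalone consumers could pass to the proposed assign call in order to split one partition between them. TopicPartition and Tuple are minimal stand-ins defined only for this example, the helper name is hypothetical, and the assumption that key hashes span the full signed 64-bit space is ours.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical example; TopicPartition and Tuple are local stand-ins.
final class KeyRangeAssignmentExample {
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) {
                return false;
            }
            TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    static final class Tuple<A, B> {
        final A first;
        final B second;

        Tuple(A first, B second) {
            this.first = first;
            this.second = second;
        }
    }

    // Builds an assignment giving one consumer either the lower or the upper
    // half of the key hash space. Assumes hashes cover all signed 64-bit values.
    static Map<TopicPartition, List<Tuple<Long, Long>>> halfOwnership(
            TopicPartition tp, boolean lowerHalf) {
        Tuple<Long, Long> range;
        if (lowerHalf) {
            range = new Tuple<Long, Long>(Long.MIN_VALUE, -1L);
        } else {
            range = new Tuple<Long, Long>(0L, Long.MAX_VALUE);
        }
        Map<TopicPartition, List<Tuple<Long, Long>>> assignment = new HashMap<>();
        assignment.put(tp, List.of(range));
        return assignment;
    }
}
```

One consumer would call assign with halfOwnership(tp, true) and the other with halfOwnership(tp, false), so that together they cover every key of the partition exactly once.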


Related Work

Pulsar has officially supported the key shared feature since 2.4, which suggests that multiple consumers can share the same partition data.

The blockers for us to implement a similar feature are:

  1. Our consumer model is pull based, which incurs random reads if we let consumers ask for specific keyed records. Sequential read is the key performance advantage of Kafka, as otherwise we could not bypass memory copies of the data.
  2. Our broker doesn’t know anything about data distribution, as all the metadata is encoded and looks opaque to it. In reality, we could not afford to let consumers fetch with raw keys.
  3. The consumer coordinator sits at a centralized location, whereas we would need to distribute keys across different partitions. In Pulsar, the offset data is co-located with the actual partitions. Broadcasting the state change in Kafka would be hard and very error-prone.

Compatibility, Deprecation, and Migration Plan

  1. Individual commit. Although individual commit support alone is of limited practical use, Kafka users and developers can start evaluating its performance with customized applications.
    1. Upgrade the brokers first. Until then, all commit requests with a hash range configured will fail.
    2. Upgrade the client version.
  2. Key based filtering. Note that if individual commit has already been launched by the time key based filtering is set up, users can already use assign mode to co-process the same partition.
    1. Upgrade the brokers first.
    2. Upgrade the clients.


Rejected Alternatives

There are a couple of alternatives to this proposal here.

...