...

Cooperative fetch is more of an optimization on top of our stateless use case than a new feature. To make it work correctly, we have to encode more metadata in the consumer fetch request, such as the consumer generation and consumer id, and refuse to advance the marker when (1) the consumer is a zombie, (2) the same consumer is retrying the fetch, or (3) the session has timed out. There should also be a maximum wait time for commit gaps, so that we don't lose the chance to process the data when the consumer that performed the fetch has already hit a hard failure. This wait time could be set to the same value as the consumer's configured session timeout.
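
To make the rule above concrete, here is a minimal sketch of how a broker-side fetch manager might apply it. The FetchMarkerManager and FetchState names, their fields, and the exact checks are hypothetical; they only illustrate the three refusal conditions plus the commit-gap wait, not a committed design.

Code Block
languagejava
titleFetchMarkerManager.java (sketch)
// Hypothetical sketch only: none of these classes exist in Kafka. They illustrate
// when the fetch marker should NOT advance, and when an uncommitted gap is re-exposed.
public class FetchMarkerManager {

  static class FetchState {
    int generationId;        // generation of the group that owns the marker
    String memberId;         // member that performed the last fetch
    long lastFetchTimeMs;    // when that fetch happened
    long sessionTimeoutMs;   // session timeout reported by that member
  }

  private final long gapMaxWaitMs; // e.g. fetched.data.max.wait.ms, could match the consumer session timeout

  public FetchMarkerManager(long gapMaxWaitMs) {
    this.gapMaxWaitMs = gapMaxWaitMs;
  }

  /** Decide whether this fetch request may advance the marker past the last handed-out range. */
  boolean mayAdvanceMarker(FetchState state, int reqGenerationId, String reqMemberId, long nowMs) {
    if (reqGenerationId < state.generationId)
      return false;  // (1) zombie consumer from an older generation
    if (reqGenerationId == state.generationId && reqMemberId.equals(state.memberId))
      return false;  // (2) the same consumer retrying its previous fetch gets the same range again
    if (nowMs - state.lastFetchTimeMs > state.sessionTimeoutMs)
      return false;  // (3) the previous owner's session timed out; do not silently skip its range
    return true;
  }

  /** After the max wait, an uncommitted gap is re-exposed so the data is not lost on a hard failure. */
  boolean shouldReexposeGap(long gapStartTimeMs, long nowMs) {
    return nowMs - gapStartTimeMs >= gapMaxWaitMs;
  }
}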

The design is still open to discussion.

Public Interfaces

The offset commit protocol will be changed to allow individual commit (IC) semantics. The committed offsets will include a list of offset ranges:

...

The fetch request will also be augmented to incorporate the key hash ranges. Using a list instead of a single hash range allows future extensibility, for example if we want to implement cooperative fetching on top of it.

Code Block
FetchRequest => MaxWaitTime ReplicaId MinBytes IsolationLevel FetchSessionId FetchSessionEpoch [Topics] [RemovedTopics] KeyRanges GenerationId MemberId SessionTimeout
  MaxWaitTime => int32
  ReplicaId => int32
  MinBytes => int32
  IsolationLevel => int8
  FetchSessionId => int32
  FetchSessionEpoch => int32
  Topics => TopicName Partitions
    TopicName => String
    Partitions => [Partition CurrentLeaderEpoch FetchOffset StartOffset MaxBytes]
        Partition => int32
        CurrentLeaderEpoch => int32
        FetchOffset => int64
        StartOffset => int64
        MaxBytes => int32
  RemovedTopics => RemovedTopicName [RemovedPartition]
    RemovedTopicName => String
    RemovedPartition => int32
  KeyRanges => List<Tuple<int64, int64>> // NEW
  GenerationId => String // NEW for cooperative fetch
  MemberId => String // NEW for cooperative fetch
  SessionTimeout => int32 // NEW for cooperative fetch
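
For intuition on how the new KeyRanges field could be used, the sketch below filters records on the broker side by key hash. The filtering location and the choice of murmur2 (mirroring the default partitioner) are assumptions made for illustration; the KIP does not fix either.

Code Block
languagejava
titleKeyRangeFilter.java (sketch)
import java.util.List;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.utils.Utils;

// Illustrative only: the KIP does not specify where filtering happens or which hash is used.
public class KeyRangeFilter {

  /** Stand-in for the Tuple<int64, int64> in the schema: an inclusive [start, end] hash range. */
  public static final class KeyRange {
    final long start;
    final long end;
    public KeyRange(long start, long end) { this.start = start; this.end = end; }
  }

  /** Returns true if the record should be delivered to a consumer owning the given ranges. */
  public static boolean matches(Record record, List<KeyRange> ranges) {
    if (ranges.isEmpty() || !record.hasKey())
      return true;  // an empty range list means full ownership of the partition
    byte[] keyBytes = Utils.toArray(record.key());
    long hash = Utils.toPositive(Utils.murmur2(keyBytes));  // same hashing style as the default partitioner
    for (KeyRange r : ranges) {
      if (hash >= r.start && hash <= r.end)
        return true;
    }
    return false;
  }
}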

We shall also put offset ranges as part of OffsetFetchResponse:

Code Block
OffsetFetchResponse => ThrottleTimeMs Topics ErrorCode
  ThrottleTimeMs => int32
  Topics         => List<OffsetFetchResponseTopic>
  ErrorCode      => int16

OffsetFetchResponseTopic => Name Partitions
  Name           => string
  Partitions     => List<OffsetFetchResponsePartition>
  
OffsetFetchResponsePartition => PartitionIndex CommittedOffset CommittedLeaderEpoch Metadata ErrorCode CommittedRangeOffsets
  PartitionIndex => int32
  CommittedOffset => int64
  CommittedLeaderEpoch => int32
  Metadata => string
  ErrorCode => int16
  CommittedRangeOffsets => List<Tuple<int64, int64>> // NEW
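
To make the semantics of CommittedRangeOffsets concrete, here is a small sketch of how a client could derive the remaining uncommitted gaps from the returned ranges. OffsetRange is a stand-in for the Tuple<int64, int64> in the schema, and the helper itself is not part of the proposal.

Code Block
languagejava
titleCommittedRanges.java (sketch)
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class CommittedRanges {

  /** Stand-in for the Tuple<int64, int64> used in the schema: a [start, end) offset range. */
  public static final class OffsetRange {
    final long start;
    final long end;
    public OffsetRange(long start, long end) { this.start = start; this.end = end; }
  }

  /**
   * Given the committed offset ranges of a partition, return the gaps that are still
   * uncommitted below the highest committed offset.
   */
  public static List<OffsetRange> uncommittedGaps(List<OffsetRange> committed, long logStartOffset) {
    List<OffsetRange> sorted = new ArrayList<>(committed);
    sorted.sort(Comparator.comparingLong(r -> r.start));
    List<OffsetRange> gaps = new ArrayList<>();
    long next = logStartOffset;
    for (OffsetRange r : sorted) {
      if (r.start > next)
        gaps.add(new OffsetRange(next, r.start));  // offsets [next, r.start) are not committed yet
      next = Math.max(next, r.end);
    }
    return gaps;
  }
}

For example, with committed ranges [0, 5) and [8, 10) and a log start offset of 0, the only remaining gap is [5, 8).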


To leverage key based filtering in standalone mode, we also define a new consumer API:

Code Block
languagejava
titleConsumer.java
public interface Consumer {
  ...

  /**
   * Manually assign a list of partitions with specific key ranges to this consumer. If a partition maps to an
   * empty list, that means full ownership of the partition.
   */
  void assign(Map<TopicPartition, List<Tuple<int64, int64>>> partitionWithKeyRanges);
}
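
As a usage example, two standalone consumers could split a partition's key space as shown below. This is pseudo-Java consistent with the proposed signature: Tuple is the placeholder type from the API above, consumerA/B/C are assumed pre-created consumers, and the assumption that key hashes span [0, Long.MAX_VALUE] is not pinned down by the KIP.

Code Block
languagejava
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

TopicPartition tp = new TopicPartition("events", 0);
long mid = Long.MAX_VALUE / 2;

// Consumer A owns the lower half of the key hash space ...
consumerA.assign(Map.of(tp, List.of(new Tuple<>(0L, mid))));
// ... and consumer B owns the upper half.
consumerB.assign(Map.of(tp, List.of(new Tuple<>(mid + 1, Long.MAX_VALUE))));

// Mapping the partition to an empty list means full ownership of the partition.
consumerC.assign(Map.of(tp, List.of()));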


For better visibility, we would also include


New Configurations

Broker configs

fetched.data.max.wait.ms

The maximum amount of time in ms that the fetch manager will wait on an uncommitted gap before re-exposing that data for other consumers to fetch, so that the chance to process it is not lost when the consumer that originally fetched it has hit a hard failure. It could be set to the same value as the consumer's configured session timeout.

Consumer configs

allow.individual.commit

Determines whether this consumer will participate in a shared consumption mode with other consumers.

Default value: false.
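
For example, a consumer opting into shared consumption might be configured as follows. Everything except allow.individual.commit is an existing consumer config, and the values shown are arbitrary.

Code Block
languagejava
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "shared-group");
props.put("session.timeout.ms", "10000");
// New config proposed by this KIP: opt in to key-sharing / individual commit.
props.put("allow.individual.commit", "true");

KafkaConsumer<byte[], byte[]> consumer =
    new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());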

Related Work

Pulsar has officially supported the key-shared subscription feature since 2.4, which allows multiple consumers to share the data of the same partition.

The blockers for us to implement a similar feature are:

  1. Our consumer model is pull based, which would incur random reads if we let consumers ask for specific keyed records. Sequential reads are Kafka's key performance advantage; without them we could not serve the data without extra memory copies.
  2. Our brokers don't know anything about the data distribution, as all the metadata is encoded and opaque to them. In practice, we could not afford letting consumers fetch by raw keys.
  3. The consumer coordinator sits in a centralized location, yet we need to distribute keys across different partitions. For Pulsar, the offset data is co-located with the actual partitions. Broadcasting such state changes across Kafka would be a heavy burden and very error-prone.

Compatibility, Deprecation, and Migration Plan

  1. Individual Commit. Although individual commit alone is of limited practical use, Kafka users and developers could start evaluating its performance with customized applications.
    1. The broker has to be upgraded first; until then, all commit requests with a hash range configured shall fail.
    2. Upgrade the client version.
  2. Key based filtering. Note that if individual commit has already been launched by the time key based filtering is ready, users could already use assign mode to co-process the same partition.
    1. Upgrade the broker first.
    2. Upgrade the client.
  3. Rebalance support. The config `allow.individual.commit` controls whether a consumer will choose to participate in a key-sharing assignment. If it is not turned on, the leader consumer shall not include that member in any shared assignment.

As transactional support and cooperative fetch need more design discussion, we will not put them on the upgrade path for now.



Test Plan

Each module has to be well covered by unit tests, integration tests and system tests. For system tests, we need to set up different scenarios and verify that eventual progress is made.

...


Rejected Alternatives

There are a couple of alternatives to this proposal here.

...