...
Cooperative fetch is more of an optimization on top of our stateless use case than a new feature. To make it work correctly, we have to encode more metadata within the consumer fetch request, such as the consumer generation and member id, and refuse to advance the fetch marker when it should not move: 1. the consumer is a zombie, 2. the same consumer is retrying the fetch, or 3. the session has timed out. There should also be a maximum wait time for commit gaps so that we do not lose the chance to process the data when the consumer that did the fetch has already hit a hard failure. It could be set to the same value as the consumer's configured session timeout.
The design is still open to discussion.
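As a rough illustration of the rules above, a broker-side fetch manager could gate marker advancement along the following lines. This is a minimal sketch only; every class, method, and parameter name here is hypothetical and not part of the proposal, and the zombie check is assumed to compare the request's generation against the latest known group generation.
Code Block
import java.time.Duration;
import java.time.Instant;

// Illustrative sketch only; none of these names come from the proposal.
final class CooperativeFetchPolicy {

    // Decide whether a cooperative fetch may advance the shared fetch marker.
    static boolean shouldAdvanceMarker(int requestGeneration,
                                       int latestGroupGeneration,
                                       String memberId,
                                       String lastFetchMemberId,
                                       long requestedOffset,
                                       long lastFetchOffset,
                                       Instant lastFetchTime,
                                       Duration sessionTimeout) {
        // 1. Zombie consumer: the request carries a stale generation.
        if (requestGeneration < latestGroupGeneration)
            return false;

        // 2. The same consumer retrying the same fetch: hand back the same
        //    range instead of advancing past it.
        if (memberId.equals(lastFetchMemberId) && requestedOffset == lastFetchOffset)
            return false;

        // 3. The previous fetcher's session timed out: re-expose its data from
        //    the current marker rather than skipping over it.
        if (lastFetchTime != null
                && Instant.now().isAfter(lastFetchTime.plus(sessionTimeout)))
            return false;

        return true;
    }

    // Commit gaps are only held back for a bounded time so data is not lost
    // when the consumer that fetched it hard-fails.
    static boolean commitGapExpired(Instant fetchedAt, Duration maxWait) {
        return Instant.now().isAfter(fetchedAt.plus(maxWait));
    }
}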
Public Interfaces
The offset commit protocol will be changed to allow individual commit (IC) semantics. The committed offset will include a list of offset ranges:
...
The fetch request will also be augmented to incorporate the key hash range. Using a list instead of one hash range allows future extensibility. Also, if we want to implement cooperative fetching on top of it, the fetch request needs to carry the consumer generation id, member id and session timeout, shown below as the new fields for cooperative fetch:
Code Block
FetchRequest => MaxWaitTime ReplicaId MinBytes IsolationLevel FetchSessionId FetchSessionEpoch [Topics] [RemovedTopics] KeyRanges GenerationId MemberId SessionTimeout
MaxWaitTime => int32
ReplicaId => int32
MinBytes => int32
IsolationLevel => int8
FetchSessionId => int32
FetchSessionEpoch => int32
Topics => TopicName Partitions
TopicName => String
Partitions => [Partition CurrentLeaderEpoch FetchOffset StartOffset MaxBytes]
Partition => int32
CurrentLeaderEpoch => int32
FetchOffset => int64
StartOffset => int64
MaxBytes => int32
RemovedTopics => RemovedTopicName [RemovedPartition]
RemovedTopicName => String
RemovedPartition => int32
KeyRanges => List<Tuple<int64, int64>> // NEW
GenerationId => String // NEW for cooperative fetch
MemberId => String // NEW for cooperative fetch
SessionTimeout => int32 // NEW for cooperative fetch
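For intuition on the new KeyRanges field, the sketch below shows how a record key could be mapped to a hash and matched against the requested ranges. The hash function, the inclusive-range interpretation, and the "empty list means full ownership" rule (borrowed from the assign() API described later) are assumptions for illustration, not part of the wire format.
Code Block
import java.nio.charset.StandardCharsets;
import java.util.List;

// Sketch of key based filtering against the KeyRanges field; names and the
// hash function are illustrative only.
final class KeyRangeFilter {

    // Mirrors one Tuple<int64, int64> entry; assumed inclusive on both ends.
    record HashRange(long start, long end) {
        boolean contains(long hash) {
            return hash >= start && hash <= end;
        }
    }

    // Stand-in for whatever stable 64-bit key hash clients and brokers agree on.
    static long keyHash(byte[] key) {
        long h = 1125899906842597L;
        for (byte b : key) h = 31 * h + b;
        return h;
    }

    // Would a record with this key be returned for a fetch carrying these ranges?
    static boolean matches(byte[] key, List<HashRange> requestedRanges) {
        if (requestedRanges.isEmpty()) return true; // assumed: no ranges = whole partition
        long h = keyHash(key);
        return requestedRanges.stream().anyMatch(r -> r.contains(h));
    }

    public static void main(String[] args) {
        List<HashRange> lowerHalf = List.of(new HashRange(Long.MIN_VALUE, -1L));
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        System.out.println("key falls in lower half: " + matches(key, lowerHalf));
    }
}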
We shall also put offset ranges as part of OffsetFetchResponse:
Code Block
OffsetFetchResponse => ThrottleTimeMs Topics ErrorCode
ThrottleTimeMs => int32
Topics => List<OffsetFetchResponseTopic>
ErrorCode => int16
OffsetFetchResponseTopic => Name Partitions
Name => string
Partitions => List<OffsetFetchResponsePartition>
OffsetFetchResponsePartition => PartitionIndex CommittedOffset CommittedLeaderEpoch Metadata ErrorCode CommittedRangeOffsets
PartitionIndex => int32
CommittedOffset => int64
CommittedLeaderEpoch => int32
Metadata => string
ErrorCode => int16
CommittedRangeOffsets => List<Tuple<int64, int64>> // NEW
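To make the semantics of the new field concrete, here is a small sketch of how a client might combine CommittedOffset with CommittedRangeOffsets. It assumes the usual Kafka convention that the committed offset is the next offset to consume, and that the ranges are sorted, non-overlapping and inclusive; the schema itself does not spell out these invariants.
Code Block
import java.util.List;

// Sketch of interpreting CommittedOffset plus CommittedRangeOffsets; the
// sortedness and inclusiveness assumptions are noted in the text above.
final class IndividualCommitState {

    record OffsetRange(long start, long end) {}

    // Offsets strictly below the returned value are all committed: start from the
    // contiguous committed offset and extend it through any ranges touching it.
    static long stableCommittedOffset(long committedOffset, List<OffsetRange> ranges) {
        long stable = committedOffset;
        for (OffsetRange r : ranges) {
            if (r.start() <= stable) {
                stable = Math.max(stable, r.end() + 1);
            } else {
                break; // the first uncommitted gap starts at 'stable'
            }
        }
        return stable;
    }

    public static void main(String[] args) {
        // Offsets below 100 committed contiguously, 150..200 committed individually:
        // 100..149 remains an outstanding gap, so the stable offset stays at 100.
        long stable = stableCommittedOffset(100, List.of(new OffsetRange(150, 200)));
        System.out.println("stable committed offset = " + stable);
    }
}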
To leverage the key based filtering in standalone mode, we also define a new consumer API:
Code Block
public Consumer {
...
/**
* Manually assign a list of partitions with specific key ranges to this consumer. If a partition maps to an empty list,
* that means a full ownership of the partition.
*/
void assign(Map<TopicPartition, List<Tuple<int64, int64>>> partitionWithKeyRanges);
}
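A possible usage of this API, assuming two standalone consumers splitting a single partition by key hash range. Tuple and TopicPartition below are stand-ins for whatever types the final API uses, and the even split of a non-negative hash space is just one possible policy.
Code Block
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of splitting one partition between two standalone consumers by hash range.
final class StandaloneKeyRangeAssignment {

    record Tuple(long start, long end) {}
    record TopicPartition(String topic, int partition) {}

    // Split the assumed non-negative hash space [0, Long.MAX_VALUE] into
    // `count` contiguous, inclusive ranges.
    static List<Tuple> evenSplit(int count) {
        List<Tuple> ranges = new ArrayList<>();
        long width = Long.MAX_VALUE / count;
        for (int i = 0; i < count; i++) {
            long start = i * width;
            long end = (i == count - 1) ? Long.MAX_VALUE : start + width - 1;
            ranges.add(new Tuple(start, end));
        }
        return ranges;
    }

    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("orders", 0);
        List<Tuple> halves = evenSplit(2);

        // Consumer A would call assign() with the lower half of the hash space...
        Map<TopicPartition, List<Tuple>> consumerA = new HashMap<>();
        consumerA.put(tp, List.of(halves.get(0)));

        // ...and consumer B with the upper half; together they cover the partition.
        Map<TopicPartition, List<Tuple>> consumerB = new HashMap<>();
        consumerB.put(tp, List.of(halves.get(1)));

        System.out.println("A owns " + consumerA + ", B owns " + consumerB);
    }
}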
For better visibility, we would also include
...
New Configurations
Broker configs
fetched.data.max.wait.ms | The maximum amount of time in ms that the fetch manager will wait for committed offsets to catch up with fetched but uncommitted data before re-exposing that data to other consumers; it could be set to the same value as the consumer's configured session timeout. |
max.transaction.timeout.ms | The maximum allowed timeout for transactions. If a client's requested transaction time exceeds this, the broker will return an InvalidTransactionTimeout error in InitPidRequest. This prevents a client from using too large a timeout, which can stall consumers reading from topics included in the transaction. Default is 900000 (15 min), a conservative upper bound on the period of time a transaction of messages will need to be sent. |
The number of replicas for the transaction state topic. Default: 3
The number of partitions for the transaction state topic. Default: 50
Consumer configs
allow.individual.commit | determine whether this consumer will participate in a shared consumption mode with other consumers. Default value: false. |
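For illustration, this is how the proposed configs might be wired up once implemented. The keys come from this KIP and are not recognized by any released Kafka version; the concrete values are placeholders.
Code Block
import java.util.Properties;

// Sketch only: the keys below are proposed by this KIP; values are placeholders.
final class ProposedConfigsExample {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "shared-group");
        // Opt this consumer into shared (key-range) consumption of a partition.
        consumerProps.put("allow.individual.commit", "true");
        consumerProps.put("session.timeout.ms", "10000");

        // On the broker side, fetched.data.max.wait.ms would bound how long a
        // fetched-but-uncommitted range is held back, e.g. the same 10000 ms
        // as the consumer session timeout.
        System.out.println(consumerProps);
    }
}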
Related Work
Pulsar has officially supported the key-shared subscription feature since 2.4, which lets multiple consumers share the same partition's data.
The blockers for us to implement a similar feature are:
- Our consumer model is pull based, which would incur random reads if we let consumers ask for specific keyed records. Sequential reads are the key performance advantage for Kafka; without them we could not avoid copying the data through memory.
- Our brokers don't know anything about the data distribution, as all the metadata is encoded and opaque to them. In practice, we could not afford letting consumers fetch with raw keys.
- The consumer coordinator sits in a centralized location, yet we need to distribute keys across different partitions. In Pulsar, the offset data is co-located with the actual partitions. The burden of broadcasting such state changes in Kafka would be heavy and very error-prone.
Compatibility, Deprecation, and Migration Plan
- Individual Commit. Although individual commit alone is of limited real use, Kafka users and developers could start evaluating its performance with some customized applications.
  - The broker has to be upgraded first; until then, all commit requests with a hash range configured shall fail.
  - Upgrade the client version.
- Key based filtering. Note that if individual commit has already launched by the time key based filtering is ready, users can use assign mode to co-process the same partition right away.
  - Upgrade the broker first.
  - Upgrade the client.
- Rebalance support. The config `allow.individual.commit` controls whether a consumer will choose to participate in a key-sharing assignment. If it is not turned on, the leader consumer shall not consider making that member share a partition with others.
As transactional support and cooperative fetch need more design discussion, we will leave them off the upgrade path for now.
Test Plan
Each module has to be well tested by unit tests, integration tests and system tests. For system tests, we need to set up different scenarios and verify that eventual progress is made.
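As a sketch of what the "eventual progress" verification could look like in a system test, the helper below polls a supplier of the shared group's total committed offsets and fails if the number stops growing; all names are illustrative only.
Code Block
import java.time.Duration;
import java.time.Instant;
import java.util.function.LongSupplier;

// Sketch of an eventual-progress assertion for a shared-consumption scenario.
final class EventualProgressCheck {

    static void assertProgress(LongSupplier committedTotal, Duration timeout)
            throws InterruptedException {
        long before = committedTotal.getAsLong();
        Instant deadline = Instant.now().plus(timeout);
        while (Instant.now().isBefore(deadline)) {
            if (committedTotal.getAsLong() > before) return; // progress observed
            Thread.sleep(500);
        }
        throw new AssertionError("no consumption progress within " + timeout);
    }
}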
...
Rejected Alternatives
There are a couple of alternatives to this proposal.
...