...
In individual commit mode, the offset metadata will grow much faster and become much harder to predict. To avoid disturbing the stable offset, we propose to add another internal topic called `__individual_commit_offsets`, which stores the individual commits specifically, and to refer to the current __consumed_offsets topic as the primary offset topic. This isolation makes development more controllable by keeping the primary topic clean, and it preserves at-least-once delivery in the worst case, when a corrupted IC offset topic has to be deleted. The individual commit offset topic is required to co-locate with the __consumed_offsets topic, which means it has to share the primary offset topic's configuration for number of partitions, replication factor and min replicas.
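As a sketch of why co-location matters: the group coordinator locates a group's offsets-topic partition by hashing the group id, so the two topics only resolve a group to the same partition index if they share a partition count. The class and method names below are illustrative, not part of the proposal:

```java
// Illustrative sketch, not part of the KIP: shows why the IC topic must share
// the primary offset topic's partition count.
public final class OffsetTopicColocation {

    // Same style of mapping the group coordinator uses for the offsets topic:
    // a non-negative hash of the group id, modulo the partition count.
    public static int partitionFor(String groupId, int numPartitions) {
        return (groupId.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int primaryPartitions = 50; // __consumed_offsets
        int icPartitions = 50;      // __individual_commit_offsets must match

        // With equal partition counts, both topics resolve a group to the
        // same partition index, so one coordinator owns both logs.
        String group = "demo-group";
        System.out.println(
            partitionFor(group, primaryPartitions) == partitionFor(group, icPartitions));
    }
}
```

If the partition counts diverged, a group's primary commits and individual commits could land on partitions owned by different coordinators, which is exactly what the co-location requirement rules out.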
The offset commit and offset fetch workflow will be slightly changed under individual commit mode.
...
This high-level workflow is blocking in nature, but its correctness is easier to reason about. The design is still open to discussion and may change as the Kafka transaction model evolves.
Cooperative Fetch (in long term)
Looking back at stateless operations like filter or map, there is no necessity to honor the consumer → key mappings during processing. From KIP-283, we already know it is very costly and inefficient to copy data around. Based on the client's needs, the broker could do a random assignment when receiving fetch requests without key-level granularity, keeping an advancing marker of who has fetched up to which position. This means we don't need to load any data into main memory, and the total number of IC offsets will drop significantly.
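A minimal sketch of such an advancing marker, assuming a per-partition cursor on the broker (all names are hypothetical, not from the proposal):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the broker-side "advancing marker": each fetch from a
// key-agnostic consumer atomically claims the next disjoint slice of the
// partition, so no per-key state or in-memory buffering is needed.
public class CooperativeFetchCursor {
    private final ConcurrentHashMap<String, AtomicLong> nextOffset =
        new ConcurrentHashMap<>();

    // Claims [returnedOffset, returnedOffset + maxRecords) for the caller.
    public long claim(String topicPartition, long maxRecords) {
        return nextOffset
            .computeIfAbsent(topicPartition, tp -> new AtomicLong(0L))
            .getAndAdd(maxRecords);
    }
}
```

Two consumers fetching from the same partition would simply receive back-to-back ranges, e.g. [0, 500) then [500, 1000), with no coordination beyond the cursor itself.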
...
The design is still open to discussion, as the trade-off between the improvement and the added complexity is not yet clear.
Public Interfaces
The offset commit protocol will be changed to support the IC semantics. The committed offset will include a list of offset ranges:
...
The fetch request will also be augmented to incorporate the key hash ranges. Using a list instead of a single hash range allows future extensibility, for example if we want to implement cooperative fetching on top of this protocol.
FetchRequest => MaxWaitTime ReplicaId MinBytes IsolationLevel FetchSessionId FetchSessionEpoch [Topics] [RemovedTopics] KeyRanges GenerationId MemberId SessionTimeout
  MaxWaitTime => int32
  ReplicaId => int32
  MinBytes => int32
  IsolationLevel => int8
  FetchSessionId => int32
  FetchSessionEpoch => int32
  Topics => TopicName Partitions
    TopicName => String
    Partitions => [Partition FetchOffset StartOffset LeaderEpoch MaxBytes]
      Partition => int32
      CurrentLeaderEpoch => int32
      FetchOffset => int64
      StartOffset => int64
      MaxBytes => int32
  RemovedTopics => RemovedTopicName [RemovedPartition]
    RemovedTopicName => String
    RemovedPartition => int32
  KeyRanges => List<Tuple<int64, int64>> // NEW
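To make the KeyRanges semantics concrete, here is a hedged sketch of how a broker could test a record key against the requested ranges. The KIP does not specify the hash function, so the one below is only a placeholder:

```java
import java.util.List;

// Sketch only: keep records whose 64-bit key hash falls in one of the
// requested [start, end] ranges. The hash function here is a placeholder,
// not one the proposal standardizes on.
public class KeyRangeFilter {

    public static long keyHash(byte[] key) {
        long h = 1125899906842597L; // arbitrary seed
        for (byte b : key) {
            h = 31 * h + b;
        }
        return h & Long.MAX_VALUE;  // clamp to non-negative
    }

    // keyRanges mirrors KeyRanges => List<Tuple<int64, int64>>.
    public static boolean inRanges(byte[] key, List<long[]> keyRanges) {
        long h = keyHash(key);
        for (long[] range : keyRanges) {
            if (h >= range[0] && h <= range[1]) {
                return true;
            }
        }
        return false;
    }
}
```

A consumer that passes an empty range list opts out of key-level granularity entirely, which is the case the cooperative fetch section above exploits.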
To support cooperative fetch, the offset fetch request will also carry the group membership information:
OffsetFetchRequest => GroupId Topics GenerationId MemberId SessionTimeout
  GroupId => String
  Topics => List<OffsetFetchRequestTopic>
  GenerationId => int32 // NEW for cooperative fetch
  MemberId => String // NEW for cooperative fetch
  SessionTimeout => int32 // NEW for cooperative fetch
We shall also put offset ranges as part of OffsetFetchResponse:
OffsetFetchResponse => ThrottleTimeMs Topics ErrorCode
  ThrottleTimeMs => int32
  Topics => List<OffsetFetchResponseTopic>
  ErrorCode => int16
  OffsetFetchResponseTopic => Name Partitions
    Name => String
    Partitions => List<OffsetFetchResponsePartition>
    OffsetFetchResponsePartition => PartitionIndex CommittedOffset CommittedLeaderEpoch Metadata ErrorCode CommittedRangeOffsets
      PartitionIndex => int32
      CommittedOffset => int64
      CommittedLeaderEpoch => int32
      Metadata => string
      ErrorCode => int16
      CommittedRangeOffsets => List<Tuple<int64, int64>> // NEW
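One intended use of the returned ranges is to let the fetcher (or the coordinator) advance the stable offset once individually committed ranges become contiguous with it. Below is a sketch under the assumption that ranges are half-open [start, end) pairs; the KIP only fixes the wire type, so the algorithm and names are illustrative:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch, not the KIP's algorithm: merge individually committed ranges into
// the stable offset. Assumes half-open [start, end) ranges.
public class StableOffsetAdvance {

    public static long advance(long stableOffset, List<long[]> committedRanges) {
        List<long[]> sorted = new ArrayList<>(committedRanges);
        sorted.sort(Comparator.comparingLong(r -> r[0]));

        long cursor = stableOffset;
        for (long[] range : sorted) {
            if (range[0] > cursor) {
                break; // gap ahead of the cursor: the stable offset cannot pass it
            }
            cursor = Math.max(cursor, range[1]); // absorb overlapping/adjacent range
        }
        return cursor;
    }
}
```

For example, with a stable offset of 100 and committed ranges [100, 110), [110, 120) and [130, 140), the stable offset advances to 120; the range starting at 130 stays an individual commit until the gap [120, 130) is filled.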
...
New Configurations
Broker configs
accept.individual.commit | Determine whether the group coordinator allows individual commit. Default: true. |
max.bytes.key.range.hashing | The maximum memory allowed to hold partitioned records in-memory. Default: 524288000 bytes. |
individual.commit.log.segment.bytes | The segment size for the individual commit topic. Default: 10485760 bytes. |
fetched.data.max.wait.ms
Consumer configs
allow.individual.commit | Determine whether this consumer will participate in a shared consumption mode with other consumers. Default value: false. |
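Putting the proposed knobs together, a deployment might look like the fragment below. These settings exist only in this proposal and are not recognized by released Kafka brokers or clients:

```properties
# broker: server.properties (proposed in this KIP only)
accept.individual.commit=true
max.bytes.key.range.hashing=524288000
individual.commit.log.segment.bytes=10485760

# consumer configuration (proposed in this KIP only)
allow.individual.commit=true
```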
Related Work
Pulsar has officially supported the Key_Shared subscription mode since 2.4, which allows multiple consumers to share the same partition's data.
...