...

In individual commit (IC) mode, the offset metadata grows much faster and becomes harder to predict. To avoid disturbing the stable offset, we propose to add another internal topic called `__individual_commit_offsets`, which stores the individual commits specifically, and to refer to the current `__consumer_offsets` topic as the primary offset topic. This isolation keeps development more controllable by leaving the primary topic untouched, and it preserves at-least-once delivery in the worst case, when a corrupted IC offset topic has to be deleted. The individual commit offset topic is required to be co-located with the `__consumer_offsets` topic, meaning it shares the same number of partitions, replication factor, and minimum in-sync replicas as the primary offset topic.
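
To make the interaction between the primary (stable) offset and the individual commits concrete, below is a minimal Java sketch of how a coordinator could fold contiguous IC ranges back into the stable offset. All class and method names here are illustrative assumptions, not part of the proposal.

Code Block
import java.util.List;

// Illustrative sketch only: folds a sorted, non-overlapping list of
// individually committed [start, end] offset ranges into the stable offset.
public final class StableOffsetTracker {

    // 'stableOffset' is the first offset not yet covered by the primary
    // offset topic; 'committedRanges' holds inclusive IC ranges sorted by
    // start offset.
    public static long advance(long stableOffset, List<long[]> committedRanges) {
        for (long[] range : committedRanges) {
            long start = range[0];
            long end = range[1];
            if (start > stableOffset) {
                break; // gap before this range: the stable offset cannot move past it
            }
            if (end >= stableOffset) {
                stableOffset = end + 1; // range is contiguous, slide forward
            }
        }
        return stableOffset;
    }
}

For example, with a stable offset of 100 and IC ranges [100, 104] and [106, 110], the stable offset advances to 105, while [106, 110] stays behind as an individual commit until offset 105 is also committed.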

The offset commit and offset fetch workflows will change slightly under individual commit mode.

...

This high-level workflow is blocking in nature, but that makes its correctness easier to reason about. The design is still open to discussion and may change as the Kafka transaction model evolves.

Cooperative Fetch (long term)

Looking back at stateless operations such as filter or map, there is no need to honor the consumer → key mappings during processing. From KIP-283, we already know that copying data around is very costly and inefficient. Based on the client's needs, the broker could instead do a random assignment when receiving fetch requests without key-level granularity, keeping an advancing marker of who has fetched up to which position. This means we don't need to load any data into main memory, and the total number of IC offsets will drop significantly.
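
Below is a minimal sketch of such an advancing marker, under the assumption that the broker hands out fixed-size chunks of offsets to whichever member fetches next; the chunk size and all names are illustrative, not part of the proposal.

Code Block
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: a broker-side marker that hands out disjoint
// offset ranges to fetching consumers instead of honoring key mappings.
public final class CooperativeFetchMarker {

    private static final long FETCH_CHUNK = 500; // records per hand-out (assumed)

    // next offset to hand out, per topic-partition
    private final Map<String, Long> nextFetchOffset = new ConcurrentHashMap<>();

    // Returns the [start, end) range the calling consumer should fetch next.
    // Only the marker advances; no record data is buffered in broker memory.
    public long[] assignRange(String topicPartition, long logEndOffset) {
        long[] assigned = new long[2];
        nextFetchOffset.compute(topicPartition, (tp, next) -> {
            long start = (next == null) ? 0L : next;
            long end = Math.min(start + FETCH_CHUNK, logEndOffset);
            assigned[0] = start;
            assigned[1] = end;
            return end;
        });
        return assigned;
    }
}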

...

The design is still open to discussion, as the trade-off between the improvement and the added complexity is not yet clear.

Public Interfaces

The offset commit protocol will be changed to allow IC semantics. The committed offset will include a list of offset ranges:

...

The offset fetch request will also be augmented to incorporate the key hash range. Using a list instead of a single hash range allows future extensibility, and also leaves room to implement cooperative fetching on top of this protocol:

Code Block
FetchRequest => MaxWaitTime ReplicaId MinBytes IsolationLevel FetchSessionId FetchSessionEpoch [Topics] [RemovedTopics] KeyRanges GenerationId MemberId SessionTimeout
  MaxWaitTime => int32
  ReplicaId => int32
  MinBytes => int32
  IsolationLevel => int8
  FetchSessionId => int32
  FetchSessionEpoch => int32
  Topics => TopicName Partitions
    TopicName => String
    Partitions => [Partition CurrentLeaderEpoch FetchOffset StartOffset MaxBytes]
        Partition => int32
        CurrentLeaderEpoch => int32
        FetchOffset => int64
        StartOffset => int64
        MaxBytes => int32
  RemovedTopics => RemovedTopicName [RemovedPartition]
    RemovedTopicName => String
    RemovedPartition => int32
  KeyRanges => List<Tuple<int64, int64>> // NEW
  GenerationId => String // NEW for cooperative fetch
  MemberId => String // NEW for cooperative fetch
  SessionTimeout => int32 // NEW for cooperative fetch
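
To illustrate how a broker might serve such a request, here is a hedged sketch that checks whether a record key hashes into one of the ranges a consumer declared in KeyRanges. The hash function is a stand-in, since this excerpt does not prescribe one.

Code Block
import java.nio.charset.StandardCharsets;
import java.util.List;

// Illustrative sketch only: key-level filtering against the KeyRanges field.
public final class KeyRangeFilter {

    // Maps a record key onto a non-negative int64 hash (stand-in function).
    static long hashKey(byte[] key) {
        long h = 1125899906842597L; // arbitrary seed, assumed for illustration
        for (byte b : key) {
            h = 31 * h + b;
        }
        return h & Long.MAX_VALUE; // clear the sign bit
    }

    // True if the key's hash lands inside any [start, end] range (inclusive).
    static boolean inRanges(byte[] key, List<long[]> keyRanges) {
        long h = hashKey(key);
        for (long[] range : keyRanges) {
            if (h >= range[0] && h <= range[1]) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<long[]> ranges = List.of(new long[]{0L, Long.MAX_VALUE / 2});
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        System.out.println(inRanges(key, ranges)); // does this consumer own the key?
    }
}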

We shall also put offset ranges as part of OffsetFetchResponse:

Code Block
OffsetFetchResponse => ThrottleTimeMs Topics ErrorCode
  ThrottleTimeMs => int32
  Topics         => List<OffsetFetchResponseTopic>
  ErrorCode      => int16

OffsetFetchResponseTopic => Name Partitions
  Name           => String
  Partitions     => List<OffsetFetchResponsePartition>

OffsetFetchResponsePartition => PartitionIndex CommittedOffset CommittedLeaderEpoch Metadata ErrorCode
  PartitionIndex => int32
  CommittedOffset => int64
  CommittedLeaderEpoch => int32
  Metadata => string
  ErrorCode => int16
  CommittedRangeOffsets => List<Tuple<int64, int64>> // NEW
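
As an illustration of how a restarting consumer could combine the stable CommittedOffset with the new CommittedRangeOffsets field, the sketch below decides whether a fetched record still needs processing; the names are hypothetical, not part of the proposal.

Code Block
import java.util.List;

// Illustrative sketch only: replay filtering after an OffsetFetchResponse.
public final class ReplayFilter {

    // 'committedOffset' is the stable committed offset; 'committedRanges'
    // holds the inclusive ranges from CommittedRangeOffsets.
    static boolean needsProcessing(long offset, long committedOffset,
                                   List<long[]> committedRanges) {
        if (offset < committedOffset) {
            return false; // below the stable offset: already committed
        }
        for (long[] range : committedRanges) {
            if (offset >= range[0] && offset <= range[1]) {
                return false; // inside an individually committed range
            }
        }
        return true;
    }
}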

...

New Configurations

Broker configs


accept.individual.commit

Determines whether the group coordinator allows individual commits.

Default: true.

max.bytes.key.range.hashing

The maximum amount of memory allowed for holding partitioned records in memory.

Default: 524288000 bytes.

individual.commit.log.segment.bytes

The segment size for the individual commit topic.

Default: 10485760 bytes.

fetched.data.max.wait.ms

The maximum amount of time in ms that the fetch manager will wait for sufficient data before answering a fetch request.

Consumer configs

allow.individual.commit

Determines whether this consumer will participate in a shared consumption mode with other consumers.

Default: false.
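
For reference, here is a sketch of how these settings might appear in configuration files, using the defaults documented above (fetched.data.max.wait.ms is omitted because this excerpt does not state its default):

Code Block
# Broker: server.properties
accept.individual.commit=true
max.bytes.key.range.hashing=524288000
individual.commit.log.segment.bytes=10485760

# Consumer client configuration: opt in to shared consumption
allow.individual.commit=true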

Related Work

Pulsar has officially supported the key-shared subscription feature since 2.4, which lets multiple consumers share the same partition's data.

...