...

  • If there is an IC [48-49], then we no longer need to maintain IC ranges [45-47] and [50-50]. We shall append two null (tombstone) records to delete those and augment the original IC range into [45-50].
  • If there is an IC [43-44], we are good to move the stable offset forward (see the sketch after this list). The operation is:
    • Push stable offset to 47 in the primary offset topic
    • Append a record to remove IC [45-47] in the IC offset topic
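
As an illustration of the two cases above, here is a minimal sketch of how IC ranges could be merged and the stable offset advanced. It is not the actual coordinator implementation; the class and method names are hypothetical.

import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of IC range compaction; ranges are stored as
// startOffset -> endOffset (both inclusive).
public class IcRangeCompactor {

    private final TreeMap<Long, Long> icRanges = new TreeMap<>();
    private long stableOffset;

    public IcRangeCompactor(long stableOffset) {
        this.stableOffset = stableOffset;
    }

    // Add a newly committed IC range and merge it with any adjacent ranges,
    // e.g. committing [48-49] while holding [45-47] and [50-50] yields [45-50].
    public synchronized void addRange(long start, long end) {
        Map.Entry<Long, Long> prev = icRanges.floorEntry(start);
        if (prev != null && prev.getValue() >= start - 1) {
            start = prev.getKey();
            end = Math.max(end, prev.getValue());
            icRanges.remove(prev.getKey());           // conceptually a null (tombstone) record
        }
        Map.Entry<Long, Long> next = icRanges.ceilingEntry(start);
        while (next != null && next.getKey() <= end + 1) {
            end = Math.max(end, next.getValue());
            icRanges.remove(next.getKey());           // tombstone for the absorbed range
            next = icRanges.ceilingEntry(start);
        }
        icRanges.put(start, end);                     // the augmented range

        // If the lowest range now starts right after the stable offset,
        // push the stable offset forward and drop that range, e.g. 44 -> 47.
        Map.Entry<Long, Long> first = icRanges.firstEntry();
        if (first != null && first.getKey() == stableOffset + 1) {
            stableOffset = first.getValue();          // push stable offset in the primary offset topic
            icRanges.remove(first.getKey());          // remove the IC range from the IC offset topic
        }
    }

    public synchronized long stableOffset() {
        return stableOffset;
    }
}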

There is also some math to do around how frequently we should commit. The default max request size for Kafka is 1 MB, which means we could not include more than roughly 1 MB / 16 B ≈ 65,000 ranges within a single request. This means that in auto commit mode, the consumer has to break its commits down if it has already accumulated a large number of ranges. It is also undesirable to let the stable offset make no progress for a long time. Thus we will define a new config on the maximum number of ranges being collected before triggering a commit call.
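A minimal sketch of this bookkeeping, assuming a hypothetical consumer-side helper; the 16-byte-per-range estimate and the proposed max.num.ranges.to.commit config come from this KIP, everything else is illustrative:

import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that decides when accumulated IC ranges should be committed.
public class IcCommitBatcher {

    private static final int BYTES_PER_RANGE = 16;         // start + end offset, 8 bytes each
    private static final int MAX_REQUEST_SIZE = 1 << 20;   // default max.request.size, 1 MB

    private final int maxNumRangesToCommit;                // proposed max.num.ranges.to.commit
    private final List<long[]> pendingRanges = new ArrayList<>();

    public IcCommitBatcher(int maxNumRangesToCommit) {
        // Never let a single commit exceed the default request size.
        this.maxNumRangesToCommit =
            Math.min(maxNumRangesToCommit, MAX_REQUEST_SIZE / BYTES_PER_RANGE);
    }

    // Accumulate a processed range; return true if a commit should be issued now.
    public boolean add(long start, long end) {
        pendingRanges.add(new long[] {start, end});
        return pendingRanges.size() >= maxNumRangesToCommit;
    }

    // Hand the accumulated ranges to the commit request and reset the buffer.
    public List<long[]> drain() {
        List<long[]> batch = new ArrayList<>(pendingRanges);
        pendingRanges.clear();
        return batch;
    }
}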

Offset Fetch

During startup in sequential commit mode, a consumer will attempt to fetch the stored offsets before resuming work. The step is the same in IC mode, except that the consumer will now be aware of both the individual offsets and the stable offset. The individual offsets serve as a reference when the user calls #poll(): any record that has already been committed will be filtered out, so the polled records only contain uncommitted ones. When doing the offset commit, the consumer will do a "gap amending" as well.
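To make the filtering step concrete, here is a sketch assuming a hypothetical client-side helper that holds the committed IC ranges returned by the offset fetch; only ConsumerRecord comes from the real client API, the rest is illustrative.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Drops records that fall inside an already-committed individual range
// before they are handed to the application.
public class CommittedRangeFilter<K, V> {

    // startOffset -> endOffset (inclusive) of committed IC ranges for one partition.
    private final NavigableMap<Long, Long> committedRanges = new TreeMap<>();

    public void addCommittedRange(long start, long end) {
        committedRanges.put(start, end);
    }

    // Keep only records that are not yet individually committed.
    public List<ConsumerRecord<K, V>> filter(List<ConsumerRecord<K, V>> records) {
        List<ConsumerRecord<K, V>> uncommitted = new ArrayList<>();
        for (ConsumerRecord<K, V> record : records) {
            Map.Entry<Long, Long> range = committedRanges.floorEntry(record.offset());
            boolean alreadyCommitted = range != null && record.offset() <= range.getValue();
            if (!alreadyCommitted) {
                uncommitted.add(record);
            }
        }
        return uncommitted;
    }
}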

...

To determine the hash range before each consumer generation stabilizes, the leader will evaluate the situation in the group assignment phase and decide whether to trigger the sharing mechanism. For the very first version, a very intuitive criterion is comparing the number of topic partitions with the number of consumers that are opting into the key sharing mode. If the number of consumers m is greater than the number of partitions n, the consumers would share part of the partitions for consumption in a round-robin fashion. For example, if 5 consumers subscribe to 1 topic with 3 partitions, the final assignment would be:

...

The assigned ranges could then be used by the consumer to make its fetch more specific. This step unblocks the potential to allow dynamic consumer group scaling beyond the partition-level cap.
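As a rough illustration of the criterion above, the following hypothetical leader-side sketch distributes m consumers over n partitions round-robin when m > n. The names and the exact layout are illustrative, not the assignor's real interface.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sharing decision: more consumers than partitions means
// consumers share partitions round-robin; otherwise plain partition assignment.
public class SharingAssignorSketch {

    // partition -> list of consumer ids sharing that partition.
    public static Map<Integer, List<String>> assign(List<String> consumers, int numPartitions) {
        Map<Integer, List<String>> assignment = new HashMap<>();
        for (int p = 0; p < numPartitions; p++) {
            assignment.put(p, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment;
        }
        if (consumers.size() <= numPartitions) {
            // Fewer consumers than partitions: fall back to a partition-based assignment.
            for (int p = 0; p < numPartitions; p++) {
                assignment.get(p).add(consumers.get(p % consumers.size()));
            }
        } else {
            // More consumers than partitions: spread consumers over partitions round-robin,
            // e.g. 5 consumers and 3 partitions -> partition 0 is shared by consumers 0 and 3.
            for (int c = 0; c < consumers.size(); c++) {
                assignment.get(c % numPartitions).add(consumers.get(c));
            }
        }
        return assignment;
    }
}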

Look into the future: Transactional Support

Eventually the IC semantics have to be compatible with transaction support. Despite the ongoing discussion of possible changes to the transaction semantics, we are providing a rough plan for integrating with the current transaction model in this KIP as well. The current transaction model uses transaction markers to specify whether all the records before them are ready to be revealed downstream, so it is inherently a design that obeys partition-level ordering. To avoid over-complicating the transaction semantics, we will introduce a new delayed queue or purgatory for the transactional IC semantics, where a commit shall be blocked until it can hit the stable offset. Take Kafka Streams as an example: if we have a chunk of data [40-49] with the stable offset at 39, and two stream threads A and B turn on EOS at the same time in sharing mode:

  1. Stream thread A did a fetch to get [40, 44]
  2. Stream thread B did a fetch to get [45, 49]
  3. Stream thread B finished processing and issued a transactional commit of [45, 49]
  4. As the offset range [40, 44] is still held by someone at the moment, the transactional commit request will be put into a purgatory to wait for the stable offset to advance
  5. Stream thread A finished processing and issued a transactional commit of [40, 44]
  6. We first advance the stable offset and reply to stream thread A with a successful commit
  7. Then we search the purgatory, reply to stream thread B, and allow it to proceed
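
A rough sketch of this purgatory idea, assuming a hypothetical coordinator-side structure; it only illustrates the workflow above and is not the broker's actual purgatory:

import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

// A transactional commit of [start, end] completes only once the stable offset
// has reached start - 1; earlier ranges that complete first advance the stable
// offset and release the waiters.
public class TransactionalIcPurgatory {

    private long stableOffset;
    // Waiting commits keyed by the start of their range.
    private final TreeMap<Long, PendingCommit> waiting = new TreeMap<>();

    private static final class PendingCommit {
        final long end;
        final CompletableFuture<Void> future = new CompletableFuture<>();
        PendingCommit(long end) { this.end = end; }
    }

    public TransactionalIcPurgatory(long stableOffset) {
        this.stableOffset = stableOffset;
    }

    // Called when a stream thread issues a transactional commit of [start, end].
    public synchronized CompletableFuture<Void> tryCommit(long start, long end) {
        waiting.put(start, new PendingCommit(end));
        release();
        return waiting.containsKey(start)
            ? waiting.get(start).future            // still parked in the purgatory
            : CompletableFuture.completedFuture(null);
    }

    // Complete every waiting commit whose range is now contiguous with the stable offset.
    private void release() {
        Map.Entry<Long, PendingCommit> head = waiting.firstEntry();
        while (head != null && head.getKey() == stableOffset + 1) {
            stableOffset = head.getValue().end;    // advance the stable offset, e.g. 39 -> 44 -> 49
            head.getValue().future.complete(null); // reply with a successful commit
            waiting.remove(head.getKey());
            head = waiting.firstEntry();
        }
    }
}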

This high-level workflow is blocking in nature, due to the nature of the Kafka transaction model, but it is easier to reason about its correctness. Another approach would be to add the capability to track key-level ordering to the current transaction model, which is also very tricky. The design is still open to discussion and may change as the Kafka transaction model evolves.

Look into the future: Cooperative Fetch

...

Looking back at stateless operations like filter or map, there is no necessity to honor the consumer → key mappings during processing. From KIP-283, we already know it is very costly and inefficient to copy data around. Based on the client's needs, the broker could do a random assignment when receiving fetch requests without key-level granularity. It will keep an advancing marker tracking who has fetched up to which position. This means we don't need to load any data into main memory, and the total number of IC offsets will drop significantly.
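A minimal sketch of the advancing-marker idea, assuming a hypothetical broker-side helper; the names and the chunking policy are illustrative:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// For stateless consumers that do not need key-level granularity, the broker
// hands out disjoint offset chunks per partition in fetch order, tracking only
// how far each partition has already been handed out.
public class CooperativeFetchMarker {

    // partition id -> next offset that has not yet been handed to any consumer.
    private final Map<Integer, AtomicLong> nextOffsetByPartition = new ConcurrentHashMap<>();

    // Reserve the next chunk of at most maxRecords offsets for an incoming fetch.
    // Returns an inclusive [start, end] range, or null if nothing is left to hand out.
    public long[] reserve(int partition, long logStartOffset, long logEndOffset, int maxRecords) {
        AtomicLong marker =
            nextOffsetByPartition.computeIfAbsent(partition, p -> new AtomicLong(logStartOffset));
        long start = marker.getAndUpdate(cur -> Math.min(cur + maxRecords, logEndOffset));
        if (start >= logEndOffset) {
            return null;                          // nothing new to hand out for this partition
        }
        long end = Math.min(start + maxRecords, logEndOffset);
        return new long[] {start, end - 1};       // range handed to this fetcher
    }
}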

...

allow.individual.commit

Determines whether this consumer will participate in shared consumption mode with other consumers.

Default: false.

max.num.ranges.to.commit

Commit the progress once the number of accumulated offset ranges exceeds this value.

Default: 10,000
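
For illustration, a consumer opting into the proposed mode might be configured as below. Note that allow.individual.commit and max.num.ranges.to.commit are the new configs proposed in this KIP and do not exist in released clients; everything else uses the standard consumer API.

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SharedConsumerConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "shared-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Proposed configs from this KIP (not available in current releases):
        props.put("allow.individual.commit", "true");      // opt into shared consumption
        props.put("max.num.ranges.to.commit", "10000");    // commit once this many ranges accumulate

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe and poll as usual; individual commits follow the semantics above
        }
    }
}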

Related Work

Pulsar has officially supported the key share feature since 2.4, which allows multiple consumers to share data from the same partition.

...