...

When rebalancing, instead of allowing the current consumer (consumer1) to restart from the last committed offset (70), we allow it to start processing from the latest offset (120) that could be polled at the time of the consumer's recovery (assuming the log has grown while the process was down). Meanwhile, a new KafkaConsumer (consumer2) is created to start processing from the last committed offset (70) and run concurrently with consumer1. Consumer2 terminates once it has reached offset 120. By default, this setup supports at-least-once semantics.
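The split of work on recovery can be sketched as a small simulation (plain Python, not actual Kafka client code; offsets 70 and 120 are the example values from the text):

```python
def plan_recovery(last_committed, log_end):
    """Split work between the recovering consumer and a catch-up consumer.

    consumer1 resumes from the log-end offset, while a new consumer2
    replays the lag [last_committed, log_end) and then terminates.
    """
    consumer1_start = log_end
    consumer2_range = (last_committed, log_end)  # bounded: consumer2 stops at log_end
    return consumer1_start, consumer2_range

# Using the offsets from the example: committed at 70, log has grown to 120.
start1, (lo, hi) = plan_recovery(70, 120)
```

In a real implementation, `log_end` would come from querying the broker for the partition's end offset at recovery time.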

Ordering Guarantees

Two threads operating simultaneously on one log would naturally break ordering guarantees. To ensure that offsets arrive in the correct sequence, the two consumers write into two distinct buffers: one (buffer2) contains the offsets from 70 to 120 processed by consumer2, while the other (buffer1) contains the offsets from 120 onwards processed by consumer1. As long as consumer2 has not terminated, only offsets that have been processed by consumer2 are returned from poll(), while buffer1 temporarily accumulates the offsets processed by consumer1. Once consumer2 has terminated, the offsets that have accumulated in buffer1 are returned to the user as well.
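The two-buffer routing can be sketched as follows (a toy Python model; the class name and `catchup_end` parameter are illustrative, not actual Kafka APIs):

```python
from collections import deque

class DualBuffer:
    """Route processed offsets into two buffers so ordering can be preserved.

    Offsets below catchup_end belong to consumer2 (the bounded catch-up
    consumer); offsets at or above it belong to consumer1.
    """
    def __init__(self, catchup_end):
        self.catchup_end = catchup_end
        self.buffer2 = deque()  # offsets in [committed, catchup_end), from consumer2
        self.buffer1 = deque()  # offsets from catchup_end onwards, from consumer1

    def on_offset(self, offset):
        if offset < self.catchup_end:
            self.buffer2.append(offset)
        else:
            self.buffer1.append(offset)

# Example: catch-up range ends at 120; offsets from both consumers interleave.
buf = DualBuffer(catchup_end=120)
for off in (70, 120, 71, 121):
    buf.on_offset(off)
```

Because each consumer processes its own range in order, each buffer is individually ordered, and draining buffer2 before buffer1 yields the full sequence in order.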

Kafka checkpointing

In this scenario, if auto commit is enabled, we would commit the offsets processed by consumer2 directly into the original commit log. Once committed, those offsets will be discarded. Meanwhile, the offsets processed by consumer1 will be checkpointed to a separate Kafka topic. When KafkaConsumer#committed is called, if consumer2 has not terminated, only the offsets checkpointed by consumer2 will be returned. Otherwise, the offsets checkpointed by consumer1 will be returned instead.
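The checkpointing rule can be modeled with a small simulation (the two fields below stand in for the original commit log and the separate checkpoint topic; this is not real Kafka commit code):

```python
class CheckpointStore:
    """Track the two checkpoint locations described above.

    consumer2's commits go to the original commit log; consumer1's
    progress is checkpointed to a separate topic until consumer2 is done.
    """
    def __init__(self):
        self.main_log = None      # commits from consumer2 (original commit log)
        self.side_topic = None    # checkpoints from consumer1 (separate topic)
        self.consumer2_done = False

    def commit(self, consumer, offset):
        if consumer == "consumer2":
            self.main_log = offset
        else:
            self.side_topic = offset

    def committed(self):
        # Semantics from the text: expose only consumer2's progress
        # until it terminates, then consumer1's.
        return self.side_topic if self.consumer2_done else self.main_log

store = CheckpointStore()
store.commit("consumer2", 90)    # consumer2 has replayed up to 90
store.commit("consumer1", 150)   # consumer1 has meanwhile reached 150
```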

Effects on KafkaConsumer#poll()

When the user calls poll() for new offsets, if consumer2 has not finished, then only offsets from buffer2 will be sent to the user. After each poll(), buffer2 is emptied once its records have been returned, to free up memory. However, if consumer2 has terminated, then all offsets that have accumulated in buffer1 will also be sent (with both buffer2 and buffer1 being discarded, now that neither is needed). In this manner, we can also guarantee ordering for the user.
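This poll() behaviour can be sketched as a drain over the two buffers (illustrative only; buffer2 and buffer1 are plain lists here rather than Kafka record batches):

```python
def drain_poll(buffer2, buffer1, consumer2_done):
    """Return records in order: buffer2 first, then buffer1 once
    consumer2 has terminated. Drained buffers are cleared to free memory."""
    records = list(buffer2)
    buffer2.clear()
    if consumer2_done:
        records.extend(buffer1)
        buffer1.clear()
    return records

b2, b1 = [70, 71], [120, 121]
first = drain_poll(b2, b1, consumer2_done=False)  # only consumer2's records
later = drain_poll(b2, b1, consumer2_done=True)   # remaining buffer1 records
```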




Corner Cases

There is a possibility, however remote, of a chained failure, in which either consumer1 or consumer2 fails in turn. If consumer1 were to fail, we could simply repeat the previous policy and instantiate another thread to process the intervening lag. However, if consumer2 were to fail, no new process will be created. This is because the offset range that consumer2 is processing is bounded, so consumer2 does not have to deal with a continuous influx of new records – there is technically no "lag" to speak of. Note that if consumer1 crashes N times, N child processes will be created, and checkpointing will write the progress of these N child processes into N distinct logs.
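The spawning rule above can be sketched as follows (a hypothetical simulation: only consumer1 crashes spawn a new bounded catch-up child, and each child gets its own checkpoint log):

```python
def children_after(crashes):
    """Count catch-up children spawned by a sequence of crashes.

    Only a consumer1 crash spawns a new bounded child process (and hence
    one more checkpoint log); a consumer2 crash spawns nothing new, since
    its offset range is bounded and accrues no lag.
    """
    return sum(1 for who in crashes if who == "consumer1")

# consumer1 crashes three times -> three children, three checkpoint logs.
n = children_after(["consumer1", "consumer2", "consumer1", "consumer1"])
```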

...