...
Currently, in Kafka, when a consumer falls out of a consumer group, it restarts processing from the last checkpointed offset. This design can produce a lag that some users cannot afford. For example, suppose a consumer crashes at offset 100, with the last checkpointed offset at 70. By the time the consumer recovers, the log being consumed has grown to offset 120. The consumer restarts at 70, far behind the end of the log, so there is a lag of 50 offsets (120 - 70). This lag is unacceptable in applications (e.g. traffic control) that need information by the millisecond; they need a faster way to work through the lag that results from a crash. Our goal for this KIP is therefore to mitigate the lag (in terms of latency and time) caused by a crash by processing records with two consumers instead of one.
After a consumer thread fails, the offsets after the crash point incur a longer latency before they are processed, while the offsets before the crash point but after the last committed offset are reprocessed to ensure that no data is lost. We propose a new design that avoids this latency by using two threads instead of one. The two threads process the offsets in such a way that the user receives the data after the crash point with vastly reduced delay.
It has been noted that this is a relatively uncommon case among Kafka users. However, among streaming jobs that use Kafka as a sink or source, this is a sought-after capability. Some streaming jobs, such as Spark's Structured Streaming, might need this proposal.
Public Interfaces
This new mode (named enable.parallel.rebalance) will not be used by KafkaConsumer by default (i.e. the config defaults to false). Users can activate the mode by setting the config's value to true.
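As a minimal sketch, enabling the mode might look like the following. It assumes the proposed enable.parallel.rebalance key is read from the ordinary consumer properties; the broker address and group id are placeholders, and the class and method names are hypothetical. Only java.util.Properties is used, so the snippet runs without the Kafka client on the classpath.

```java
import java.util.Properties;

final class ParallelRebalanceConfigExample {
    // Config key proposed by this KIP; it is not part of released Kafka clients.
    static final String ENABLE_PARALLEL_REBALANCE = "enable.parallel.rebalance";

    /** Builds consumer properties with the proposed mode switched on or off. */
    static Properties consumerProps(boolean parallelRebalance) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", "example-group");           // placeholder group
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // The mode is off by default, so it has to be enabled explicitly.
        props.setProperty(ENABLE_PARALLEL_REBALANCE, Boolean.toString(parallelRebalance));
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps(true);
        System.out.println(ENABLE_PARALLEL_REBALANCE + "="
                + props.getProperty(ENABLE_PARALLEL_REBALANCE));
    }
}
```

The resulting Properties object would then be passed to the KafkaConsumer constructor as usual.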
With this mode enabled, the offset lag that results from a crash is removed, and any resulting latency is negated (i.e. we continue processing as if the crash never happened). Users can of course use multiple threads to speed up record processing and mitigate this lag, but that resolves the problem only to an extent. Users do not have access to critical information in Kafka internals (for example, when a missed heartbeat triggers a LeaveGroupRequest), so they are in no position to exploit it. We are.
Enabling this mode places an increased burden on the machine's processor cores, because an extra consumer thread is instantiated and run in parallel with the old consumer. In this manner, we can retrieve records at up to twice the rate of the current design. Note that in this mode offsets are no longer given to the user in the correct sequence, but in a more haphazard order. The mentality of this mode could be described as "give me all the offsets as fast as you can, I don't care about order." We can still guarantee at-least-once semantics, but the sequence in which offsets arrive will be thrown off. To illustrate, offsets could be sent to the user in this order: 1,2,3,6,4,7,5,8
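The ordering trade-off can be illustrated with a small simulation. This is only a sketch under stated assumptions: plain threads stand in for the two consumers, offsets are plain long values rather than records, and the class and method names are hypothetical. It shows that every offset is delivered while the interleaving between the two ranges is left to the scheduler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

final class InterleavedDeliverySketch {
    /**
     * Delivers [replayStart, liveStart) from a "child" thread and
     * [liveStart, liveEnd) from a "parent" thread into one shared stream.
     */
    static List<Long> deliver(long replayStart, long liveStart, long liveEnd) {
        ConcurrentLinkedQueue<Long> delivered = new ConcurrentLinkedQueue<>();
        Thread child = new Thread(() -> {
            for (long o = replayStart; o < liveStart; o++) delivered.add(o);
        }, "child-consumer");
        Thread parent = new Thread(() -> {
            for (long o = liveStart; o < liveEnd; o++) delivered.add(o);
        }, "parent-consumer");
        child.start();
        parent.start();
        try {
            child.join();
            parent.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for consumers", e);
        }
        return new ArrayList<>(delivered);
    }

    public static void main(String[] args) {
        // Every offset in [70, 140) appears, but the interleaving varies per run.
        System.out.println(deliver(70, 120, 140));
    }
}
```

In this sketch each range stays internally ordered and only the order across the two ranges is lost, which matches the shape of the 1,2,3,6,4,7,5,8 illustration.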
A method called childConsumerIsAlive() will be introduced, which returns a boolean value indicating whether the secondary thread has been started. If it returns true, the thread is alive. This allows the user to check whether a second thread is running.
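One way such a check could behave is sketched below. This is not the proposed KafkaConsumer implementation: it assumes the secondary consumer runs on a plain java.lang.Thread and maps the method directly onto Thread.isAlive(); the holder class and startChild are hypothetical.

```java
import java.util.concurrent.CountDownLatch;

final class ChildConsumerHandleSketch {
    // Null until a crash recovery spawns the secondary thread.
    private volatile Thread child;

    /** Hypothetical hook: starts the secondary thread with the given work. */
    void startChild(Runnable work) {
        Thread t = new Thread(work, "child-consumer");
        child = t;
        t.start();
    }

    /** True only while a secondary thread has been started and has not finished. */
    boolean childConsumerIsAlive() {
        Thread t = child;
        return t != null && t.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        ChildConsumerHandleSketch handle = new ChildConsumerHandleSketch();
        System.out.println(handle.childConsumerIsAlive()); // false: nothing started yet

        CountDownLatch done = new CountDownLatch(1);
        handle.startChild(() -> {
            try {
                done.await(); // stands in for replaying the lagging offsets
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        System.out.println(handle.childConsumerIsAlive()); // true: child is still working

        done.countDown();
        handle.child.join();
        System.out.println(handle.childConsumerIsAlive()); // false: child has finished
    }
}
```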
Please note that the intention of this section of the KIP is only to give an overview of what will be done. Please see the Design section below for a better idea of how this mode operates.
Design
Here is the current design of a consumer and what happens when it recovers from a crash:
...
2) If the first consumer crashes yet again, the offsets in the resulting lag will be written to the second commit log, and the remaining offsets will go where the policy described in the previous step dictates. For example, suppose we crash again at offset 170 and recover when the end of the log has grown to 190, with the last checkpointed offset at 140. Then we will commit offsets 140 - 190 in the second commit log, and 190+ in the first.
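The split described in these steps can be written down as a small calculation. The sketch below is illustrative only: the class and field names are hypothetical, and it assumes the replay range is half-open, i.e. the second consumer handles [last checkpointed offset, log end at recovery) and the first consumer continues from the log end onward.

```java
final class CrashRecoverySplit {
    final long replayStart; // first offset the second consumer replays (inclusive)
    final long replayEnd;   // where the second consumer stops (exclusive)
    final long liveStart;   // where the first consumer resumes

    private CrashRecoverySplit(long replayStart, long replayEnd, long liveStart) {
        this.replayStart = replayStart;
        this.replayEnd = replayEnd;
        this.liveStart = liveStart;
    }

    /** Splits the work after a crash between the second and the first consumer. */
    static CrashRecoverySplit of(long lastCheckpointed, long logEndAtRecovery) {
        if (lastCheckpointed < 0 || logEndAtRecovery < lastCheckpointed) {
            throw new IllegalArgumentException("log end must not precede the checkpoint");
        }
        return new CrashRecoverySplit(lastCheckpointed, logEndAtRecovery, logEndAtRecovery);
    }

    /** Number of offsets the second consumer has to work through. */
    long lag() {
        return replayEnd - replayStart;
    }

    public static void main(String[] args) {
        CrashRecoverySplit first = CrashRecoverySplit.of(70, 120);
        CrashRecoverySplit second = CrashRecoverySplit.of(140, 190);
        System.out.println(first.lag());       // 50
        System.out.println(second.lag());      // 50
        System.out.println(second.liveStart);  // 190
    }
}
```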
Please note that if auto commit is enabled, the two consumer threads will independently commit the offsets they have processed. Therefore, if one of the threads fails, the other is not affected.
There is one corner case regarding crashes that has not been discussed yet: the failure of the secondary consumer (the one created by Kafka internals). In this situation, once it has recovered, it will resume processing from the last committed offset, unlike the first consumer, which cannot do so. (This is because the secondary consumer processes a bounded range, while the first consumer has to deal with a continuous influx of offsets.) The offset ranges themselves also need to be stored in a commit log (i.e. 70 and 120 mark an offset range; we need to remember these two numbers so that we can tell when to stop retrieving records for the second consumer). To prevent us from losing track of which offset ranges to process, they need to be checkpointed as a token.
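To make the idea of a checkpointed range token concrete, here is a hedged sketch. The KIP text does not define the token format, so the "start:end" string encoding, the half-open range, and every name below are assumptions made purely for illustration.

```java
final class OffsetRangeToken {
    final long start; // inclusive
    final long end;   // exclusive: the second consumer stops here

    OffsetRangeToken(long start, long end) {
        if (start < 0 || end < start) {
            throw new IllegalArgumentException("invalid range: " + start + ".." + end);
        }
        this.start = start;
        this.end = end;
    }

    /** Serializes the range so it can be stored alongside committed offsets. */
    String encode() {
        return start + ":" + end;
    }

    /** Restores a range from its serialized form. */
    static OffsetRangeToken decode(String token) {
        String[] parts = token.split(":");
        if (parts.length != 2) {
            throw new IllegalArgumentException("malformed token: " + token);
        }
        return new OffsetRangeToken(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
    }

    /**
     * Next offset a recovered second consumer should fetch, given its last
     * committed offset, or -1 if the whole range has already been processed.
     */
    long resumeFrom(long lastCommitted) {
        long next = Math.max(start, lastCommitted);
        return next < end ? next : -1;
    }

    public static void main(String[] args) {
        OffsetRangeToken token = OffsetRangeToken.decode(new OffsetRangeToken(70, 120).encode());
        System.out.println(token.resumeFrom(95));  // 95
        System.out.println(token.resumeFrom(120)); // -1
    }
}
```

Because the range is bounded, the recovered second consumer only needs the token and its own last committed offset to know where to resume and when to stop.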
Effects on Current API And Behavior
...
committed() - When returning the last checkpointed offset, rather than returning the truly latest committed offset (i.e. the first consumer, which receives the most recent data, will probably have committed an offset for this partition), we will check whether the second consumer (the one created by Kafka internals) has finished committing all offsets for this partition from the offset lag that results from a crash. If yes, we will return the latest committed offset from the user-created consumer as planned. If no, we will return the last offset committed by the second consumer. This guarantees that every offset before the one returned by KafkaConsumer#committed has also been checkpointed.
position() - Will call the user-created consumer's position; the second consumer (created by Kafka internals) takes no part in this call.
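The committed() rule described above reduces to a single decision, sketched here for one partition. This is an illustration rather than the actual KafkaConsumer code: the names are hypothetical, and it assumes the second consumer counts as finished once its committed offset has reached the end of the replay range.

```java
final class CommittedOffsetSketch {
    /**
     * Offset to report from committed() for one partition.
     *
     * @param primaryCommitted   latest offset committed by the user-created consumer
     * @param secondaryCommitted latest offset committed by the second consumer
     * @param replayEnd          exclusive end of the range the second consumer replays
     */
    static long committed(long primaryCommitted, long secondaryCommitted, long replayEnd) {
        boolean replayFinished = secondaryCommitted >= replayEnd;
        // While the replay is in flight, only offsets below the second
        // consumer's commit are guaranteed to be checkpointed.
        return replayFinished ? primaryCommitted : secondaryCommitted;
    }

    public static void main(String[] args) {
        System.out.println(committed(135, 95, 120));  // 95: replay of [70, 120) still running
        System.out.println(committed(135, 120, 120)); // 135: replay finished
    }
}
```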
...