Status
Current state: Under Discussion
Discussion thread: here [TBD]
JIRA: KAFKA-6989
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, in Kafka Streams, a single thread is used to process tasks, which can become a performance bottleneck. Under the current model, when a fatal error occurs, a new retry topic is created to which the failed records are sent. Incoming records cannot be processed until the failed records in the retry topic have been fully reprocessed, in order to guarantee ordering. This can introduce lag because we are essentially backtracking to make sure all earlier data has been handled: new records that would have been processed if no failure had occurred must wait while the thread is paused on the earlier ones.
Public Interfaces
We have a couple of choices available to us, as mentioned in the ticket discussion (KAFKA-6989). Recall that an asynchronous method does not need the result of a previous call in order to start; all it needs is the signal to begin. However, when offsets are committed or processed asynchronously, we need to define what happens once the operation completes. It is also worth remembering that asynchronous processing does not necessarily require an extra thread. When offsets are committed or received, we should consider the following (for reference, a sketch of the existing KafkaConsumer asynchronous commit callback is shown after this list):
- Ordering: As noted in the JIRA discussion, Samza concluded that its implementation cannot return records in their original order if more than one thread is consuming data. A pseudo-ordering could be provided by sorting the returned records by arrival time and offset, but that may be of limited value given that a record's offsets are not consecutive.
- Exactly-Once: Under exactly-once semantics, each offset is processed exactly once. This cannot be guaranteed if multiple threads are active (i.e. more than one thread is calling into the same Kafka Streams instance).
- Latency and Resilience: As mentioned in the JIRA ticket, retrying the processing of a record can become a performance bottleneck because we are effectively "stopping the world" by pausing on a single record. One option to avoid this is to let a second thread handle the failed records while the main thread continues processing incoming records. However, exactly-once and ordering would be impossible to guarantee under these conditions.
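For context, the plain consumer already exposes an asynchronous commit that takes a completion callback. The sketch below shows that existing KafkaConsumer#commitAsync usage (not a Streams API) to illustrate the "signal on completion" model the options above build on; topic and group names are placeholders.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

public class AsyncCommitExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "async-commit-example");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("input-topic"));
            // ... poll() and process records here ...

            // commitAsync() returns immediately; the callback fires later and,
            // with several commits in flight, callbacks may complete out of order.
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    if (exception != null) {
                        // Failure handling is left entirely to the caller.
                        System.err.println("Commit failed for " + offsets + ": " + exception);
                    }
                }
            });
        }
    }
}
```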
Now we have two ways to approach this issue. Note that KafkaConsumer's implementation effectively prevents more than one thread from calling it at a time. If we wish to follow a similar methodology in Kafka Streams, we face a couple of tradeoffs:
- Positive sides: Ordering would still be guaranteed, and exactly-once semantics would be unaffected. We would add a method with behavior similar to KafkaConsumer#commitAsync in that it accepts a user-defined callback and invokes it once the operation completes; the user's thread moves on while Kafka completes the work in the background (see the sketch after this list).
- Negative sides: As mentioned, failure handling would still be a major problem (the user has to deal with it). We could still get stuck on one record that, for whatever reason, keeps failing, which could end up slowing down the whole process.
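Below is a minimal sketch of what such a callback-style hand-off could look like. The ProcessCallback and AsyncProcessor interfaces and the processAsync method are hypothetical names used for illustration only; they are not part of any existing Kafka Streams API.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical callback interface, mirroring the shape of
// KafkaConsumer#commitAsync's OffsetCommitCallback.
interface ProcessCallback {
    // Invoked once the record has been fully processed (or has failed).
    void onComplete(ConsumerRecord<?, ?> record, Exception exception);
}

// Hypothetical entry point: the caller hands off a record and returns
// immediately; the callback is invoked when processing finishes.
interface AsyncProcessor<K, V> {
    void processAsync(ConsumerRecord<K, V> record, ProcessCallback callback);
}

class ExampleUsage {
    static <K, V> void handOff(AsyncProcessor<K, V> processor, ConsumerRecord<K, V> record) {
        processor.processAsync(record, (r, exception) -> {
            if (exception != null) {
                // Under this approach, failure handling is the user's responsibility.
                System.err.println("Processing failed at offset " + r.offset() + ": " + exception);
            }
        });
        // The calling thread continues with the next record without blocking here.
    }
}
```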
Note that this line of thought does not change much, because it amounts to saying: you could choose between KafkaConsumer#commitSync and KafkaConsumer#commitAsync. Next, there is the multithreaded approach, which takes things one step further. Instead of one thread, we could have the following:
- Positive sides: Failure handling is better now, in that multiple threads are on the job. While the secondary thread takes care of the failed records, the primary thread can move on to processing new ones (see the sketch after this list). Since the retry topic's backlog is not constantly growing, there will be time to work through it. Once the secondary thread has finished with the failed records, it could be terminated, freeing up resources. Latency would be reduced.
- Negative sides: Ordering is now impossible to guarantee, as is exactly-once, because we have no way of knowing which records have already been returned; the asynchronous processes have no way of coordinating with one another.
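The following is a minimal sketch of the second approach under the assumptions above: a primary thread keeps processing new records while failed ones are handed to a secondary worker for retries. The class and method names are illustrative only and not an existing Kafka Streams API; as noted, ordering and exactly-once are not preserved.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Illustrative only: a primary loop that never blocks on a failing record,
// plus a single secondary worker that retries failed records from a queue.
class RetryingProcessorSketch<K, V> {
    private final BlockingQueue<ConsumerRecord<K, V>> failedRecords = new LinkedBlockingQueue<>();
    private final ExecutorService retryWorker = Executors.newSingleThreadExecutor();

    RetryingProcessorSketch() {
        retryWorker.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    ConsumerRecord<K, V> record = failedRecords.take();
                    process(record); // retry; could be re-queued on repeated failure
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    // In a real design the record would go back to the retry topic;
                    // ordering and exactly-once are not guaranteed here.
                }
            }
        });
    }

    // Called by the primary thread for each incoming record.
    void onRecord(ConsumerRecord<K, V> record) {
        try {
            process(record);
        } catch (Exception e) {
            failedRecords.offer(record); // hand off the failure and keep going
        }
    }

    void process(ConsumerRecord<K, V> record) {
        // user processing logic goes here
    }

    void shutdown() {
        retryWorker.shutdownNow(); // terminate the secondary thread once it is no longer needed
    }
}
```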
In the first approach I outlined, we are essentially giving the user more flexibility in deciding how to resolve the latency and failure-handling problem (that is, they can choose whether or not to sacrifice ordering for latency). In exchange, they would have to implement the multithreaded portion themselves. The second approach takes some load off the client's back in that we figure out how to process the records using multiple threads, and clients don't have to worry about anything complex. Note that with the second plan, we still add the capability to commit synchronously or asynchronously regardless.
We could make the first approach the default policy, and use the second approach only if the client explicitly configures more than one thread.
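As a rough illustration of how that choice could be exposed, a configuration switch along the following lines might work. The property name num.async.processing.threads is purely hypothetical and not an existing Streams config; only the standard application id and bootstrap servers settings are real.

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "async-processing-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Hypothetical config: with the default of 1 thread the callback-based
        // (first) approach applies; a value greater than 1 would opt in to the
        // multithreaded (second) approach, giving up ordering and exactly-once.
        props.put("num.async.processing.threads", 2);
    }
}
```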
Proposed Changes
Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
- If we are changing behavior how will we phase out the older behavior?
- If we need special migration tools, describe them here.
- When will we remove the existing behavior?
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.