Before we dive into how we could change the existing codebase, it would be helpful to clarify some details of the current Kafka Streams design. Right now, threads are independent of one another; that is, each thread is in charge of its own stream task. There is no cooperation between threads, and as a result they never process the same topic partition, only distinct ones. For example, one thread processes topic1 partition1 while another processes topic2 partition2. Neither thread attempts to access the other's partition because it is never told about it. In this manner, the two threads are isolated from one another.

However, this design could be problematic under certain conditions. When failed records are moved to a retry topic, the current thread has no way of handing them off to another thread and saying "here, you could do this work for me so it doesn't hold up the line." Generally, an API that spares the user most implementation details is preferable, so we will likely have to do most of the heavy lifting ourselves (that is, implement the multithreading and the extra asynchronous processing ourselves, much as Samza does). We have some options to fix this issue:

  1. Allow the current thread access to other threads on standby. When more processing speed is needed, we could call upon these reserve threads and then release them once their task is complete so that they could be used elsewhere.
  2. Alternatively, maintain the isolation of the threads and use a method that does not block (i.e. follow a policy similar to KafkaConsumer#commitAsync).

Please note that these additions will probably affect only internals. The current methods of Kafka Streams would not be modified (like StateStore's put() or get() operations).

 

Currently, when a new KafkaStreams application is initialized, the user has a config available which defines the number of threads KafkaStreams will use (num.stream.threads). N StreamThread instances are created, where N = num.stream.threads. However, if N is greater than the number of tasks, some threads will be held in reserve and will idle unless some other thread fails, in which case they will be brought online. Under the current structure, each task has at most one thread processing it.
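As a minimal sketch of the configuration described above, the thread count can be set through the num.stream.threads property. The application id and bootstrap server values are placeholders, and the idleThreads helper is hypothetical; it only restates the arithmetic from the paragraph above (threads beyond the number of tasks sit in reserve) and is not part of Kafka Streams.

```java
import java.util.Properties;

// Illustrative only: plain java.util.Properties, no Kafka classes required.
final class StreamThreadConfigSketch {

    // Builds the properties a Kafka Streams application could be started with.
    static Properties buildConfig(int numStreamThreads) {
        Properties props = new Properties();
        props.put("application.id", "example-app");        // placeholder value
        props.put("bootstrap.servers", "localhost:9092");  // placeholder value
        props.put("num.stream.threads", Integer.toString(numStreamThreads));
        return props;
    }

    // Hypothetical helper: threads left in reserve when N exceeds the task count.
    static int idleThreads(int numStreamThreads, int numTasks) {
        return Math.max(0, numStreamThreads - numTasks);
    }

    public static void main(String[] args) {
        Properties props = buildConfig(4);
        System.out.println("num.stream.threads = " + props.getProperty("num.stream.threads"));
        System.out.println("idle threads with 3 tasks = " + idleThreads(4, 3));
    }
}
```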

Please note that any new additions made by this KIP will probably affect only internals. The current methods found in Processor API and their behavior would most likely not be changed.

Processor API Structure

[Diagram: Current Flow of Processor API]

Above is a simplified diagram of how KafkaStreams implements the Processor API. Note that one StreamThread can process records from multiple StreamTasks at once, but the reverse does not hold: a StreamTask cannot send records to multiple StreamThreads. This is a major bottleneck that we would need to fix.

Public Interfaces

We have a couple of choices available to us, as mentioned in the ticket discussion (KAFKA-6989). Recall that an asynchronous method does not need the result of a previous call in order to start; all it needs is the signal. However, when offsets are committed or processed asynchronously, we need to define what happens once the operation completes. Asynchronous processing does not necessarily require an extra thread, but in this case one is necessary. When offsets are committed or received/processed, we should consider the following:

  1. Ordering: As noted in the JIRA discussion, Samza has judged it impossible for its implementation to return records in their original sequence.
  2. Exactly-Once: Under exactly-once semantics, each offset is returned once. This will be hard if multiple threads are active (i.e. more than one thread is calling the same Kafka Streams instance).
  3. Latency and Resilience: As mentioned in the JIRA ticket, retrying a record could become a performance bottleneck because we are effectively "stopping the world" by pausing on a single record. One way to avoid this is to let a second thread handle the failed records while we continue to process incoming metadata. However, exactly-once and ordering would not be supported under these conditions.

Now we have two ways to approach this issue. Note that the implementation of KafkaConsumer basically prevents more than one thread from calling it at once. If we follow a similar methodology with Kafka Streams, we get the following:

  1. Positive sides: Ordering would still be guaranteed, and exactly-once would be unaffected. We would add method(s) with behavior similar to KafkaConsumer#commitAsync, in that they accept a user-defined callback and call it once the work is done. The user's thread moves on while we construct a future that waits on the task's completion. In this manner, the new methods do not block.
  2. Negative sides: As mentioned, failure handling would still be a major problem (the user has to deal with it). We could still end up stuck on one record which for some reason stubbornly continues to fail, slowing down the whole process.

So basically, this lets the user choose between a method that does not block and one that does. The isolation between threads is still maintained. Next is the multithreaded approach: to speed things up, we should consider how to use multiple asynchronous threads to process from one StreamTask (i.e. receive records from the StreamTask to be processed). It comes with its own tradeoffs:

  1. Positive sides: Failure handling is better now that multiple threads are on the job. While a secondary thread could take care of the failed metadata, the primary thread could move on to processing new ones. Since the failed metadata topic's workload is not constantly increasing, we will have time to process it. Once the secondary thread has finished with the failed records, it could be terminated, freeing up CPU resources and space. Latency would be reduced. Now that the records to be processed are split between threads, we could process them faster.
  2. Negative sides: Ordering is now harder to guarantee, and exactly-once is impossible because we have no way of knowing which records have been returned, since asynchronous threads have no way of communicating with one another.

In the first approach I outlined, we are essentially giving the user more flexibility in deciding how to resolve the latency and failure handling problem. The second approach takes some load off the client's back, in that we figure out how to process the records using multiple threads and clients don't have to worry about anything complex. Note that with the second plan, no CompletableFutures would be involved, as secondary threads would process records directly using blocking methods (like KafkaConsumer#commitSync).
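The first approach could be sketched roughly as follows. This is only an illustration built on java.util.concurrent: the class AsyncProcessSketch and the method processAsync are hypothetical names, not existing Kafka Streams API, and the process method is a stand-in for real record processing. A single worker thread is assumed so that ordering is preserved.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

// Hypothetical sketch; none of these names exist in Kafka Streams.
final class AsyncProcessSketch {
    // One worker thread, so records are still processed in submission order.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    // Returns immediately. The callback receives (result, error) once the work
    // is done; the returned future completes after the callback has run.
    CompletableFuture<Long> processAsync(long offset, BiConsumer<Long, Throwable> callback) {
        return CompletableFuture
                .supplyAsync(() -> process(offset), worker)
                .whenComplete(callback);
    }

    // Stand-in for real processing: a negative offset simulates a failure.
    private static Long process(long offset) {
        if (offset < 0) {
            throw new IllegalStateException("failed at offset " + offset);
        }
        return offset;
    }

    void close() {
        worker.shutdown();
    }

    public static void main(String[] args) {
        AsyncProcessSketch task = new AsyncProcessSketch();
        CompletableFuture<Long> done = task.processAsync(42L, (offset, error) -> {
            if (error == null) {
                System.out.println("processed offset " + offset);
            } else {
                System.out.println("failed: " + error);
            }
        });
        System.out.println("caller thread moved on");
        done.join();   // wait only so the demo can shut down cleanly
        task.close();
    }
}
```

Because the caller gets a future back, a user who does want blocking behavior can simply call join() on it, which keeps both options available from one method.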

We could set the first approach as the default policy, while the second approach (the one Samza uses) would be used only if the client asks for more than one thread per stream task. However, the second policy has a tradeoff: ordering would no longer be guaranteed (for example, offsets could be returned out of order).

Proposed Changes

Changes to Processor API TBD.

Alternatives

Ordering could possibly still be guaranteed. For that to happen, we will have to wait for a thread to finish processing its assigned segment of records before any records from other threads are returned. We are still processing with multiple threads at the same time, but in different offset ranges. Let's illustrate with an example. For simplicity, let parentThread process offsets 1 - 50, and childThread process offsets 51 - 100.

  1. For us to guarantee ordering, childThread's records cannot be returned until parentThread has finished processing its own. So until parentThread finishes processing offsets 1-50, the results for childThread will have to be stored temporarily.
  2. Each time a call for more metadata is made, childThread's results will be sent via callback to a PriorityQueue monitored by parentThread. But they will not be sent to the client until all offsets before them have been sent to the client.
  3. At some point, offsets 1-50 have been returned. At that point, we will return childThread's records as well if another metadataForStore method call is made. So basically, we are delaying the sending of records processed by childThread until the offsets before them have been received by the user.

This would work, but there might be some difficulties in implementing it.
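The buffering described in the steps above might look roughly like the sketch below. OrderedReleaseBuffer is a hypothetical class, not part of Kafka Streams, and it tracks bare offsets rather than real records; the assumption is that offsets are contiguous and that any worker thread reports a finished offset through complete().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch: holds finished offsets until everything before them is out.
final class OrderedReleaseBuffer {
    private final PriorityQueue<Long> completed = new PriorityQueue<>();
    private long nextToRelease;

    OrderedReleaseBuffer(long firstOffset) {
        this.nextToRelease = firstOffset;
    }

    // Called by parentThread or childThread when an offset has been processed.
    synchronized void complete(long offset) {
        completed.add(offset);
    }

    // Called when the client asks for more results: releases only the
    // contiguous run of offsets starting at nextToRelease.
    synchronized List<Long> drain() {
        List<Long> released = new ArrayList<>();
        while (!completed.isEmpty() && completed.peek() == nextToRelease) {
            released.add(completed.poll());
            nextToRelease++;
        }
        return released;
    }

    public static void main(String[] args) {
        OrderedReleaseBuffer buffer = new OrderedReleaseBuffer(1);
        buffer.complete(4);                    // childThread finishes first
        buffer.complete(5);
        System.out.println(buffer.drain());    // prints []
        buffer.complete(1);                    // parentThread catches up
        buffer.complete(2);
        buffer.complete(3);
        System.out.println(buffer.drain());    // prints [1, 2, 3, 4, 5]
    }
}
```

One visible cost of this design is memory: childThread's results stay buffered for as long as parentThread lags behind.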

...

[Diagram: Processor API (with asynchronous processing)]

This is a flow diagram of how processing would work if there are multiple threads processing records at once asynchronously. For ordering to be guaranteed, here is what will happen:

  1. A Map with (Key, Value) = (TopicPartition, AtomicInteger) will be created. Any TopicPartition, when first inserted into the map, will have its AtomicInteger value set to zero. This value denotes the offset we have most recently committed.
  2. An offset for a particular TopicPartition will not be committed unless the AtomicInteger value indicates it is eligible for a commit operation. Once the commit operation is complete, the AtomicInteger value for that partition will be incremented to tell other StreamThreads that the next record can now be sent to the user.
  3. This AtomicInteger reference will be shared across all StreamThreads, so that each time an AtomicInteger value is updated, other threads will see the update.

In this manner, we will be able to commit records in sequence. 
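A minimal sketch of this gating, under some assumptions: a String such as "topic1-0" stands in for TopicPartition, the counter is interpreted as the next offset eligible for commit (starting at zero), and SequencedCommitSketch and tryCommit are hypothetical names. The actual commit call is omitted; only the ordering check is shown.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the shared (TopicPartition, AtomicInteger) map.
final class SequencedCommitSketch {
    // Shared by all threads; the String key stands in for a TopicPartition.
    private final ConcurrentHashMap<String, AtomicInteger> nextEligible =
            new ConcurrentHashMap<>();

    // Succeeds only if 'offset' is the next one in sequence for the partition.
    // On success the counter advances, making the following offset eligible.
    boolean tryCommit(String topicPartition, int offset) {
        AtomicInteger next =
                nextEligible.computeIfAbsent(topicPartition, tp -> new AtomicInteger(0));
        return next.compareAndSet(offset, offset + 1);
    }

    public static void main(String[] args) {
        SequencedCommitSketch commits = new SequencedCommitSketch();
        System.out.println(commits.tryCommit("topic1-0", 1)); // prints false: offset 0 not yet committed
        System.out.println(commits.tryCommit("topic1-0", 0)); // prints true
        System.out.println(commits.tryCommit("topic1-0", 1)); // prints true
    }
}
```

A thread whose tryCommit returns false would have to retry later or park its result, which is where the buffering from the previous section would come in.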

Proposed Changes

A new config will be added (like num.threads.per.task) which defines how many threads are used per task. By default it is set to one, which replicates the current behavior of the Processor API. If it is greater than one, that many StreamThreads (two or more) will process from a task at the same time. Because ordering is guaranteed, the user does not have to deal with inexplicable out-of-order issues. An added benefit is faster processing and commit speeds for that particular task.

Note: if the total number of threads is not divisible by num.threads.per.task, then one task will probably have the remainder of the threads assigned to it after division. An IllegalArgumentException will be thrown if num.stream.threads < num.threads.per.task.
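One possible reading of the note above, sketched as a small helper. ThreadAssignmentSketch and threadsPerGroup are hypothetical names, and giving the remainder to the last group is an assumption; the KIP only says that some task will probably receive the leftover threads.

```java
import java.util.Arrays;

// Hypothetical sketch of how threads could be split into per-task groups.
final class ThreadAssignmentSketch {

    // Returns the size of each thread group. Every group gets numThreadsPerTask
    // threads, and the last group also absorbs the remainder (an assumption).
    static int[] threadsPerGroup(int numStreamThreads, int numThreadsPerTask) {
        if (numThreadsPerTask < 1) {
            throw new IllegalArgumentException("num.threads.per.task must be at least 1");
        }
        if (numStreamThreads < numThreadsPerTask) {
            throw new IllegalArgumentException(
                    "num.stream.threads must not be less than num.threads.per.task");
        }
        int groups = numStreamThreads / numThreadsPerTask;
        int[] sizes = new int[groups];
        Arrays.fill(sizes, numThreadsPerTask);
        sizes[groups - 1] += numStreamThreads % numThreadsPerTask;
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(threadsPerGroup(7, 3))); // prints [3, 4]
        System.out.println(Arrays.toString(threadsPerGroup(6, 3))); // prints [3, 3]
    }
}
```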

Compatibility, Deprecation, and Migration Plan

There might be some problems if a new KafkaStreams instance is brought online and returns offsets out of order when the user expects them in order. To maintain the old behavior, we will keep the current structure of Kafka Streams methods intact (although some of their statements might have to be tweaked to accommodate the new change).

There are no considerations for deprecation, particularly since we are adding a new capability, not upgrading a preexisting one. However, if the user requests records expecting them to be in order and they are not, an exception will be thrown notifying the client that the configs are incompatible with their method request.

Impacts and Steps for Implementation

There is a series of ramifications from this KIP that we need to take into account. For starters, the metrics for KafkaStreams will need to be updated so that they can report the states of multiple threads working in tandem on the same Kafka Streams application (but this will come later, once we have laid the groundwork for the other methods).

Rejected Alternatives

N/A at the moment.