...

[Diagram: Current Flow of Processor API]

The diagram above is a simplified view of how KafkaStreams implements the Processor API. Note that one StreamThread can process records from multiple StreamTasks at once, but the reverse does not hold: a StreamTask cannot send records to multiple StreamThreads. This is a major bottleneck that we need to fix.

...

[Diagram: Processor API (with asynchronous processing)]

This flow diagram shows how record processing would work if multiple threads processed records asynchronously at the same time. However, ordering would not be guaranteed.

Proposed Changes

Changes to Processor API TBD.

Compatibility, Deprecation, and Migration Plan

Problems could arise if a new KafkaStreams instance is brought online and delivers offsets out of order when the user expects them in order. To maintain the old behavior, we will keep the current structure of the Kafka Streams methods intact, although some of their statements might have to be adjusted to accommodate the new change.

For ordering to be guaranteed, here is what will happen:

  1. A Map with (Key, Value) = (TopicPartition, AtomicInteger) will be created. Any TopicPartition, when first inserted into the map, will have its AtomicInteger value set to zero. This value denotes the offset that was most recently committed for that partition.
  2. An offset for a particular TopicPartition will not be committed unless the AtomicInteger value indicates that it is eligible for a commit operation. Once the commit operation is complete, the AtomicInteger value for that partition will be incremented to tell other StreamThreads that the next record can now be sent to the user.
  3. Each AtomicInteger reference will be shared across all StreamThreads, so that each time an AtomicInteger value is updated, the other threads will be notified of the update.

In this manner, we will be able to commit records in sequence. 
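
The three steps above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the KIP's implementation: the class OrderedCommitGate, the method tryCommit, and the nested Partition class (a stand-in for org.apache.kafka.common.TopicPartition so the sketch compiles without Kafka on the classpath) are hypothetical names. The counter is interpreted here as the next offset eligible for commit, and each offset is assumed to be handled by exactly one thread.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the ordered-commit bookkeeping; not part of Kafka Streams.
class OrderedCommitGate {

    // Stand-in for org.apache.kafka.common.TopicPartition (assumption for self-containment).
    static final class Partition {
        final String topic;
        final int partition;

        Partition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Partition)) return false;
            Partition other = (Partition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // Step 1: one counter per partition, created with value zero on first use.
    // Step 3: the map (and therefore each counter) is shared by all threads.
    private final Map<Partition, AtomicInteger> nextToCommit = new ConcurrentHashMap<>();

    // Step 2: run the commit only if `offset` is the next one in sequence.
    // Returns true if the commit ran, false if the caller has to retry later.
    // Assumes each offset is owned by exactly one thread.
    boolean tryCommit(Partition tp, int offset, Runnable commitAction) {
        AtomicInteger next = nextToCommit.computeIfAbsent(tp, k -> new AtomicInteger(0));
        if (next.get() != offset) {
            return false; // an earlier offset has not been committed yet
        }
        commitAction.run();
        next.incrementAndGet(); // makes the following offset eligible
        return true;
    }
}
```

In this sketch a thread that holds offset 1 gets false from tryCommit until the thread holding offset 0 has committed. Whether a rejected thread should spin, park, or hand the commit off is left open here.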

Proposed Changes

A new config (for example, num.threads.per.task) will be added to define how many threads are used per task. It will default to one, which preserves the current behavior of the Processor API. If it is set to more than one, two or more StreamThreads will process records from a task at the same time. Ordering is guaranteed, so the user does not have to deal with inexplicable out-of-order issues. An added benefit is faster processing and commit speeds for that particular task.

Note: if the total number of threads is not divisible by num.threads.per.task, one task will probably be assigned the remaining threads after division. An IllegalArgumentException will be thrown if num.stream.threads < num.threads.per.task.
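
As a rough illustration of the note above, the following sketch splits the configured threads into per-task groups and rejects an invalid combination. The class ThreadsPerTaskPlanner and the method groupSizes are hypothetical names, and the rejection of values below one is an added assumption. The proposal itself only says a task will "probably" receive the remainder, so the exact distribution here is a guess.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; illustrates one possible reading of the note, not actual Kafka Streams code.
class ThreadsPerTaskPlanner {

    // Returns the number of threads in each group, where each group serves one task.
    // numStreamThreads corresponds to num.stream.threads,
    // numThreadsPerTask to the proposed num.threads.per.task.
    static List<Integer> groupSizes(int numStreamThreads, int numThreadsPerTask) {
        if (numThreadsPerTask < 1) {
            // Assumption: values below one are meaningless and rejected.
            throw new IllegalArgumentException("num.threads.per.task must be at least 1");
        }
        if (numStreamThreads < numThreadsPerTask) {
            // Stated in the proposal: fewer total threads than threads per task is an error.
            throw new IllegalArgumentException(
                "num.stream.threads (" + numStreamThreads
                    + ") must not be less than num.threads.per.task (" + numThreadsPerTask + ")");
        }
        List<Integer> sizes = new ArrayList<>();
        int remaining = numStreamThreads;
        while (remaining > 0) {
            // Full groups first; the last group takes whatever is left over.
            int size = Math.min(numThreadsPerTask, remaining);
            sizes.add(size);
            remaining -= size;
        }
        return sizes;
    }
}
```

For example, with num.stream.threads = 7 and num.threads.per.task = 3, this sketch yields groups of 3, 3, and 1 threads. With the default of one thread per task, every group has a single thread, which matches the current behavior.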

Compatibility, Deprecation, and Migration Plan

There are no deprecation considerations, since we are adding a new capability rather than upgrading a preexisting one. However, if the user requests records expecting them to be in order and they are not, an exception will be thrown notifying the client that the configs are incompatible with the method request.

Impacts and Steps for Implementation

This KIP has a series of ramifications that we need to take into account. For starters, the metrics for KafkaStreams will need to be updated so that they can report the states of multiple threads working in tandem on the same Kafka Streams application (this will come later, once we have laid the groundwork for the other methods).

Alternatives

Rejected Alternatives

N/A at the moment. There have been some thoughts on whether ordering is supported with this added feature.