...

[Diagram: Current Flow of Processor API]

The diagram above is a simplified view of how Kafka Streams implements the Processor API. One StreamThread can process records from multiple StreamTasks at once, but the reverse does not hold: a StreamTask cannot send records to more than one StreamThread. This is a major bottleneck, and one that this proposal sets out to address.

...

[Diagram: Processor API (with asynchronous processing)]

The diagram above shows how record processing would flow if multiple threads processed records from a task asynchronously. To guarantee ordering, the following will happen:

...

In this manner, we will be able to commit records in sequence. 
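
The individual steps are not reproduced here, so the following is only a generic sketch of one way to commit in sequence when several threads finish records out of order. It is not taken from the proposal. The class name InOrderCommitTracker and its method are hypothetical. The idea is to remember which offsets have finished and to advance the committable position only across a gap-free run of finished offsets.

```java
import java.util.TreeSet;

/** Hypothetical helper (not part of Kafka Streams): tracks out-of-order completions. */
final class InOrderCommitTracker {
    // Offsets that finished processing but cannot be committed yet because of a gap.
    private final TreeSet<Long> completed = new TreeSet<>();
    // Lowest offset that has not finished yet; everything below it is safe to commit.
    private long nextToCommit;

    InOrderCommitTracker(long firstOffset) {
        this.nextToCommit = firstOffset;
    }

    /**
     * Marks one offset as processed and returns the position that could be committed,
     * that is, the offset of the next record that has not been processed in sequence.
     */
    synchronized long markProcessed(long offset) {
        if (offset >= nextToCommit) {
            completed.add(offset);
        }
        // Advance only while the finished offsets form a contiguous run.
        while (!completed.isEmpty() && completed.first() == nextToCommit) {
            completed.pollFirst();
            nextToCommit++;
        }
        return nextToCommit;
    }

    public static void main(String[] args) {
        InOrderCommitTracker tracker = new InOrderCommitTracker(0L);
        // Offsets 2 and 1 finish before offset 0, as could happen with several threads.
        System.out.println(tracker.markProcessed(2L));
        System.out.println(tracker.markProcessed(1L));
        System.out.println(tracker.markProcessed(0L));
    }
}
```

In this sketch a slow record holds back the commit position for every record behind it, which is the price of never committing past an unprocessed offset.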

Proposed Changes

Changes to Processor API TBD.

Compatibility, Deprecation, and Migration Plan

Problems could arise if a new KafkaStreams instance is brought online and delivers offsets out of order while the user expects them in order. To preserve the old behavior, the current structure of the Kafka Streams methods will be kept intact, although some of their statements may need to be adjusted to accommodate the change.

A new config (for example, num.threads.per.task) will be added to define how many threads are used per task. It will default to one, which reproduces the current behavior of the Processor API. If it is set higher than one, two or more StreamThreads will process records from the same task at the same time. Because ordering is guaranteed, the user does not have to deal with unexplained out-of-order issues. An added benefit is faster processing and commit speeds for that particular task.

Note: if the total number of threads is not divisible by num.threads.per.task, the threads left over after division will probably be assigned to one of the tasks. An IllegalArgumentException will be thrown if num.stream.threads < num.threads.per.task.
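
As a rough illustration of the check and the division described above, the sketch below reads both values from a plain java.util.Properties object. The class ThreadsPerTaskConfig and its validate method are hypothetical. num.threads.per.task is the name suggested in this proposal and may change, and rejecting values below one is an added assumption. The sketch only computes the remainder; it does not decide which task receives the leftover threads.

```java
import java.util.Properties;

/** Hypothetical validation helper; not part of the Kafka Streams API. */
final class ThreadsPerTaskConfig {
    static final String NUM_STREAM_THREADS = "num.stream.threads";     // existing Kafka Streams config
    static final String NUM_THREADS_PER_TASK = "num.threads.per.task"; // proposed name, not final

    /**
     * Returns {fullGroups, leftoverThreads}, where fullGroups is the number of
     * complete groups of threads and leftoverThreads is the remainder after division.
     */
    static int[] validate(Properties props) {
        int total = Integer.parseInt(props.getProperty(NUM_STREAM_THREADS, "1"));
        int perTask = Integer.parseInt(props.getProperty(NUM_THREADS_PER_TASK, "1"));
        if (perTask < 1) {
            // Assumption: values below one make no sense and are rejected.
            throw new IllegalArgumentException(NUM_THREADS_PER_TASK + " must be at least 1");
        }
        if (total < perTask) {
            throw new IllegalArgumentException(
                NUM_STREAM_THREADS + " (" + total + ") must not be smaller than "
                    + NUM_THREADS_PER_TASK + " (" + perTask + ")");
        }
        return new int[] {total / perTask, total % perTask};
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(NUM_STREAM_THREADS, "7");
        props.setProperty(NUM_THREADS_PER_TASK, "2");
        int[] result = validate(props);
        System.out.println("full groups: " + result[0] + ", leftover threads: " + result[1]);
    }
}
```

With neither property set, both values fall back to one, which corresponds to the current single-thread-per-task behavior.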

There are no deprecation considerations, since this KIP adds a new capability rather than upgrading a preexisting one. However, if the user requests records expecting them to be in order and they are not, an exception will be thrown to notify the client that the configs are incompatible with the requested method.

Impacts and Steps for Implementation

This KIP has several ramifications that need to be taken into account. For starters, the KafkaStreams metrics will need to be updated so that they can report the states of multiple threads working in tandem on the same Kafka Streams application (this will come later, once the groundwork for the other methods has been laid).

...