...

Currently, in Kafka Streams, at most one thread is allowed to process a task, which could result in a performance bottleneck. With the current model, when a fatal error occurs, a retry topic is created (or reused) and the failed records are sent to it. To guarantee ordering, incoming records cannot be processed until the failed records in the retry topic have been fully reprocessed. This could result in lag, because we are essentially backtracking to make sure all data has been sent: new data that would have been processed had the failure not occurred has to wait while the thread is paused on earlier records.

...

  1. Allow the current thread access to other threads on standby. When increased processing speed is needed, we could call upon these reserve threads and then release them once their task is complete so that they can be used elsewhere.
  2. Alternatively, maintain the isolation of the threads and use a method that does not block (e.g. use a policy similar to KafkaConsumer#commitAsync).

Please note that these additions will probably affect only internals. The current methods of Kafka Streams would not be modified (like StateStore's put() or get() operations).

Currently, when a new KafkaStreams application is initialized, the user has a config available which defines the number of threads KafkaStreams will use (num.stream.threads). N StreamThread instances are created, where N = num.stream.threads. However, if N is greater than the number of tasks, some threads will be held in reserve and will idle unless another thread fails, in which case they will be brought online. Under the current structure, each task has at most one thread processing it.
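To make the relationship between N and the number of tasks concrete, here is a minimal sketch. The round-robin placement and the names `assign` and `idleThreads` are assumptions made for illustration only; they are not the actual Kafka Streams assignment logic. The sketch only shows that each task lands on exactly one thread and that surplus threads end up with nothing to do.

```java
import java.util.ArrayList;
import java.util.List;

public class ThreadAssignmentSketch {
    /**
     * Hypothetical round-robin placement: returns one list of task ids per thread.
     * Every task is owned by exactly one thread. Requires numThreads >= 1.
     */
    static List<List<Integer>> assign(int numThreads, int numTasks) {
        List<List<Integer>> perThread = new ArrayList<>();
        for (int t = 0; t < numThreads; t++) {
            perThread.add(new ArrayList<>());
        }
        for (int task = 0; task < numTasks; task++) {
            perThread.get(task % numThreads).add(task);
        }
        return perThread;
    }

    /** Counts threads that received no task and would therefore sit in reserve. */
    static long idleThreads(List<List<Integer>> assignment) {
        return assignment.stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        List<List<Integer>> assignment = assign(4, 2);
        System.out.println(assignment + " idle=" + idleThreads(assignment));
    }
}
```

With four threads and two tasks, two threads stay idle; with more tasks than threads, no thread is idle but each task still has a single owner.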

Public Interfaces

We have a couple of choices available to us, as mentioned in the ticket discussion (KAFKA-6989). Recall that an asynchronous method does not need the result of a previous call before it can start; all it needs is the signal. However, when offsets are committed or processed asynchronously, we need to define the behavior that follows once the operation completes. It is also worth remembering that asynchronous processing does not necessarily require an extra thread. When offsets are committed or received, we should consider the following:

...

We could set the first approach as the default policy, while the second approach (which is the one that Samza uses) would be used only if the client instructs us to use more than one thread per stream task.

Proposed Changes

For the first plan, we would have some new methods that might look something like this: 

Code Block (Java): getMetadataAsync
/** Retrieves record metadata for a topic asynchronously. This method does not block.
 *  @param callback  called when the records have been retrieved or the retrieval has failed.
 *  @param storeName the name of the store the user is querying.
 */
void getMetadataAsync(RecordCallback callback, String storeName);

/** Retrieves offsets. If num.threads.per.task > 1, offsets may be returned out of order.
 *  @param inOrder   whether the caller expects the offsets to be in order.
 *  @param storeName the name of the store being queried.
 *  @throws IllegalStateException if inOrder == true when num.threads.per.task > 1
 */
Collection<StreamsMetadata> processMetadataForStore(boolean inOrder, String storeName);

The purpose of RecordCallback is analogous to KafkaConsumer's OffsetCommitCallback. The user would use this interface (or class?) to define the end behavior of a method.  It could be used to notify the user that they could move on to processing the next record, or it could be used as a feedback loop to send another request for the same record (if it failed). 

...

Changes to Processor API TBD.

Alternatives

Ordering could possibly still be guaranteed. So

...

In the second plan, we will have to include a new config such as num.threads.per.task to determine the maximum number of threads that can be used per task. These threads would process records from the same task simultaneously (however, ordering would no longer be guaranteed).

If num.threads.per.task is equal to one, we use the default policy. Otherwise, we use the second one. Please note that if getMetadataAsync were supported for the case where num.threads.per.task is greater than one, then there would be multiple calls to the user-provided callback, because the request would be split across multiple asynchronous threads (which do not communicate with one another). The callback is shown below:

Code Block (Java): RecordCallback
public interface RecordCallback {
    /** Lets the client define what happens after the metadata has been retrieved.
     *  @param exc      the exception that occurred, or null if the retrieval succeeded
     *  @param metadata the metadata retrieved (only meaningful if the retrieval succeeded)
     */
    void onComplete(KafkaException exc, Collection<StreamsMetadata> metadata);
}
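Because the callback may be invoked once per worker thread when num.threads.per.task is greater than one, a user-supplied implementation would need to be thread-safe. The following is a minimal sketch of such a callback. To stay self-contained it uses a stand-in RecordCallback with String in place of StreamsMetadata and RuntimeException in place of KafkaException; `CollectingCallback` and `run` are hypothetical names, and the worker threads merely simulate the split request.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class MultiCallbackSketch {
    /** Stand-in for the proposed RecordCallback (String replaces StreamsMetadata). */
    interface RecordCallback {
        void onComplete(RuntimeException exc, Collection<String> metadata);
    }

    /** Thread-safe callback that tolerates one invocation per worker thread. */
    static class CollectingCallback implements RecordCallback {
        final AtomicInteger invocations = new AtomicInteger();
        final ConcurrentLinkedQueue<String> received = new ConcurrentLinkedQueue<>();
        final ConcurrentLinkedQueue<RuntimeException> failures = new ConcurrentLinkedQueue<>();

        @Override
        public void onComplete(RuntimeException exc, Collection<String> metadata) {
            invocations.incrementAndGet();
            if (exc != null) {
                failures.add(exc);          // a real client might re-request here
            } else {
                received.addAll(metadata);  // partial result from one worker
            }
        }
    }

    /** Simulates numThreads independent workers, each reporting its own partial result. */
    static CollectingCallback run(int numThreads) {
        CollectingCallback callback = new CollectingCallback();
        Thread[] workers = new Thread[numThreads];
        for (int i = 0; i < numThreads; i++) {
            final String part = "part-" + i;
            workers[i] = new Thread(() -> callback.onComplete(null, Collections.singletonList(part)));
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return callback;
    }

    public static void main(String[] args) {
        CollectingCallback callback = run(3);
        System.out.println(callback.invocations.get() + " invocations, "
                + callback.received.size() + " partial results");
    }
}
```

The order in which the partial results arrive is not deterministic, which mirrors the loss of ordering described for this plan.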

Alternatives

It is possible for getMetadataAsync to call the given RecordCallback only once, even though the work may be split across multiple threads. The most likely implementation would have a primary thread act as a parent for the other threads (i.e. it decides when threads are started and terminated). The child threads would not receive the original callback, but a modified one. These modified callbacks notify the parent thread when each child has finished processing its share of the work; once all threads are done, the parent invokes the original callback, which preserves the single-call behavior. Whether this is practical depends on how it works out in the code, but it is worth considering.
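The parent/child arrangement described above could be sketched as follows. This is an illustration under stated assumptions, not the proposed implementation: `ParentAggregator`, `childCallback`, and `run` are hypothetical names, the RecordCallback here is a stand-in using String instead of StreamsMetadata, and on failure the sketch simply forwards the first error seen.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class SingleCallbackSketch {
    /** Stand-in for the proposed RecordCallback (String replaces StreamsMetadata). */
    interface RecordCallback {
        void onComplete(RuntimeException exc, Collection<String> metadata);
    }

    /** Parent-side aggregator: fires the user's callback once, after every child has reported. */
    static class ParentAggregator {
        private final RecordCallback userCallback;
        private final int expectedChildren;
        private final List<String> merged = new ArrayList<>();
        private RuntimeException firstError;
        private int reported;

        ParentAggregator(RecordCallback userCallback, int expectedChildren) {
            this.userCallback = userCallback;
            this.expectedChildren = expectedChildren;
        }

        /** The modified callback handed to each child thread instead of the user's callback. */
        RecordCallback childCallback() {
            return (exc, metadata) -> {
                boolean allDone;
                synchronized (this) {
                    if (exc != null) {
                        if (firstError == null) firstError = exc;
                    } else {
                        merged.addAll(metadata);
                    }
                    allDone = ++reported == expectedChildren;
                }
                if (allDone) {
                    // Only the last child to report reaches this point.
                    userCallback.onComplete(firstError,
                            firstError == null ? merged : Collections.<String>emptyList());
                }
            };
        }
    }

    /** Runs numChildren threads and returns how many times the user's callback fired. */
    static int run(int numChildren) {
        int[] userCalls = {0};
        ParentAggregator parent = new ParentAggregator((exc, md) -> userCalls[0]++, numChildren);
        List<Thread> children = new ArrayList<>();
        for (int i = 0; i < numChildren; i++) {
            String part = "part-" + i;
            Thread child = new Thread(() ->
                    parent.childCallback().onComplete(null, Collections.singletonList(part)));
            children.add(child);
            child.start();
        }
        for (Thread child : children) {
            try {
                child.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return userCalls[0];
    }

    public static void main(String[] args) {
        System.out.println("user callback invocations: " + run(3));
    }
}
```

The user's callback fires once regardless of the number of children, at the cost of waiting for the slowest child.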

On another note, ordering could still be guaranteed. The mechanism is similar to the variant of getMetadataAsync() that calls the client-given RecordCallback only once. For ordering to hold, we have to wait for a thread to finish processing its assigned segment of records before any records from threads handling later segments are returned. We are still processing with multiple threads at the same time, but over different offset ranges. Let's illustrate with an example. For simplicity, let parentThread process offsets 1-50 and childThread process offsets 51-100.

  1. For us to guarantee ordering, childThread's records cannot be returned until parentThread has finished processing its own. So until parentThread finishes processing offsets 1-50, the results for childThread have to be stored temporarily.
  2. Each time a call for more metadata is made, childThread's results are sent to parentThread via callback. But they are not sent to the client until all offsets before them have been sent to the client.
  3. At some point, offsets 1-50 have been returned. From then on, we return childThread's records as well if another metadataForStore call is made. In other words, we delay sending the records processed by childThread until the offsets before them have been received by the user.

This would work, but there might be some difficulties in implementing it.
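One way to picture the temporary storage in the steps above is a reorder buffer keyed by start offset. The sketch below is an assumption-laden illustration: `ReorderBuffer`, `Segment`, and `complete` are hypothetical names, records are plain strings, and results are released as soon as the preceding range completes rather than on the next metadataForStore call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

public class OrderedReleaseSketch {
    /** A finished offset range together with the records a thread produced for it. */
    static class Segment {
        final long start;
        final long end;
        final List<String> records;

        Segment(long start, long end, List<String> records) {
            this.start = start;
            this.end = end;
            this.records = records;
        }
    }

    /** Holds completed segments until every earlier offset has been released. */
    static class ReorderBuffer {
        private final TreeMap<Long, Segment> pending = new TreeMap<>();
        private long nextOffset;

        ReorderBuffer(long firstOffset) {
            this.nextOffset = firstOffset;
        }

        /** Called when a thread finishes a range; returns the records now safe to hand out. */
        synchronized List<String> complete(long start, long end, List<String> records) {
            pending.put(start, new Segment(start, end, records));
            List<String> ready = new ArrayList<>();
            // Release contiguous segments, starting from the lowest unreleased offset.
            while (!pending.isEmpty() && pending.firstKey() == nextOffset) {
                Segment segment = pending.pollFirstEntry().getValue();
                ready.addAll(segment.records);
                nextOffset = segment.end + 1;
            }
            return ready;
        }
    }

    public static void main(String[] args) {
        ReorderBuffer buffer = new ReorderBuffer(1);
        // childThread finishes offsets 51-100 first: nothing can be released yet.
        System.out.println(buffer.complete(51, 100, Arrays.asList("child-51..100")));
        // parentThread finishes offsets 1-50: both segments are released in offset order.
        System.out.println(buffer.complete(1, 50, Arrays.asList("parent-1..50")));
    }
}
```

The buffer's memory use grows with the amount of work completed ahead of the slowest earlier range, which is one of the implementation difficulties hinted at above.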


[Diagram: Flow of getMetadataAsync]

Compatibility, Deprecation, and Migration Plan

...

However, if the user requests records expecting them to be in order and the configs cannot guarantee this, an exception will be thrown notifying the client that the configs are incompatible with the request. For example, the original metadataForStore method will call processMetadataForStore with inOrder set to true. If the configs indicate that we are processing with multiple threads, an exception will be thrown.
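The compatibility check described here amounts to a small guard. The sketch below assumes the thread count comes from the proposed num.threads.per.task config; the method name `checkOrdering` and the message wording are hypothetical.

```java
public class OrderingGuardSketch {
    /**
     * Rejects an in-order request when more than one thread may process a task.
     * numThreadsPerTask stands for the value of the proposed num.threads.per.task config.
     */
    static void checkOrdering(boolean inOrder, int numThreadsPerTask) {
        if (numThreadsPerTask < 1) {
            throw new IllegalArgumentException("num.threads.per.task must be at least 1");
        }
        if (inOrder && numThreadsPerTask > 1) {
            throw new IllegalStateException(
                    "In-order results were requested, but num.threads.per.task="
                            + numThreadsPerTask + " does not guarantee ordering");
        }
    }

    public static void main(String[] args) {
        checkOrdering(true, 1);   // default policy: ordering is guaranteed
        checkOrdering(false, 4);  // caller accepts out-of-order results
        try {
            checkOrdering(true, 4);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Existing callers keep working as long as num.threads.per.task stays at one, since the guard only triggers for the new multi-threaded setting.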

Impacts and Steps for Implementation

...