...

Currently, in Kafka Streams, a single thread is used to process tasks, which can become a performance bottleneck. Under the current model, when a fatal error occurs, a new retry topic is created and the failed records are sent to it. To guarantee ordering, incoming records cannot be processed until the failed records in the retry topic have been fully reprocessed. This can cause lag, because we are essentially backtracking to make sure all data has been sent: new data that would have been processed had no failure occurred must wait while the thread is paused on earlier records.

Present Kafka Design Philosophy

Before we dive into how we could change the existing codebase, it helps to clarify some details of the current Kafka Streams design. Right now, threads are independent of one another: each thread is in charge of its own stream task, and there is no cooperation between threads. As a result, they do not process the same topic but distinct ones. For example, one thread processes topic1 while another processes topic2. Neither thread attempts to access the other's topic, because it is given no knowledge of it. In this manner, the two threads are isolated from one another.

However, this design can be problematic under certain conditions. When failed records are moved to a retry topic, the current thread has no way of handing them off to another thread and saying "here, you could do this work for me so it doesn't hold up the line." We have two options to fix this issue:

  1. Allow the current thread access to other threads on standby. When more processing speed is needed, we could call upon these reserve threads and then release them once their task is complete so that they can be used elsewhere.
  2. Alternatively, maintain the isolation of the threads and use a method that does not block (for example, send a request with a callback that is invoked only if the records still fail; it would act like a feedback loop and send the request again).
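To make the second option more concrete, here is a minimal sketch of a non-blocking retry loop built on `CompletableFuture`. Everything in it (`NonBlockingRetrySketch`, `processWithRetry`, the `maxAttempts` limit) is a hypothetical name chosen for illustration and is not part of Kafka Streams; it only shows the shape of "retry from a callback instead of pausing the thread".

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical sketch of option 2: retry a failed record without blocking the caller. */
final class NonBlockingRetrySketch {

    /**
     * Starts an asynchronous attempt and returns immediately. If the attempt
     * fails, its completion callback starts another one, up to maxAttempts in total.
     */
    static <R, V> CompletableFuture<V> processWithRetry(
            R record, Function<R, CompletableFuture<V>> attempt, int maxAttempts) {
        CompletableFuture<V> result = new CompletableFuture<>();
        tryOnce(record, attempt, maxAttempts, result);
        return result;
    }

    private static <R, V> void tryOnce(
            R record, Function<R, CompletableFuture<V>> attempt,
            int attemptsLeft, CompletableFuture<V> result) {
        attempt.apply(record).whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);                 // success: publish the value
            } else if (attemptsLeft > 1) {
                tryOnce(record, attempt, attemptsLeft - 1, result); // the feedback loop
            } else {
                result.completeExceptionally(error);    // give up after the last attempt
            }
        });
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Stand-in for real processing: fails twice, then succeeds.
        Function<String, CompletableFuture<String>> flaky = rec ->
            CompletableFuture.supplyAsync(() -> {
                if (calls.incrementAndGet() < 3) {
                    throw new IllegalStateException("transient failure");
                }
                return rec.toUpperCase();
            });
        String out = processWithRetry("record-1", flaky, 5).join();
        System.out.println(out + " after " + calls.get() + " attempts");
    }
}
```

The caller only blocks in `main` (via `join()`) to print the result; a processing thread would instead keep handling new records while the retries run in the background.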

Public Interfaces

We have a couple of choices available to us, as mentioned in the ticket discussion (KAFKA-6989). Recall that an asynchronous method does not need the result of a previous call in order to start; all it needs is the signal. However, when offsets are committed or processed asynchronously, we need to define what happens once the operation completes. It is also worth remembering that asynchronous processing does not necessarily require an extra thread. When offsets are committed or received, we should consider the following:

  1. Ordering: As noted in the JIRA discussion, Samza has judged it impossible, in their implementation, for records to be returned in their original sequence when more than one thread is consuming data. A sort of pseudo-ordering could be guaranteed if we sort the records we return by time of arrival and offset number, but that might not matter, considering that the records' offsets are not consecutive.
  2. Exactly-Once: Under exactly-once semantics, each offset is returned exactly once. This will not be possible if multiple threads are active (i.e., more than one thread is calling the same Kafka Streams instance).
  3. Latency and Resilience: Whenever we attempt to retry processing, as mentioned in the JIRA ticket, it could become a performance bottleneck because we are effectively "stopping the world" by pausing on a single record. One way to avoid this is to let a second thread handle the failed records while we continue to process incoming metadata. However, exactly-once and ordering would be impossible to guarantee under these conditions.
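The pseudo-ordering mentioned under Ordering can be sketched as a simple sort on arrival time with the offset as a tie-breaker. The `Completed` class and its field names are assumptions made for this example and do not correspond to any Kafka Streams type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch: re-order finished records by arrival time, then by offset. */
final class PseudoOrderingSketch {

    /** Illustrative holder for a processed record; not a Kafka Streams type. */
    static final class Completed {
        final long arrivalTimeMs;
        final long offset;
        final String value;

        Completed(long arrivalTimeMs, long offset, String value) {
            this.arrivalTimeMs = arrivalTimeMs;
            this.offset = offset;
            this.value = value;
        }
    }

    /** Arrival time first; the offset breaks ties between equal timestamps. */
    static final Comparator<Completed> ARRIVAL_THEN_OFFSET =
        Comparator.comparingLong((Completed c) -> c.arrivalTimeMs)
                  .thenComparingLong(c -> c.offset);

    /** Returns a sorted copy and leaves the input list untouched. */
    static List<Completed> inPseudoOrder(List<Completed> finished) {
        List<Completed> sorted = new ArrayList<>(finished);
        sorted.sort(ARRIVAL_THEN_OFFSET);
        return sorted;
    }

    public static void main(String[] args) {
        // Completion order as it might come back from two worker threads.
        List<Completed> finished = Arrays.asList(
            new Completed(100L, 7L, "c"),
            new Completed(90L, 5L, "a"),
            new Completed(100L, 6L, "b"));
        for (Completed c : inPseudoOrder(finished)) {
            System.out.println(c.offset + " -> " + c.value);
        }
    }
}
```

This restores a deterministic order only among the records that have already finished; it cannot account for a record that is still being retried on another thread, which is why it is a pseudo-ordering rather than a guarantee.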

Now we have two ways to approach this issue. Note that KafkaConsumer's implementation prevents more than one thread from calling it at once. So if we wish to follow a similar methodology with Kafka Streams, we will face a couple of tradeoffs:

...

We could make the first approach the default policy, while the second approach would be used only if the client explicitly asks us to use more than one thread.

Proposed Changes

...

For both the first and the second plan, we will have to add some capability to process asynchronously (even when only one thread is in use). So we would include a method such as the following:

getMetadataAsync (Java)

/**
 * Asynchronously retrieves metadata. The concrete return type is left generic here.
 */
<T> T getMetadataAsync();
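As one possible reading of the method above, the sketch below shows a variant that returns a `CompletableFuture` so the caller can attach a callback instead of waiting. This is an assumption, not the signature proposed in this document: `AsyncMetadataSketch`, `AsyncMetadataSource`, and `fromBlocking` are hypothetical names, and the string used as "metadata" is a placeholder.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical sketch of an asynchronous metadata accessor. */
final class AsyncMetadataSketch {

    /** Assumed variant of getMetadataAsync that hands back a future. */
    interface AsyncMetadataSource<T> {
        CompletableFuture<T> getMetadataAsync();
    }

    /** Wraps a blocking lookup so that callers receive a future immediately. */
    static <T> AsyncMetadataSource<T> fromBlocking(Supplier<T> blockingLookup) {
        // supplyAsync runs the lookup on the common fork-join pool by default.
        return () -> CompletableFuture.supplyAsync(blockingLookup);
    }

    public static void main(String[] args) {
        AsyncMetadataSource<String> source = fromBlocking(() -> "placeholder-metadata");
        // The callback runs once the metadata is available; nothing blocks until join().
        CompletableFuture<String> described =
            source.getMetadataAsync().thenApply(m -> "received: " + m);
        System.out.println(described.join());
    }
}
```

Returning a future keeps the decision about threading inside the implementation: the same interface could be backed by a worker pool, as here, or by a single thread that completes the future from its own event loop.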



Compatibility, Deprecation, and Migration Plan

...