...
Today, Kafka Streams relies mainly on its internal clients (consumer/producer/admin) to handle timeout exceptions and retries (the "global thread" is the only exception). However, this approach has many disadvantages. (1) It 's is harder for users to configure and reason about the behavior and (2) if a client retries internally, all other tasks of the same StreamThread
are are blocked. Furthermore, only the producer (and only for its `send()` call, ie, for async send request by the producer's background thread) and the admin client have an explicit retries
config – for the consumer, and all other producer methods retries are based on request.timeout.ms
as well as default.api.timeout.ms
(or a corresponding timeout parameter; note that Kafka Streams does not specify specific timeouts and always relies on default.api.timeout.ms
config).the Kafka Streams retries
config has a default value of 0 and is only used in the global thread while producer and admin client default retires
is Integer.MAX_VALUE
. If users set Kafka Streams retries
they may accidentally reduce the producer and admin client retry config.
To make Kafka Streams more robust, we propose to catch all all client TimeoutException
s in Kafka Streams and handle them more gracefully. Furthermore, reasoning about time is simpler for users then reasoning about number of retries. Hence, we propose to base all configs on timeouts and to deprecate all configs that rely on number of retries: this included producer and admin client.
Public Interfaces
There is no public interface change, because Kafka Streams already has a retires
configuration parameter. However, retries
is currently only used by the "global thread". This KIP proposes to reuse the existing config to also handle and retry timeout exceptions, hence, it is a semantic changeWe propose to deprecate the retries
configuration parameter for the producer and admin client, as well as for Kafka Streams.
Furthermore, we propose to change the default value of retries
from currently 1 to 5. The existing config retries.backoff.ms
has a default value of 100ms and we can just keep it as-is.
Proposed Changes
introduce task.timeout.ms
as an upper bound for any task to make progress with a default config of 5 minutes. If a task hits a client TimeoutException, the task would be skipped and the next task is processed.
The existing retry.backoff.ms
is used as backoff time (default value 100ms) if a tight retry loop is required. We rely on client internal retry/backoff mechanism to void busy waiting (cf. KIP-580: Exponential Backoff for Kafka Clients).
Proposed Changes
Producer and admin client use a default retries
config value of Integer.MAX_VALUE
and already rely on time based timeouts be default already (cf KIP-91 and KIP-533). They would still respected the deprecated retries
config but log a warning if used. For Kafka Streams the retires
config would be ignored (we only keep it to not break code that might set it) and log a warning if used. Its default retries
value in Kafka Streams is 0 and we want to have a more robust default configuration.
Furthermore, we propose to catch all client We propose to catch all TimeoutException
in Kafka Streams instead of treating them as fatal, and thus to not rely on the consumer/producer/admin client to handle all such errors. If a TimeoutException occurs, we skip the current task and move to the next task for processing. The failed task would automatically be retired in the next processing loop. Because other tasks a processed until a task is retried, we don't have to worry about a busy wait situation. Even if a thread would have only a single task, the clients internal exponential retries would avoid busy waiting.
To make sure that timeout issues can be reported eventually, we use the existing retries
a new task.timeout.ms
config to allow user to stop processing at some point if timeout errors persist and no progress can be made.In particular to propose to apply the retries
config on a per-task level, i.e., each task gets its own retries counter. Each time a TimeoutException
occurs for a task, the counter is increased, and we move to the next task for processing. The task would be retried in the next processing loop if retry.backoff.ms
passed. If the counter of a task exceed the number of configured retries, we stop processing all tasks by throwing an exceptiona single task cannot make any progress. Note that this config does only apply if a previous TimeoutException occurred. During normal, potentially slow processing, task.timeout.ms
would not be applied. In particular, if a task hits a TimeoutException (and task.timeout.ms
is not set to 0) the task will be retried at least once.
To replace retries
in the global thread's initialization phase, we also retry TimeoutException until task.timeout.ms
expires. We apply existing retry.backoff.ms
config and rely on the client to do exponential backoff and retry for this case.
Compatibility, Deprecation, and Migration Plan
- The
retries
config is set toInteger.MAX_VALUE
by default on the producer and admin client and is set to one by default in Kafka Streams. There are no backward compatibility concerns: If retries are not set by the user, only the Stream default value would change (what is intended) and the producer default would be kept. If a user does setretries
explicitly (implying that the producer retries are reduced), the behavior does not change as we apply the user config to the prouder as well as Kafka Streams.- Note that setting retires on the producer is actually discouraged; hence, if a user want to make Kafka Streams more robust and would increase
retries
config, we might need to educate users to also specifyproducer.retries=Integer.MAX_VALUE
to ensure the producer retries are not reduced (if user want the producer to fail earlier, thedeliver.timeout.ms
config should be used). - Similar for the admin client that uses
Integer.MAX_VALUE
a defaultretries
config,admin.retries
should be used for this case. - Note, that setting
producer.retries
andadmin.retries
would be required even today if a user overwrites the existingretries
config (hence, there no increased complexity)
- Note that setting retires on the producer is actually discouraged; hence, if a user want to make Kafka Streams more robust and would increase
- Kafka Streams set the default of
retry.backoff.ms
to 100ms and we just keep this default value.
- Producer and admin client behavior does not change; both still respect
retries
config. - Kafka Streams will ignore
retries
config; however, the new default will be more robust and thus no backward compatibly concern arises. If users really want to have the old "non robust" fail immediately behavior, they can settask.timeout.ms=0
.
Test Plan
Regular unit and integration tests are sufficient. Existing system tests should provide good coverage implicitly.
Rejected Alternatives
- Reuse the existing
retries
config and handle client TimeoutException based on it. Rejected because a reasoning about time is easier for users and other client started to move away from count based retries already. - A task could be retried immediately if a client TimeoutException occurs instead of skipping it. However, this would result is "busy wait" pattern and other tasks could not make progress until the "failing" task makes progress again of eventually times out.
- It would be possible to apply
retries
on a per method level (ie, for each client method that is called, an individual retry counter is maintained). This proposal is rejected because it seems to be too fine grained and hard to reason about for users. - If would be possible to apply
retries
at the thread level, iei.e., whenever the thread does not make any progress in one task-processing-loop (ie, all tasks throw a timeout exception within the loop), the per-thread retry counter would be increased. This proposal is rejected as too coarse grained. In particular, a single task could get stuck while other tasks make progress and this case would not be detected. - If would be possible to apply
thread.timeout.ms
at the thread level instead of atask.timeout.ms
at a task level: whenever the thread does not make any progress on any tasks within the timeout, the thread would fail. This proposal is rejected as too coarse grained. In particular, a single task could get stuck while other tasks make progress and this case would not be detected. - To distinguish between retries within Kafka Streams and client retries (in particular the producer's send
retries
config), we could add a new config (eg, `task.retries`). However, keeping the number of config small is desirable and the gain of the new config seems limited. - To avoid that people need to consider setting
producer.retries
andadmin.retires
explicitly, we could change the behavior of Kafka Streams and useretries
expliclity explicitly for Streams level retries. For this case, settingretries
would not affect the producer or admin client and both retries could only be change with ther their corresponding client-prefix config. This would be a backward incompatible change and in fact, it might be better moving forward to deprecate the producer and admin clientretries
config in favor of their newer timeout configs (this was already pointed out in KIP-533). - Instead of using a retry counter, it would be possible to use a new `task.progress.timeout.ms` config (this might align with the recent API changes of the underlying clients). If was rejected as we have already a
retry
config that we can simply reuse. - Instead of applying
retry.backoff.ms
config a task would be retries in the next processing loop directly. In contrast to a "busy wait" retry as done in the clients and on the global thread, looping over all other tasks implies some retry delay natively. However, it seems to align better to existing behavior/semantics to applyretry.backoff.ms
config (note, that this backoff time might actually be exceeded naturally as looping through all the other tasks might take longer).