
Status

Current state"Under Discussion"

Discussion thread: TODO

JIRA:

Released: target 2.6

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Today, Kafka Streams relies mainly on its internal clients (consumer/producer/admin) to handle timeout exceptions and retries (the "global thread" is the only exception). However, this approach has disadvantages: (1) it is harder for users to configure and reason about the behavior, and (2) if a client retries internally, all other tasks of the same StreamThread are blocked. Furthermore, only the producer (and only for its `send()` call, i.e., for asynchronous send requests issued by the producer's background thread) and the admin client have an explicit retries config; for the consumer and all other producer methods, retries are based on request.timeout.ms as well as default.api.timeout.ms (or a corresponding timeout parameter; note that Kafka Streams does not set explicit timeouts and always relies on the default.api.timeout.ms config).

To make Kafka Streams more robust, we propose to catch all TimeoutExceptions in Kafka Streams and handle them more gracefully.

Public Interfaces

There is no public interface change, because Kafka Streams already has a retries configuration parameter. However, retries is currently only used by the "global thread". This KIP proposes to reuse the existing config to also handle and retry timeout exceptions; hence, it is a semantic change only.

Furthermore, we propose to change the default value of retries from the current 1 to 5. The existing config retry.backoff.ms has a default value of 100ms, and we keep it as-is.
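For illustration, a minimal sketch of how a user would set these (reused) configs, assuming the existing StreamsConfig constants for retries and retry.backoff.ms; the application id and bootstrap servers are placeholders.

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class RetryConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(StreamsConfig.RETRIES_CONFIG, 5);              // proposed new Streams default
        props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 100L);  // existing default, kept as-is
        // props would then be passed to new KafkaStreams(topology, props)
    }
}
```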

Proposed Changes

We propose to catch all TimeoutExceptions in Kafka Streams instead of treating them as fatal, and thus not to rely on the consumer/producer/admin client to handle all such errors. To make sure that timeout issues are reported eventually, we use the existing retries config to allow users to stop processing at some point if timeout errors persist and no progress can be made.

In particular, we propose to apply the retries config on a per-task level, i.e., each task gets its own retries counter. Each time a TimeoutException occurs for a task, the counter is increased and we move on to the next task for processing. The task is retried in the next processing loop if retry.backoff.ms has passed. If the counter of a task exceeds the number of configured retries, we stop processing all tasks by throwing an exception.
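A minimal sketch of the per-task bookkeeping described above. The class, method names, and String task ids are illustrative only and do not reflect the actual Streams internals; resetting the counter once a task makes progress is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.streams.errors.StreamsException;

class PerTaskRetryTracker {
    private final int maxRetries;        // value of the reused `retries` config
    private final long retryBackoffMs;   // value of `retry.backoff.ms`
    private final Map<String, Integer> attempts = new HashMap<>();
    private final Map<String, Long> lastFailureMs = new HashMap<>();

    PerTaskRetryTracker(int maxRetries, long retryBackoffMs) {
        this.maxRetries = maxRetries;
        this.retryBackoffMs = retryBackoffMs;
    }

    /** A task is only re-attempted once its backoff has elapsed; otherwise it is skipped this loop. */
    boolean readyToRetry(String taskId, long nowMs) {
        Long last = lastFailureMs.get(taskId);
        return last == null || nowMs - last >= retryBackoffMs;
    }

    /** Record a TimeoutException for the task; throw once the configured retries are exhausted. */
    void recordTimeout(String taskId, long nowMs, TimeoutException cause) {
        int count = attempts.merge(taskId, 1, Integer::sum);
        lastFailureMs.put(taskId, nowMs);
        if (count > maxRetries) {
            throw new StreamsException(
                "Task " + taskId + " did not make progress after " + maxRetries + " retries", cause);
        }
    }

    /** Assumed behavior: clear the counter when the task makes progress again. */
    void reset(String taskId) {
        attempts.remove(taskId);
        lastFailureMs.remove(taskId);
    }
}
```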

Compatibility, Deprecation, and Migration Plan

  • The retries config is set to Integer.MAX_VALUE by default on the producer and admin client, and to one by default in Kafka Streams. There are no backward compatibility concerns: if retries is not set by the user, only the Streams default value changes (which is intended) and the producer default is kept. If a user does set retries explicitly (implying that the producer retries are reduced), the behavior does not change, as we apply the user config to the producer as well as to Kafka Streams.
    • Note that setting retries on the producer is actually discouraged; hence, if a user wants to make Kafka Streams more robust and increases the retries config, we might need to educate users to also specify producer.retries=Integer.MAX_VALUE to ensure the producer retries are not reduced (if a user wants the producer to fail earlier, the delivery.timeout.ms config should be used). See the sketch after this list.
    • Similarly, for the admin client, which uses Integer.MAX_VALUE as the default retries config, admin.retries should be used in this case.
    • Note that setting producer.retries and admin.retries is already required today if a user overwrites the existing retries config (hence, there is no increased complexity).
  • Kafka Streams sets the default of retry.backoff.ms to 100ms, and we keep this default value.
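A hedged sketch of the prefixed overrides discussed above, assuming the standard StreamsConfig.producerPrefix()/adminClientPrefix() helpers; the concrete values are only examples.

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class PrefixedRetriesExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Raise the Streams-level retries to make timeout handling more robust ...
        props.put(StreamsConfig.RETRIES_CONFIG, 100);
        // ... while pinning the embedded clients to their own defaults, so the
        // producer/admin retries are not accidentally reduced by the line above.
        props.put(StreamsConfig.producerPrefix("retries"), Integer.MAX_VALUE);   // "producer.retries"
        props.put(StreamsConfig.adminClientPrefix("retries"), Integer.MAX_VALUE); // "admin.retries"
    }
}
```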

Test Plan

Regular unit and integration tests are sufficient. Existing system tests should provide good coverage implicitly.

Rejected Alternatives

  • It would be possible to apply retries on a per-method level (i.e., for each client method that is called, an individual retry counter is maintained). This proposal is rejected because it seems to be too fine-grained and hard for users to reason about.
  • It would be possible to apply retries at the thread level, i.e., whenever the thread does not make any progress in one task-processing loop (i.e., all tasks throw a timeout exception within the loop), the per-thread retry counter would be increased. This proposal is rejected as too coarse-grained. In particular, a single task could get stuck while other tasks make progress, and this case would not be detected.
  • To distinguish between retries within Kafka Streams and client retries (in particular the producer's send retries config), we could add a new config (e.g., `task.retries`). However, keeping the number of configs small is desirable, and the gain of the new config seems limited.
  • To avoid that people need to consider setting producer.retries and admin.retries explicitly, we could change the behavior of Kafka Streams and use retries only for Streams-level retries. In this case, setting retries would not affect the producer or admin client, and both client retries could only be changed with their corresponding client-prefix config. This would be a backward-incompatible change, and in fact, it might be better moving forward to deprecate the producer and admin client retries config in favor of their newer timeout configs (this was already pointed out in KIP-533).
  • Instead of using a retry counter, it would be possible to use a `task.progress.timeout.ms` config (this might align with the recent API changes of the underlying clients). It was rejected as we already have a retries config that we can simply reuse.
  • Instead of applying the retry.backoff.ms config, a task could be retried directly in the next processing loop. In contrast to a "busy wait" retry, as done in the clients and on the global thread, looping over all other tasks implies some retry delay natively. However, it seems to align better with existing behavior/semantics to apply the retry.backoff.ms config (note that this backoff time might actually be exceeded naturally, as looping through all the other tasks might take longer).
