...

Today, Kafka Streams relies mainly on its internal clients (consumer/producer/admin) to handle timeout exceptions and retries (the global thread is the only exception). However, this approach has many disadvantages: (1) it is harder for users to configure and reason about the behavior, and (2) if a client retries internally, all other tasks of the same StreamThread are blocked. Furthermore, the Kafka Streams retries config has a default value of 0 and is only used in the global thread, while the producer and admin client default retries is Integer.MAX_VALUE (note that the embedded clients in Kafka Streams also use MAX_VALUE as default; the default value of retries=0 only applies to the global thread). This implies that if users set Kafka Streams retries, they may accidentally reduce the producer and admin client retry config.
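
For illustration, a minimal sketch of this pitfall (the application id and bootstrap server are placeholders):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class RetriesPitfall {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            // a user may set `retries` intending to tune the global thread,
            // where the default is 0 ...
            props.put(StreamsConfig.RETRIES_CONFIG, 5);

            // ... but the value is also forwarded to the embedded producer and
            // admin client, silently lowering their Integer.MAX_VALUE default
        }
    }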

To make Kafka Streams more robust, we propose to catch all client TimeoutExceptions in Kafka Streams and handle them more gracefully. Furthermore, reasoning about time is simpler for users than reasoning about the number of retries. Hence, we propose to base all configs on timeouts and to deprecate all configs that rely on the number of retries: this includes the producer and admin client.

...

Producer and admin client use a default retries config value of Integer.MAX_VALUE and already rely on time-based timeouts by default (cf. KIP-91 and KIP-533). They would still respect the deprecated retries config but log a warning if it is used. For Kafka Streams, the retries config would be ignored (we only keep it to not break code that might set it) and a warning would be logged if it is used. The default retries value in Kafka Streams is 0, and we want to have a more robust default configuration. Note that the default retries value of 0 does not apply to the embedded producer or admin client; only if the user explicitly sets retries would the embedded producer and admin client configs be changed (this KIP does not change this behavior).
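
As a sketch, bounding retries by time instead of by count could look as follows (the timeout values shown are simply the client defaults):

    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class TimeoutBasedConfig {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            // bound retrying by time instead of by count
            props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), 120_000); // KIP-91
            props.put(StreamsConfig.adminClientPrefix(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG), 60_000); // KIP-533

            // explicitly setting `retries` would still be respected by the
            // producer and admin client (with a warning) but ignored by Kafka Streams
        }
    }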

Furthermore, we propose to catch all client TimeoutExceptions in Kafka Streams instead of treating them as fatal, and thus not to rely on the consumer/producer/admin client to handle all such errors. If a TimeoutException occurs, we skip the current task and move to the next task for processing (we will also log a WARNING in this case to give people insight into which client call produced the timeout exception). The failed task would automatically be retried in the next processing loop. Because other tasks are processed until a task is retried, we don't have to worry about a busy-wait situation. Even if a thread had only a single task, the clients' internal exponential retries would avoid busy waiting.
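
The intended loop behavior can be sketched as follows; this is not the actual implementation, and Task, process(), and the logging are illustrative stand-ins for internal APIs:

    import java.util.List;
    import org.apache.kafka.common.errors.TimeoutException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ProcessingLoopSketch {
        private static final Logger log = LoggerFactory.getLogger(ProcessingLoopSketch.class);

        interface Task {
            String id();
            void process(); // may throw TimeoutException (a RuntimeException)
        }

        void runOnce(final List<Task> tasks) {
            for (final Task task : tasks) {
                try {
                    task.process();
                } catch (final TimeoutException e) {
                    // log which client call timed out, skip the task, and move on;
                    // the task is retried automatically in the next processing loop
                    log.warn("Timeout while processing task {}; will retry.", task.id(), e);
                }
            }
        }
    }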

To make sure that timeout issues can be reported eventually, we introduce a new task.timeout.ms config to allow users to stop processing at some point if a single task cannot make any progress. The "timer" for task.timeout.ms starts when the first client TimeoutException is detected and is reset/disabled if a task processes records successfully in a retry. If task.timeout.ms has passed, a final attempt will be made to make progress (this strategy ensures that a task will be retried at least once, unless task.timeout.ms is set to 0, which implies zero retries); if another client TimeoutException occurs, processing is stopped by re-throwing it and the stream thread dies. Note that the task.timeout.ms config only applies if a TimeoutException occurred; during normal, potentially slow, processing, task.timeout.ms would not be applied.
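
A minimal sketch of this bookkeeping, assuming a per-task deadline map (all names are illustrative, not the actual internals):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.errors.TimeoutException;

    public class TaskTimeoutSketch {
        private final long taskTimeoutMs; // task.timeout.ms
        private final Map<String, Long> deadlines = new HashMap<>(); // taskId -> deadline

        public TaskTimeoutSketch(final long taskTimeoutMs) {
            this.taskTimeoutMs = taskTimeoutMs;
        }

        // called when a client TimeoutException surfaces for a task
        void onTimeout(final String taskId, final TimeoutException e, final long nowMs) {
            final Long deadline = deadlines.get(taskId);
            if (deadline == null) {
                // first timeout observed: start the "timer"
                deadlines.put(taskId, nowMs + taskTimeoutMs);
                if (taskTimeoutMs == 0) {
                    throw e; // task.timeout.ms = 0 implies zero retries
                }
            } else if (nowMs >= deadline) {
                // the final attempt after the deadline also timed out:
                // re-throw, which stops processing and kills the stream thread
                throw e;
            }
            // otherwise: keep the task and retry it in a later processing loop
        }

        // called when a task processes records successfully
        void onProgress(final String taskId) {
            deadlines.remove(taskId); // reset/disable the "timer"
        }
    }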

Note that some client calls are issued for multiple tasks at once (as it is more efficient to issue fewer requests to the brokers). In this case, the "timer" would start ticking for all those tasks.

To replace retries in the global thread's initialization phase, we also retry on TimeoutException until task.timeout.ms expires. We apply the existing retry.backoff.ms config and rely on the client to do exponential backoff and retry in this case.
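
A sketch of such an initialization retry loop; restoreGlobalState() is a stand-in for the actual restore logic:

    import org.apache.kafka.common.errors.TimeoutException;

    public class GlobalThreadInitSketch {
        private final long taskTimeoutMs; // task.timeout.ms

        public GlobalThreadInitSketch(final long taskTimeoutMs) {
            this.taskTimeoutMs = taskTimeoutMs;
        }

        void initialize() {
            final long deadlineMs = System.currentTimeMillis() + taskTimeoutMs;
            while (true) {
                try {
                    restoreGlobalState();
                    return;
                } catch (final TimeoutException e) {
                    if (System.currentTimeMillis() >= deadlineMs) {
                        throw e; // give up once task.timeout.ms has expired
                    }
                    // the embedded client applies retry.backoff.ms and
                    // exponential backoff internally before the next attempt
                }
            }
        }

        private void restoreGlobalState() { /* stand-in */ }
    }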

Last, the admin client is used within the group leader to collect topic metadata and to create internal topics if necessary. If those calls fail, they are retried within Kafka Streams, re-using the admin client's retries config. Because admin client retries will be deprecated, we should not re-use it any longer for this purpose. The current retry loop spans multiple admin client calls that are issued interleaved; this interleaved retry logic should be preserved. Furthermore, we should not retry infinitely, to avoid the leader being stuck forever (even though it would be removed from the group by the group coordinator after a timeout anyway). To avoid dropping out of the consumer group, the retry loop should be stopped before we hit that timeout; we propose to use a 50% threshold.
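
A sketch of the bounded interleaved loop, assuming the relevant timeout is the rebalance timeout and with describeAndCreateMissingTopics() standing in for the interleaved describe/create admin calls:

    import java.util.Set;
    import org.apache.kafka.common.errors.TimeoutException;

    public class InternalTopicSetupSketch {
        void setup(final Set<String> internalTopics, final long rebalanceTimeoutMs) {
            // stop retrying at 50% of the timeout so the leader does not
            // drop out of the consumer group
            final long deadlineMs = System.currentTimeMillis() + rebalanceTimeoutMs / 2;

            Set<String> remaining = internalTopics;
            while (!remaining.isEmpty()) {
                if (System.currentTimeMillis() >= deadlineMs) {
                    throw new TimeoutException("Could not setup internal topics: " + remaining);
                }
                // one pass of interleaved describe/create calls; returns the
                // topics that could not be verified or created yet
                remaining = describeAndCreateMissingTopics(remaining);
            }
        }

        private Set<String> describeAndCreateMissingTopics(final Set<String> topics) {
            return Set.of(); // stand-in
        }
    }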

Compatibility, Deprecation, and Migration Plan

...