Status

Current state: Under Discussion

Discussion thread:

JIRA:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, Kafka Streams offers no way to increase or decrease the number of stream threads after the Kafka Streams client has been started. There are at least two situations where such functionality would be useful:

  1. Reacting to an error that killed a stream thread.
  2. Adapting the number of stream threads to the current workload without the need to stop and start the Kafka Streams client.

Uncaught exceptions thrown in a stream thread kill the stream thread, leaving the Kafka Streams client with fewer stream threads for processing than when the client was started. The only way to replace the killed stream thread is to restart the whole Kafka Streams client. For transient errors, it might make sense to replace a killed stream thread with a new one while users try to find the root cause of the error. That could be accomplished by starting a new stream thread in the uncaught exception handler of the killed stream thread.
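The replacement pattern described above can be illustrated without Kafka. The following is a minimal sketch in plain Java, not the proposed implementation: the class ReplaceThreadSketch and all of its members are hypothetical names, and the "transient error" is simulated by a counter. The point it shows is that the uncaught exception handler of a dying thread can start a replacement thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model: each worker stands in for a stream thread.
class ReplaceThreadSketch {
    private final AtomicInteger started = new AtomicInteger();
    private final AtomicInteger remainingFailures;
    private final CountDownLatch finished = new CountDownLatch(1);

    ReplaceThreadSketch(int transientFailures) {
        this.remainingFailures = new AtomicInteger(transientFailures);
    }

    // Starts one worker; if it dies, its handler starts a replacement.
    void startWorker() {
        Thread worker = new Thread(() -> {
            if (remainingFailures.getAndDecrement() > 0) {
                throw new IllegalStateException("simulated transient error");
            }
            finished.countDown(); // this worker processed successfully
        }, "sketch-thread-" + started.incrementAndGet());
        // The handler runs on the dying thread and only starts a new thread.
        worker.setUncaughtExceptionHandler((thread, error) -> startWorker());
        worker.start();
    }

    // Waits until some worker succeeded; returns false on timeout or interrupt.
    boolean awaitFinished(long timeoutMs) {
        try {
            return finished.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int startedCount() {
        return started.get();
    }

    public static void main(String[] args) {
        ReplaceThreadSketch sketch = new ReplaceThreadSketch(2);
        sketch.startWorker();
        System.out.println("finished=" + sketch.awaitFinished(5000)
                + ", threads started=" + sketch.startedCount());
    }
}
```

With the API proposed in this KIP, the handler body would presumably call KafkaStreams#startStreamThread() instead of the hypothetical startWorker().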

When the workload of a Kafka Streams client increases, it might be beneficial to scale up the client by increasing the number of stream threads that it is currently running. On the other hand, too many stream threads might negatively impact performance, so decreasing the number of running stream threads could also benefit a Kafka Streams application. Being able to increase and decrease the number of stream threads without stopping and starting a Kafka Streams client would allow users to adapt a client to its environment on the fly.

Closing a Kafka Streams client from within the uncaught exception handler would also improve the handling of error conditions, for example when an error depends on the machine on which the client is running, such as the node running out of disk space or the disk being corrupted. Currently, it is not possible to close a Kafka Streams client in the uncaught exception handler because of a deadlock. The deadlock occurs because the Kafka Streams client waits for the termination of the stream thread that initiated the closing of the client, while that stream thread waits for the closing to complete.

In this KIP, we propose to extend the API of the Kafka Streams client to start and shut down stream threads as well as to reliably shut down the Kafka Streams client, including from the uncaught exception handler.

Public Interfaces


package org.apache.kafka.streams;

public class KafkaStreams implements AutoCloseable {

    /**
     * Starts a stream thread in addition to the stream threads that are already running in this
     * Kafka Streams client.
     *
     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
     * threads does not exceed the total cache size specified in configuration
     * {@code cache.max.bytes.buffering}.
     */
    public void startStreamThread();

    /**
     * Shuts down one stream thread out of the running stream threads of this Kafka Streams client.
     *
     * This method does not specify which stream thread is shut down.
     *
     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
     * threads are adapted so that the sum of the cache sizes over all stream threads does not exceed
     * the total cache size specified in configuration {@code cache.max.bytes.buffering}.
     */
    public void shutdownStreamThread();

    /**
     * Requests the shutdown of this Kafka Streams client.
     *
     * This method requests a shutdown, requests a rebalance, and returns.
     * That is, the method does not block until all stream threads are shut down and the
     * Kafka Streams client is closed.
     */
    public void requestClose();
}


Proposed Changes

We propose to add the above methods to the KafkaStreams class. The behavior of those methods is as follows.

When a Kafka Streams client is started by calling KafkaStreams#start(), as many stream threads as specified in configuration num.stream.threads will be started. This corresponds to the current behavior.

When KafkaStreams#startStreamThread() is called, a new full-fledged stream thread will be started in addition to the stream threads started by KafkaStreams#start(). The new stream thread will use the same configuration as the existing stream threads. The total cache size specified in configuration cache.max.bytes.buffering will be redistributed over the new and the existing stream threads, i.e., the cache of each stream thread will be resized after the next rebalance. Starting a new stream thread will trigger a rebalance. Once the new stream thread has been assigned tasks, it will start to execute them.
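The cache redistribution can be made concrete with a little arithmetic. The sketch below assumes the total cache is split evenly across stream threads with integer division; the KIP text does not prescribe the exact split, and the class CacheResizeSketch and the method perThreadCacheBytes are hypothetical names used only for illustration.

```java
// Hypothetical helper, assuming an even split of the total cache.
class CacheResizeSketch {

    // Returns the cache size each stream thread would get under an even split.
    // Integer division guarantees the per-thread sum never exceeds the total.
    static long perThreadCacheBytes(long totalCacheBytes, int numStreamThreads) {
        if (totalCacheBytes < 0) {
            throw new IllegalArgumentException("totalCacheBytes must be non-negative");
        }
        if (numStreamThreads <= 0) {
            throw new IllegalArgumentException("numStreamThreads must be positive");
        }
        return totalCacheBytes / numStreamThreads;
    }

    public static void main(String[] args) {
        long total = 10L * 1024 * 1024; // example value for cache.max.bytes.buffering
        // Adding a fifth thread shrinks every per-thread cache.
        System.out.println("4 threads: " + perThreadCacheBytes(total, 4) + " bytes each");
        System.out.println("5 threads: " + perThreadCacheBytes(total, 5) + " bytes each");
        // Shutting threads down again grows the remaining caches.
        System.out.println("3 threads: " + perThreadCacheBytes(total, 3) + " bytes each");
    }
}
```

Under this assumption, going from four to five threads with a 10 MiB total reduces each cache from 2,621,440 to 2,097,152 bytes, and the same calculation applies in reverse when a stream thread is shut down.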

When KafkaStreams#shutdownStreamThread() is called, a running stream thread in the Kafka Streams client is shut down. It is not specified which stream thread is shut down. The chosen stream thread will stop executing tasks and close all its resources. The total cache size specified in configuration cache.max.bytes.buffering will be redistributed over the remaining stream threads, i.e., the cache of each remaining stream thread will be resized after the next rebalance. Shutting down a stream thread will trigger a rebalance (even if static membership is configured).

When KafkaStreams#requestClose() is called, the Kafka Streams client will not close immediately but will record the request to close, request a rebalance, and return. Before joining the group, the Kafka Streams client will honor the request to close and shut itself down instead of joining the group. During the shutdown, the Kafka Streams client will stop all of its threads, i.e., stream threads and other threads, and clean up resources. After the shutdown, the Kafka Streams client will be in state NOT_RUNNING. With such an asynchronous close, we avoid deadlocks when this method is called from an uncaught exception handler of a stream thread.
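Why an asynchronous close avoids the deadlock can be shown with a simplified model in plain Java. This is a sketch under stated assumptions, not the proposed implementation: the class AsyncCloseSketch is hypothetical, a single worker thread stands in for a stream thread, and a separate closer thread stands in for the rebalance-driven shutdown described above. The key property is that requestClose() only records the request and returns, so the calling thread is never asked to wait for its own termination.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of a client with one worker ("stream") thread.
class AsyncCloseSketch {
    enum State { RUNNING, PENDING_SHUTDOWN, NOT_RUNNING }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private final CountDownLatch closed = new CountDownLatch(1);
    private final Thread worker;

    AsyncCloseSketch(Runnable task) {
        worker = new Thread(task, "sketch-stream-thread");
        // The handler runs on the dying worker thread itself.
        worker.setUncaughtExceptionHandler((thread, error) -> requestClose());
    }

    void start() {
        worker.start();
    }

    // Records the close request and returns without blocking. A blocking close
    // called from the worker would join the worker and therefore never return.
    void requestClose() {
        if (!state.compareAndSet(State.RUNNING, State.PENDING_SHUTDOWN)) {
            return; // a close was already requested
        }
        Thread closer = new Thread(() -> {
            try {
                worker.join(); // safe: the closer is not the worker
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            state.set(State.NOT_RUNNING);
            closed.countDown();
        }, "sketch-closer");
        closer.start();
    }

    // Waits for the shutdown to complete; returns false on timeout or interrupt.
    boolean awaitClosed(long timeoutMs) {
        try {
            return closed.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    State state() {
        return state.get();
    }

    public static void main(String[] args) {
        AsyncCloseSketch client = new AsyncCloseSketch(() -> {
            throw new IllegalStateException("simulated fatal error");
        });
        client.start();
        System.out.println("closed=" + client.awaitClosed(5000) + ", state=" + client.state());
    }
}
```

In this model the handler returns immediately, the worker thread terminates, and only then does the closer move the client to NOT_RUNNING, which mirrors the ordering the asynchronous close is meant to guarantee.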

Compatibility, Deprecation, and Migration Plan

This proposal only adds new methods and does not change any existing methods. Thus, backward compatibility is ensured and neither deprecations nor a migration plan are needed.

Rejected Alternatives

none