Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, Kafka Streams offers no way to increase or decrease the number of stream threads after the Kafka Streams client has been started. There are at least two situations where such functionality would be useful:

...

In this KIP, we propose to extend the API of the Kafka Streams client so that users can start and shut down individual stream threads and reliably shut down the Kafka Streams client, including from within the uncaught exception handler.

Public Interfaces


package org.apache.kafka.streams;

public class KafkaStreams implements AutoCloseable {

    /**
     * Starts a stream thread in addition to the stream threads that are already running in this
     * Kafka Streams client.
     *
     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
     * threads does not exceed the total cache size specified in configuration
     * {@code cache.max.bytes.buffering}.
     */
    public void startStreamThread();

    /**
     * Shuts down one stream thread out of the running stream threads of this Kafka Streams client.
     *
     * This method does not specify which stream thread is shut down.
     *
     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
     * cache size specified in configuration {@code cache.max.bytes.buffering}.
     */
    public void shutdownStreamThread();

    /**
     * Requests the shutdown of this Kafka Streams client.
     *
     * This method records the shutdown request, requests a rebalance, and returns.
     * It does not block until all stream threads are shut down and the Kafka Streams
     * client is closed.
     */
    public void requestClose();
}
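
The cache adaptation described in the Javadoc above can be illustrated with a little arithmetic. The following is a minimal sketch, assuming the total cache is split evenly across stream threads; the class CacheSizeSketch and the method perThreadCacheBytes are hypothetical names used only for illustration and are not part of the proposed API.

```java
public final class CacheSizeSketch {

    /**
     * Returns the cache size each stream thread would get under an even split.
     * The even split is an assumption of this sketch, not a statement of the proposal.
     */
    static long perThreadCacheBytes(long totalCacheBytes, int numStreamThreads) {
        if (totalCacheBytes < 0) {
            throw new IllegalArgumentException("totalCacheBytes must not be negative");
        }
        if (numStreamThreads <= 0) {
            throw new IllegalArgumentException("numStreamThreads must be positive");
        }
        // Integer division rounds down, so the sum over all threads
        // never exceeds the configured total.
        return totalCacheBytes / numStreamThreads;
    }

    public static void main(String[] args) {
        long total = 10L * 1024 * 1024; // stands in for cache.max.bytes.buffering
        for (int threads = 1; threads <= 4; threads++) {
            long perThread = perThreadCacheBytes(total, threads);
            System.out.println(threads + " thread(s): " + perThread
                + " bytes each, " + (perThread * threads) + " bytes in total");
        }
    }
}
```

Under this assumed policy, starting a stream thread shrinks every per-thread cache and shutting one down grows the remaining caches, while the sum stays at or below the configured total.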


Proposed Changes

We propose to add the above methods to the KafkaStreams class. The behavior of those methods is as follows.

...

When KafkaStreams#requestClose() is called, the Kafka Streams client will not close immediately but will record the request to close, request a rebalance, and return. Before joining the group, the Kafka Streams client will honor the request to close and shut itself down instead of joining the group. During shutdown, the Kafka Streams client will stop all of its threads, i.e., stream threads and other threads, and clean up its resources. After shutdown, the Kafka Streams client will be in state NOT_RUNNING. Because the close is asynchronous and the caller never waits for the stream threads to terminate, the method can be called from the uncaught exception handler of a stream thread without causing a deadlock.
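
The asynchronous close can be illustrated with a toy model that does not use Kafka at all. The following is a minimal sketch under stated assumptions: ToyClient, its two threads, and the polling loop are hypothetical stand-ins, and the flag check in the loop only imitates the check that the proposal places before joining the group. The point it shows is that the uncaught exception handler merely records the request and returns, so the failing thread never waits on itself.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public final class RequestCloseSketch {

    enum State { RUNNING, NOT_RUNNING }

    /** Hypothetical stand-in for a client with two worker threads. */
    static final class ToyClient {
        private final AtomicBoolean closeRequested = new AtomicBoolean(false);
        private final CountDownLatch closed = new CountDownLatch(1);
        private volatile State state = State.RUNNING;

        /** Records the request and returns immediately; it never blocks. */
        void requestClose() {
            closeRequested.set(true);
        }

        void start() {
            Thread failing = new Thread(() -> {
                throw new IllegalStateException("simulated processing failure");
            }, "toy-thread-1");
            // The handler runs on the dying thread. A blocking close that joined
            // all threads here would wait on the calling thread itself.
            failing.setUncaughtExceptionHandler((thread, error) -> requestClose());

            Thread healthy = new Thread(this::pollLoop, "toy-thread-2");
            healthy.start();
            failing.start();
        }

        /** Imitates a thread that checks for a close request before its next step. */
        private void pollLoop() {
            while (!closeRequested.get()) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            state = State.NOT_RUNNING;
            closed.countDown();
        }

        boolean awaitClosed(long timeoutMs) throws InterruptedException {
            return closed.await(timeoutMs, TimeUnit.MILLISECONDS);
        }

        State state() {
            return state;
        }
    }

    /** Starts the toy client, waits up to five seconds, and returns its final state. */
    static State runDemo() {
        ToyClient client = new ToyClient();
        client.start();
        try {
            client.awaitClosed(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return client.state();
    }

    public static void main(String[] args) {
        System.out.println("Final state: " + runDemo());
    }
}
```

In this model the shutdown work is done by the thread that is still healthy, which loosely mirrors the proposal's idea that the client acts on the recorded request at a later, safe point rather than inside the handler.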

Compatibility, Deprecation, and Migration Plan

This proposal only adds new methods and does not change any existing methods. Thus, backward compatibility is ensured and no deprecation or migration plan is needed.

Rejected Alternatives

None.