Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, Kafka Streams offers no way to increase or decrease the number of stream threads after the Kafka Streams client has been started. There are at least two situations where such functionality would be useful:

...

In this KIP, we propose to extend the API of the Kafka Streams client to start and shut down stream threads, as well as to reliably shut down the Kafka Streams client, including from the uncaught exception handler.

Public Interfaces


package org.apache.kafka.streams;

public class KafkaStreams implements AutoCloseable {

    /**
     * Adds and starts a stream thread in addition to the stream threads that are already running in this
     * Kafka Streams client.
     *
     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
     * threads does not exceed the total cache size specified in configuration
     * {@code cache.max.bytes.buffering}.
     *
     * @return true if a new stream thread was added, false otherwise
     */
    public boolean addStreamThread();

    /**
     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
     *
     * The removed stream thread is gracefully shut down. This method does not specify which stream
     * thread is shut down.
     *
     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
     * cache size specified in configuration {@code cache.max.bytes.buffering}.
     */
    public void removeStreamThread();
}
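
The cache resizing described in the Javadoc above can be illustrated with a small sketch. It assumes the total cache size is split evenly across stream threads using integer division; the KIP text only requires that the per-thread sizes sum to no more than the configured total, so the even split is an assumption. The class CacheSizing and the method perThreadCacheBytes are hypothetical names used for illustration and are not part of the Kafka Streams API.

```java
/** Hypothetical helper for illustration only; not part of Kafka Streams. */
final class CacheSizing {

    private CacheSizing() { }

    /**
     * Splits the total cache budget evenly across stream threads.
     * Integer division rounds down, so the per-thread sizes never sum
     * to more than the configured total.
     */
    static long perThreadCacheBytes(final long totalCacheBytes, final int numStreamThreads) {
        if (totalCacheBytes < 0) {
            throw new IllegalArgumentException("totalCacheBytes must not be negative");
        }
        if (numStreamThreads <= 0) {
            throw new IllegalArgumentException("numStreamThreads must be positive");
        }
        return totalCacheBytes / numStreamThreads;
    }
}
```

Under this assumption, a total of 10485760 bytes shared by 3 stream threads gives each thread 3495253 bytes; after a fourth thread is added, each thread is resized to 2621440 bytes.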


Proposed Changes

We propose to add the above methods to the KafkaStreams class. The behavior of those methods is described in this section alongside other behavioral changes we propose.

...

To avoid the deadlock when KafkaStreams#close() is called from an uncaught exception handler, we propose to check whether the calling thread is a stream thread and, if so, to skip joining it. Skipping the join of a stream thread that is executing its uncaught exception handler is not an issue, because at that point all the resources of the stream thread are closed, the stream thread is in state DEAD, and it will be terminated as soon as it exits its uncaught exception handler. KafkaStreams#close() can only be called by a stream thread from the uncaught exception handler.
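
The self-join check described above can be sketched in plain Java as follows. This is a minimal illustration under stated assumptions, not the Kafka Streams implementation: the class SelfAwareCloser and its methods register and close are hypothetical, and the sketch only shows the idea of comparing each managed thread with Thread.currentThread() before joining it.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical stand-in for a client that owns worker threads; not Kafka Streams code. */
final class SelfAwareCloser {

    private final List<Thread> workers = new CopyOnWriteArrayList<>();

    /** Registers a worker thread so that close() waits for it. */
    void register(final Thread worker) {
        workers.add(worker);
    }

    /**
     * Waits for all registered threads to terminate, except the calling thread.
     * A thread that joins itself would wait forever, so it is skipped.
     *
     * @return the number of threads that were joined
     */
    int close() {
        final Thread caller = Thread.currentThread();
        int joined = 0;
        for (final Thread worker : workers) {
            if (worker == caller) {
                continue; // the caller is a managed thread: skip the self-join
            }
            try {
                worker.join();
                joined++;
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop waiting
                break;
            }
        }
        return joined;
    }
}
```

If close() is invoked from the uncaught exception handler of a registered thread, the loop joins every other thread and returns, instead of blocking on the thread that is running the handler.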

Compatibility, Deprecation, and Migration Plan

The proposal is backward-compatible because it only adds new methods and does not change any existing methods. The behavioral changes regarding the transitions between states of a Kafka Streams client are backward-compatible, because the new transition between state ERROR and state REBALANCING can only be triggered by one of the methods proposed in this KIP. The only proposed change that slightly alters the current behavior is the removal of stream threads in state DEAD from the Kafka Streams client; hence, those stream threads might not be contained in the results of KafkaStreams#localThreadsMetadata(). We regard this change as minor and not relevant to operational continuity.

No methods need to be deprecated and no migration plan is required.

Rejected Alternatives

None.