...

package org.apache.kafka.streams;

import java.time.Duration;
import java.util.Optional;

public class KafkaStreams implements AutoCloseable {

    /**
     * Adds and starts a stream thread in addition to the stream threads that are already running in this
     * Kafka Streams client.
     *
     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
     * threads does not exceed the total cache size specified in configuration
     * {@code cache.max.bytes.buffering}.
     *
     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
     *
     * @return name of the added stream thread or empty if a new stream thread could not be added
     */
    public Optional<String> addStreamThread();

    /**
     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
     *
     * The removed stream thread is gracefully shut down. This method does not specify which stream
     * thread is shut down.
     *
     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
     * cache size specified in configuration {@code cache.max.bytes.buffering}.
     *
     * @return name of the removed stream thread or empty if a stream thread could not be removed because
     *         no stream threads are alive
     */
    public Optional<String> removeStreamThread();

    /**
     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
     *
     * The removed stream thread is gracefully shut down. This method does not specify which stream
     * thread is shut down.
     *
     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
     * cache size specified in configuration {@code cache.max.bytes.buffering}.
     *
     * If the given timeout is exceeded, the method will throw a {@code TimeoutException}.
     *
     * @param timeout maximum time to wait for the stream thread to shut down
     * @return name of the removed stream thread or empty if a stream thread could not be removed because
     *         no stream threads are alive
     * @throws TimeoutException if the given timeout is exceeded
     */
    public Optional<String> removeStreamThread(final Duration timeout);

}
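To illustrate how a caller might consume the Optional return values of the methods above, here is a minimal sketch. It does not use the real KafkaStreams class: ScalableClient is a hypothetical interface that only mirrors the three proposed signatures, and FakeClient is an in-memory stand-in whose thread names and behavior are assumptions made for illustration.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

final class ThreadScalingSketch {

    // Hypothetical interface that mirrors the three proposed method signatures.
    interface ScalableClient {
        Optional<String> addStreamThread();
        Optional<String> removeStreamThread();
        Optional<String> removeStreamThread(Duration timeout);
    }

    // In-memory stand-in, NOT the real client. Thread names are made up.
    static final class FakeClient implements ScalableClient {
        private final Deque<String> threads = new ArrayDeque<>();
        private int nextIndex = 1;
        // Models "client is in state RUNNING or REBALANCING".
        private boolean acceptingThreads = true;

        void setAcceptingThreads(boolean accepting) {
            this.acceptingThreads = accepting;
        }

        @Override
        public Optional<String> addStreamThread() {
            if (!acceptingThreads) {
                return Optional.empty(); // thread could not be added
            }
            String name = "StreamThread-" + nextIndex++;
            threads.addLast(name);
            return Optional.of(name);
        }

        @Override
        public Optional<String> removeStreamThread() {
            // Empty when no stream thread is alive.
            return Optional.ofNullable(threads.pollLast());
        }

        @Override
        public Optional<String> removeStreamThread(Duration timeout) {
            // The fake never blocks, so the timeout is never exceeded here.
            return removeStreamThread();
        }
    }

    // Removes up to 'count' threads and stops early once the client reports none left.
    static List<String> removeUpTo(ScalableClient client, int count) {
        List<String> removed = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            Optional<String> name = client.removeStreamThread();
            if (!name.isPresent()) {
                break;
            }
            removed.add(name.get());
        }
        return removed;
    }
}
```

The point of the sketch is that an empty Optional is a normal outcome that callers should check for, rather than an error.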


Proposed Changes

We propose to add the above methods to the KafkaStreams class. The behavior of those methods is described in this section alongside other behavioral changes we propose.

...

When KafkaStreams#removeStreamThread() is called, a running stream thread in the Kafka Streams client is shut down. It is not specified which stream thread is shut down. The chosen stream thread will stop executing tasks and close all its resources. The total cache size specified in configuration cache.max.bytes.buffering will be redistributed over the remaining stream threads, i.e., the cache of each remaining stream thread will be resized after the next rebalance. Shutting down a stream thread will trigger a rebalance, even if static membership is configured.

If the last running stream thread is shut down with KafkaStreams#removeStreamThread(), the Kafka Streams client will stay in state RUNNING. If a new stream thread is then added via KafkaStreams#addStreamThread(), the client will transition to state REBALANCING and then to RUNNING, at which point it will resume processing input records.

Method KafkaStreams#removeStreamThread() will block until the shutdown of the stream thread has completed and will then return the name of the stream thread that was shut down. If no stream thread can be removed because no stream threads are alive in the Kafka Streams client, the method will return early with an empty optional. If KafkaStreams#removeStreamThread(final Duration) exceeds the given timeout, it will throw a TimeoutException.
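The cache redistribution described above can be sketched as a simple calculation. The sketch assumes an even split of the configured total across the live stream threads; the proposal only requires that the sum of the per-thread caches stays within the total, so the exact split is an assumption. The class CacheResizeSketch and the method perThreadCacheBytes are hypothetical names, not part of Kafka Streams.

```java
final class CacheResizeSketch {

    // Hypothetical helper: evenly splits the total cache budget
    // (cache.max.bytes.buffering) across the live stream threads.
    // Integer division rounds down, so the sum never exceeds the total.
    static long perThreadCacheBytes(long totalCacheBytes, int liveThreads) {
        if (totalCacheBytes < 0) {
            throw new IllegalArgumentException("totalCacheBytes must be non-negative");
        }
        if (liveThreads <= 0) {
            return 0L; // no live thread, nothing to assign
        }
        return totalCacheBytes / liveThreads;
    }

    public static void main(String[] args) {
        long total = 10L * 1024 * 1024; // example budget of 10 MiB
        // Each removal leaves more cache for every remaining thread.
        for (int threads = 4; threads >= 1; threads--) {
            System.out.println(threads + " thread(s): "
                + perThreadCacheBytes(total, threads) + " bytes each");
        }
    }
}
```

With a fixed total, removing a stream thread increases the share of each remaining thread, and adding one decreases it.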

Stream threads that are in state DEAD will be removed from the set of stream threads of a Kafka Streams client to avoid an unbounded increase in the number of stream threads kept in a client. Dead stream threads will be removed regardless of whether they were started during the start of the Kafka Streams client or through a call to KafkaStreams#addStreamThread(). KafkaStreams#localThreadsMetadata() will not return metadata of stream threads that are in state DEAD. As is currently the case, the Kafka Streams client will transition to ERROR if the last alive stream thread dies with an exception.
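The bookkeeping for dead stream threads described above can be modeled in a few lines. This is a simplified sketch under stated assumptions: the State enum has only two values, and ThreadInfo, pruneDead, and liveThreadNames are hypothetical names chosen for illustration, not the internal types of Kafka Streams.

```java
import java.util.List;
import java.util.stream.Collectors;

final class DeadThreadPruningSketch {

    // Simplified, hypothetical thread states. The real client has more states.
    enum State { RUNNING, DEAD }

    // Hypothetical record of a stream thread kept by the client.
    static final class ThreadInfo {
        final String name;
        final State state;

        ThreadInfo(String name, State state) {
            this.name = name;
            this.state = state;
        }
    }

    // Drops DEAD threads from a mutable list so the tracked set cannot grow
    // without bound. Returns true if at least one thread was removed.
    static boolean pruneDead(List<ThreadInfo> threads) {
        return threads.removeIf(t -> t.state == State.DEAD);
    }

    // Models a metadata view that never reports DEAD threads.
    static List<String> liveThreadNames(List<ThreadInfo> threads) {
        return threads.stream()
            .filter(t -> t.state != State.DEAD)
            .map(t -> t.name)
            .collect(Collectors.toList());
    }
}
```

How a thread was started plays no role in the sketch, which matches the statement that dead threads are removed regardless of their origin.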

...