Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Public Interfaces


Code Block
languagejava
linenumberstrue
package org.apache.kafka.streams;

public class KafkaStreams implements AutoCloseable {

	/**
	 * Adds and starts a stream thread in addition to the stream threads that are already running in this 
     * Kafka Streams client.
     * 
     * Since the number of stream threads increases, the sizes of the caches in the new stream thread 
     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream 
     * threads does not exceed the total cache size specified in configuration 
     * {@code cache.max.bytes.buffering}.
     *
     * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
     *
     * @return name of the added stream thread or empty if a new stream thread could not be added 
	 */
	public Optional<String> addStreamThread();

	/**
	 * Removes one stream thread out of the running stream threads from this Kafka Streams client.
     * 
     * The removed stream thread is gracefully shut down. This method does not specify which stream 
     * thread is shut down.
     *
     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream 
     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total 
     * cache size specified in configuration {@code cache.max.bytes.buffering}. 
     *
     * @return name of the removed stream thread or empty if a stream thread could not be removed because 
     *         no stream threads are alive
	 */
	public Optional<String> removeStreamThread();


	/**
     * Returns the current number of stream thread that are alive, i.e., stream threads that are not in state DEAD.
     *
     * @return current number of stream thread that are alive 
     */
	public int numberOfAliveStreamThreads();
}


Proposed Changes

We propose to add the above methods to the KafkaStreams class. The behavior of those methods is described in this section alongside other behavioral changes we propose.

...

Stream threads that are in state DEAD will be removed from the stream threads of a Kafka Streams client. This is true for all stream threads, for the ones started when the Kafka Streams client is started as well as for the ones that are started through a call to KafkaStreams#addStreamThread(). This implies that a call to KafkaStreams#localThreadsMetadata() that currently returns the stream threads in state DEAD might not return them anymore depending on the relative timing of the removal of the stream thread and the call to KafkaStreams#localThreadsMetadata(). More specifically, dead stream threads are only returned by KafkaStreams#localThreadsMetadata() until they are removed from the Kafka Streams client. To monitor the number of stream threads that died exceptionally, i.e., failed, in the course of time, we propose to add the following client-level metric:

...

The number of stream threads is not persisted across restarts. That means that a client will always start as many stream threads as specified in configuration num.stream.threads during start-up. Even though KafkaStreams#addStreamThread() and KafkaStreams#removeStreamThread() have been called since the last start of the client.

Examples of Calling the API from an Uncaught Exception Handler

Code Block
languagejava
linenumberstrue
kafkaStreams.setUncaughtExceptionHandler((thread, exception) -> {
    if (exception instanceof ProcessorStateException) {
		while (kafkaStreams.isRunningOrRebalancing()) {
			try {
		        final Optional<ThreadMetadata> threadMetadata = kafkaStreams.localThreadsMetadata().stream()
	    	        .filter(metadata -> metadata.threadName().equals(thread.getName())).findAny();
	  		    if (threadMetadata.isPresent()) {
	            	log.debug(threadMetadata.get().toString());
	        	} else {
	            	log.error("No stream thread metadata found!");
	        	}
				break;
			} catch (final IllegalStateException illegalStateException) {
				log.error("Stream thread metadata could not be retrieved", illegalStateException);
			}
		}
		final Optional<String> nameOfAddedStreamThread = Optional.empty();
		do {
        	nameOfAddedStreamThread = kafkaStreams.addStreamThread();
        } while (!nameOfAddedStreamThread.isPresent() && (kafkaStreams.state() == CREATE || kafkaStreams.isRunningOrRebalancing()))
		log.debug("New stream thread named {} was added", nameOfAddedStreamThread)
    } else {
		log.error("The following uncaught exception was not handled: ", exception)
	}
});


Compatibility, Deprecation, and Migration Plan

...