Status

Current state: Under Discussion

Discussion thread:

JIRA:


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, Kafka Streams offers no way to increase or decrease the number of stream threads after the Kafka Streams client has been started. There are at least two situations where such functionality would be useful:

  1. Reacting to an error that killed a stream thread.
  2. Adapting the number of stream threads to the current workload without the need to stop and start the Kafka Streams client.

An uncaught exception thrown in a stream thread kills that stream thread, leaving the Kafka Streams client with fewer stream threads for processing than when the client was started. The only way to replace the killed stream thread is to restart the whole Kafka Streams client. For transient errors, it might make sense to replace a killed stream thread with a new one while users try to find the root cause of the error. That could be accomplished by starting a new stream thread in the uncaught exception handler of the killed stream thread.
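The replacement idea can be sketched with plain Java threads. This is only an illustration: `ReplacingClientSketch` and all of its methods are hypothetical stand-ins, not the `KafkaStreams` class, and the replacement thread merely signals a latch where a real stream thread would resume processing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a Kafka Streams client; not the real KafkaStreams class.
class ReplacingClientSketch {
    private final AtomicInteger started = new AtomicInteger();
    private final CountDownLatch replacementRan = new CountDownLatch(1);

    // Loosely mirrors the proposed startStreamThread(): starts one more worker thread.
    void startStreamThread(Runnable work) {
        Thread t = new Thread(work, "stream-thread-" + started.incrementAndGet());
        // The handler runs on the dying thread and starts a replacement.
        // Assumption: the replacement only signals the latch instead of processing records.
        t.setUncaughtExceptionHandler((thread, error) ->
            startStreamThread(replacementRan::countDown));
        t.start();
    }

    // Number of threads started so far, including replacements.
    int startedThreads() {
        return started.get();
    }

    // Waits until the replacement thread has run; returns false on timeout or interrupt.
    boolean awaitReplacement(long timeoutMs) {
        try {
            return replacementRan.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Starting a thread whose work throws, for example `client.startStreamThread(() -> { throw new IllegalStateException("transient error"); })`, leaves the sketch with a second, healthy thread without restarting the client object.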

When the workload of a Kafka Streams client increases, it might be beneficial to scale up the client by increasing the number of stream threads it is currently running. On the other hand, too many stream threads might hurt performance, so decreasing the number of running stream threads could also benefit a Kafka Streams application. Being able to increase and decrease the number of stream threads without stopping and starting a Kafka Streams client would allow users to adapt a client to its environment on the fly.

Another functionality that would improve the handling of error conditions is closing a Kafka Streams client from the uncaught exception handler. This is useful, for example, when an error depends on the machine on which the Kafka Streams client is running, e.g., the node is running out of disk space or the disk is corrupted. Currently, it is not possible to close a Kafka Streams client in the uncaught exception handler because of a deadlock. The deadlock occurs because the Kafka Streams client waits for the termination of the stream thread that initiated the closing of the client, while that same stream thread waits for the closing to complete.
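The circular wait can be reproduced with plain Java threads. The following is a minimal sketch under stated assumptions: `CloseFromHandlerSketch` is a hypothetical one-thread model, not the `KafkaStreams` class, and its `close` takes a positive timeout only so that the demonstration terminates (a `join` without a timeout would wait forever). The non-blocking `requestClose` merely records the intent.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

// Hypothetical model of a client with a single stream thread; not the real KafkaStreams class.
class CloseFromHandlerSketch {
    private final AtomicBoolean closeRequested = new AtomicBoolean(false);
    private volatile Thread streamThread;

    // Blocking close: waits for the stream thread to terminate.
    // Returns true only if the stream thread terminated within the timeout.
    // timeoutMs must be positive; join(0) would wait forever.
    boolean close(long timeoutMs) {
        Thread t = streamThread;
        try {
            t.join(timeoutMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !t.isAlive();
    }

    // Non-blocking request: records the intent and returns immediately.
    void requestClose() {
        closeRequested.set(true);
    }

    boolean isCloseRequested() {
        return closeRequested.get();
    }

    // Starts a stream thread that fails at once, runs handlerAction inside its
    // uncaught exception handler, and returns the result of handlerAction.
    boolean failAndHandle(Predicate<CloseFromHandlerSketch> handlerAction) {
        AtomicBoolean result = new AtomicBoolean(false);
        Thread t = new Thread(() -> { throw new IllegalStateException("disk full"); });
        t.setUncaughtExceptionHandler((thread, error) -> result.set(handlerAction.test(this)));
        streamThread = t;
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result.get();
    }
}
```

Calling `failAndHandle(c -> c.close(200))` returns `false`: the handler runs on the stream thread, so `close` waits for the very thread that is executing it and can only time out. Calling `requestClose()` from the handler returns at once, which lets the stream thread finish dying.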

In this KIP, we propose to extend the API of the Kafka Streams client to start and shut down stream threads as well as to reliably shut down the Kafka Streams client, including from the uncaught exception handler.

Public Interfaces


package org.apache.kafka.streams;

public class KafkaStreams implements AutoCloseable {

    /**
     * Starts a stream thread in addition to the stream threads that are already running.
     *
     * Since the number of stream threads increases, the sizes of the caches in the new stream thread
     * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
     * threads does not exceed the total cache size specified in configuration
     * {@code cache.max.bytes.buffering}.
     */
    public void startStreamThread();

    /**
     * Removes a stream thread from the running stream threads.
     *
     * Which stream thread is removed is not guaranteed.
     *
     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
     * cache size specified in configuration {@code cache.max.bytes.buffering}.
     */
    public void removeStreamThread();

    /**
     * Requests the shutdown of this Kafka Streams client.
     *
     * This method requests a shutdown that is performed before the next state change of the Kafka Streams client.
     * That means that the method does not block until all stream threads are shut down and the Kafka Streams
     * client is closed.
     */
    public void requestClose();
}
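
The cache adaptation described in the Javadoc above can be illustrated with a small model. This sketch assumes that the total cache is split evenly across stream threads using integer division; the KIP text only states the bound on the sum, so the even split is an assumption. `CacheResizeSketch` and its methods are hypothetical and not part of Kafka Streams.

```java
// Hypothetical model of per-thread cache sizing; not the real KafkaStreams class.
class CacheResizeSketch {
    private final long totalCacheBytes; // stands in for cache.max.bytes.buffering
    private int threads;

    CacheResizeSketch(long totalCacheBytes, int initialThreads) {
        this.totalCacheBytes = totalCacheBytes;
        this.threads = initialThreads;
    }

    // Adding a thread shrinks every thread's share of the cache.
    void startStreamThread() {
        threads++;
    }

    // Removing a thread grows the share of each remaining thread.
    void removeStreamThread() {
        if (threads == 0) {
            throw new IllegalStateException("no stream thread to remove");
        }
        threads--;
    }

    // Assumption: even split with integer division, so the sum never exceeds the total.
    long cacheBytesPerThread() {
        return threads == 0 ? 0 : totalCacheBytes / threads;
    }

    int threadCount() {
        return threads;
    }
}
```

With a total of 10485760 bytes and two threads, each thread gets 5242880 bytes. After `startStreamThread()` each of the three threads gets 3495253 bytes, which sums to one byte less than the total because of integer division.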


Proposed Changes

We propose that a stream thread should throw a MissingSourceTopicException when it receives error INCOMPLETE_SOURCE_TOPIC_METADATA encoded in its assignment instead of gracefully shutting itself down. This exception will not be caught anywhere and will eventually reach the uncaught exception handler, if one is set. When all stream threads of a Streams client are dead, the Streams client will be in state ERROR. The transition to state ERROR after the death of all stream threads corresponds to the current behavior and will not be changed by this KIP.
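
The flow described above can be modelled with plain Java threads. In this sketch, `MissingSourceTopicSketch`, its nested exception class, and its two-value `State` enum are local stand-ins defined only for illustration; they are not the Kafka Streams classes. For brevity the sketch performs the transition to ERROR inside the handler, whereas the text above attributes that transition to the client itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model; not the real Kafka Streams client.
class MissingSourceTopicSketch {
    enum State { RUNNING, ERROR }

    // Local stand-in for the exception named in the text.
    static class MissingSourceTopicException extends RuntimeException {
        MissingSourceTopicException(String message) {
            super(message);
        }
    }

    private volatile State state = State.RUNNING;
    private final List<Throwable> handled = Collections.synchronizedList(new ArrayList<>());

    // Runs n stream threads that all find the error code in their assignment,
    // waits for them to die, and returns the resulting client state.
    State runWithIncompleteMetadata(int n) {
        AtomicInteger alive = new AtomicInteger(n);
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            Thread t = new Thread(() -> {
                // Thrown instead of shutting down gracefully.
                throw new MissingSourceTopicException("INCOMPLETE_SOURCE_TOPIC_METADATA");
            });
            t.setUncaughtExceptionHandler((thread, error) -> {
                handled.add(error); // the handler observes the exception
                if (alive.decrementAndGet() == 0) {
                    state = State.ERROR; // all stream threads are dead
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return state;
    }

    int handledCount() {
        return handled.size();
    }
}
```

Running the sketch with three threads invokes the handler three times and leaves the model in state ERROR once the last thread has died.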

Compatibility, Deprecation, and Migration Plan

The proposed changes do not need any deprecation or migration plan.

The only difference from the current behavior is that the uncaught exception handler is called. If the uncaught exception handler is not set, or it is set but does not handle StreamsException or one of its ancestors, the behavior does not change. If the uncaught exception handler is set and handles StreamsException or one of its ancestors, that code will be executed when a source topic is deleted, which is currently not the case. However, users do not need to change anything to run their Streams app. They will merely get an additional error notification for a case that actually is an error.

Rejected Alternatives

none
