...

In order to give users the chance to handle exceptions, we plan on introducing a KafkaStreams-specific uncaught exception handler. Currently we use the Java default. However, by the time that handler is triggered, the stream thread that threw the exception is already dead. This prevents any action that would make use of the information in the stream thread. Also, if this was the last thread alive, there is no way to close all clients without first launching another thread. If the thread was killed by a cascading thread death due to a serdes error, that new thread will die as well.

Public Interfaces

Code Block
languagejava
package org.apache.kafka.streams; 

public enum UncaughtExceptionHandlerResponse { 
	DEFAULT,
	SHUTDOWN_STREAM_THREAD, 
	REPLACE_STREAM_THREAD, 
	SHUTDOWN_KAFKA_STREAMS_CLIENT, 
	SHUTDOWN_KAFKA_STREAMS_APPLICATION; 
} 

public interface UncaughtExceptionHandler { 
	UncaughtExceptionHandlerResponse handleUncaughtException(Thread thread, Throwable exception);
}



Streams configuration error.shutdown.timeout.ms: How long to wait for the Kafka Streams client to shut down when the shutdown is triggered by the response of the uncaught exception handler.
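
As an illustration of how a user might implement the proposed interface, here is a minimal sketch. The enum and the interface are repeated locally so that the example compiles without Kafka on the classpath. SerdeFailureException, ExampleHandler, the mapping from exception type to response, and the timeout value of 30000 are all assumptions made for this example and are not part of the proposal.

```java
import java.util.Properties;

// Local copies of the proposed types, so the sketch is self-contained.
enum UncaughtExceptionHandlerResponse {
    DEFAULT,
    SHUTDOWN_STREAM_THREAD,
    REPLACE_STREAM_THREAD,
    SHUTDOWN_KAFKA_STREAMS_CLIENT,
    SHUTDOWN_KAFKA_STREAMS_APPLICATION
}

interface UncaughtExceptionHandler {
    UncaughtExceptionHandlerResponse handleUncaughtException(Thread thread, Throwable exception);
}

// Hypothetical stand-in for a serdes failure; this is not a Kafka class.
class SerdeFailureException extends RuntimeException {
    SerdeFailureException(String message) {
        super(message);
    }
}

// Hypothetical policy: the mapping below is one possible choice, not a recommendation.
final class ExampleHandler implements UncaughtExceptionHandler {
    @Override
    public UncaughtExceptionHandlerResponse handleUncaughtException(Thread thread, Throwable exception) {
        if (exception instanceof SerdeFailureException) {
            // A replacement thread would hit the same record and die again,
            // so stop the whole application instead of cascading.
            return UncaughtExceptionHandlerResponse.SHUTDOWN_KAFKA_STREAMS_APPLICATION;
        }
        if (exception instanceof OutOfMemoryError) {
            // Assumed to be local to this JVM, so only this client is stopped.
            return UncaughtExceptionHandlerResponse.SHUTDOWN_KAFKA_STREAMS_CLIENT;
        }
        // Anything else is treated as transient in this sketch.
        return UncaughtExceptionHandlerResponse.REPLACE_STREAM_THREAD;
    }

    // Builds a configuration carrying the proposed timeout; the value is an assumption.
    static Properties exampleConfig() {
        Properties props = new Properties();
        props.put("error.shutdown.timeout.ms", "30000");
        return props;
    }
}
```

The handler receives the thread as an argument, so it can inspect the failing stream thread before choosing a response. How the handler would be registered with KafkaStreams is not shown here.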

...

The user will return one of the following values in the error handler to trigger the corresponding action.


DEFAULT

  • Throws the error and kills the thread, so that the exception is handled by the generic uncaughtExceptionHandler.

...

SHUTDOWN_KAFKA_STREAMS_APPLICATION

  • The shutdown is communicated to the other Kafka Streams clients through the rebalance protocol.

  • All stream threads across the entire application are shut down and transition to state DEAD.

  • All Kafka Streams clients, i.e., the entire Kafka Streams application, are shut down.

  • All Kafka Streams clients transition to state ERROR. (Open question: shutdown or error?)

  • The state directory cleaner thread stops.
  • The RocksDB metrics recording thread is not shut down.

Other Changes:

When all stream threads are dead, the state directory cleaner thread will automatically shut down to prevent the loss of any state. When Kafka Streams goes from no alive threads to one through the creation of a new thread, the state directory cleaner thread will relaunch. (Open question: it is not yet decided whether we kill the thread or simply pause it.)
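
The lifecycle described above could be sketched roughly as follows. This example picks the "kill and relaunch" option purely for illustration, since the proposal leaves that choice open. CleanerLifecycle, its method names, the thread name, and the one-minute interval are hypothetical and do not reflect the actual Kafka Streams internals.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: ties a background cleaner to the number of alive stream threads.
final class CleanerLifecycle {
    private int aliveThreads = 0;
    private ScheduledExecutorService cleaner; // null while the cleaner is stopped

    // Called when a stream thread starts; launches the cleaner on the 0 -> 1 transition.
    synchronized void onThreadStarted() {
        if (aliveThreads++ == 0) {
            cleaner = Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread t = new Thread(runnable, "state-dir-cleaner");
                t.setDaemon(true); // do not keep the JVM alive for the cleaner
                return t;
            });
            // Placeholder task; the real cleaner would remove obsolete state directories.
            cleaner.scheduleAtFixedRate(() -> { }, 1, 1, TimeUnit.MINUTES);
        }
    }

    // Called when a stream thread dies; stops the cleaner once no thread is alive.
    synchronized void onThreadDied() {
        if (aliveThreads == 0) {
            throw new IllegalStateException("no alive stream threads to remove");
        }
        if (--aliveThreads == 0) {
            cleaner.shutdownNow();
            cleaner = null;
        }
    }

    synchronized boolean isCleanerRunning() {
        return cleaner != null;
    }
}
```

Pausing instead of killing would keep the executor and cancel only the scheduled task; the counter logic would stay the same.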

...