...

package org.apache.kafka.streams;
 
public enum StreamsUncaughtExceptionHandlerResponse {
    SHUTDOWN_STREAM_THREAD,
    REPLACE_STREAM_THREAD,
    SHUTDOWN_KAFKA_STREAMS_CLIENT,
    SHUTDOWN_KAFKA_STREAMS_APPLICATION;
}

public enum StreamsUncaughtExceptionHandlerResponseGlobalThread {
    SHUTDOWN_KAFKA_STREAMS_CLIENT;
}
 
public interface StreamsUncaughtExceptionHandler {
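    // Invoked when a stream thread terminates due to an uncaught exception.
    StreamsUncaughtExceptionHandlerResponse handleExceptionInStreamThread(final Throwable exception);
    // Invoked when the global thread terminates due to an uncaught exception.
    StreamsUncaughtExceptionHandlerResponseGlobalThread handleExceptionInGlobalThread(final Throwable exception);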
}

KafkaStreams.java

/**
 * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly
 * terminates due to an uncaught exception.
 *
 * @param eh the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler
 * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
 */
public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler eh);
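
A minimal sketch of how an application might implement the proposed handler. It assumes the stream-thread method is named handleExceptionInStreamThread and returns StreamsUncaughtExceptionHandlerResponse. The enums and interface are redeclared locally so the sketch compiles on its own; ExampleHandler and its policy (shut down the client on IllegalStateException, otherwise replace the thread) are hypothetical and not part of the proposal.

```java
// Local copies of the proposed types, redeclared here only so the sketch is self-contained.
enum StreamsUncaughtExceptionHandlerResponse {
    SHUTDOWN_STREAM_THREAD,
    REPLACE_STREAM_THREAD,
    SHUTDOWN_KAFKA_STREAMS_CLIENT,
    SHUTDOWN_KAFKA_STREAMS_APPLICATION
}

enum StreamsUncaughtExceptionHandlerResponseGlobalThread {
    SHUTDOWN_KAFKA_STREAMS_CLIENT
}

interface StreamsUncaughtExceptionHandler {
    StreamsUncaughtExceptionHandlerResponse handleExceptionInStreamThread(Throwable exception);
    StreamsUncaughtExceptionHandlerResponseGlobalThread handleExceptionInGlobalThread(Throwable exception);
}

// Hypothetical handler: the policy below is an illustration, not a recommendation.
final class ExampleHandler implements StreamsUncaughtExceptionHandler {
    @Override
    public StreamsUncaughtExceptionHandlerResponse handleExceptionInStreamThread(final Throwable exception) {
        if (exception instanceof IllegalStateException) {
            // Treat this as unrecoverable for the whole client.
            return StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_KAFKA_STREAMS_CLIENT;
        }
        // Anything else: ask for a replacement stream thread.
        return StreamsUncaughtExceptionHandlerResponse.REPLACE_STREAM_THREAD;
    }

    @Override
    public StreamsUncaughtExceptionHandlerResponseGlobalThread handleExceptionInGlobalThread(final Throwable exception) {
        // The global-thread enum offers only one response.
        return StreamsUncaughtExceptionHandlerResponseGlobalThread.SHUTDOWN_KAFKA_STREAMS_CLIENT;
    }
}
```

Under the proposal, an instance of such a handler would be passed to setUncaughtExceptionHandler while the KafkaStreams instance is still in state CREATED.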

...

We propose to add a new Streams-specific uncaught exception handler whose return value selects one of the following responses:

SHUTDOWN_STREAM_THREAD (default):

  • The current stream thread is shut down and transitions to state DEAD.

  • The Kafka Streams client transitions to state ERROR if no other stream thread is alive.

...

SHUTDOWN_KAFKA_STREAMS_CLIENT (default for the global thread):

  • All stream threads in the client are shut down and transition to state DEAD.
  • The Kafka Streams client transitions to state ERROR.

  • The state directory cleaner thread will stop.
  • The RocksDB metrics recording thread will stop.
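
The state transitions described for SHUTDOWN_STREAM_THREAD and SHUTDOWN_KAFKA_STREAMS_CLIENT can be modeled with a small toy class. This is only an illustration of the rules listed above, not Kafka Streams code: ClientModel, its nested enums, and apply are hypothetical names, the RUNNING states are a simplification, and the responses elided from this section are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one Kafka Streams client and its stream threads (hypothetical, for illustration).
final class ClientModel {
    enum ThreadState { RUNNING, DEAD }
    enum ClientState { RUNNING, ERROR }
    enum Response { SHUTDOWN_STREAM_THREAD, SHUTDOWN_KAFKA_STREAMS_CLIENT }

    final List<ThreadState> threads = new ArrayList<>();
    ClientState clientState = ClientState.RUNNING;

    ClientModel(final int numThreads) {
        for (int i = 0; i < numThreads; i++) {
            threads.add(ThreadState.RUNNING);
        }
    }

    // Apply the handler's response after the thread at index failedThread hit an uncaught exception.
    void apply(final int failedThread, final Response response) {
        switch (response) {
            case SHUTDOWN_STREAM_THREAD:
                // Only the failing thread dies; the client errors out once no thread is left alive.
                threads.set(failedThread, ThreadState.DEAD);
                if (!threads.contains(ThreadState.RUNNING)) {
                    clientState = ClientState.ERROR;
                }
                break;
            case SHUTDOWN_KAFKA_STREAMS_CLIENT:
                // Every stream thread dies and the client goes to ERROR immediately.
                for (int i = 0; i < threads.size(); i++) {
                    threads.set(i, ThreadState.DEAD);
                }
                clientState = ClientState.ERROR;
                break;
        }
    }
}
```

With two threads, a single SHUTDOWN_STREAM_THREAD response leaves the client running on the surviving thread, whereas SHUTDOWN_KAFKA_STREAMS_CLIENT moves the client to ERROR regardless of how many threads were healthy.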

...