...

package org.apache.kafka.streams;
 
public enum StreamsUncaughtExceptionHandlerResponse {
    SHUTDOWN_STREAM_THREAD,
    REPLACE_STREAM_THREAD,
    SHUTDOWN_KAFKA_STREAMS_CLIENT,
    SHUTDOWN_KAFKA_STREAMS_APPLICATION;
}

public enum StreamsUncaughtExceptionHandlerResponseGlobalThread {
    SHUTDOWN_KAFKA_STREAMS_CLIENT;
}
 
public interface StreamsUncaughtExceptionHandler {
    StreamsUncaughtExceptionHandlerResponse handleUncaughtException(final Throwable exception);
    StreamsUncaughtExceptionHandlerResponseGlobalThread handleExceptionInGlobalThread(final Throwable exception);
}

KafkaStreams.java

/**
 * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly
 * terminates due to an uncaught exception.
 *
 * @param eh the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler
 * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
 */
public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler eh);
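
As a rough illustration of how an application might implement the proposed interface, the sketch below declares local copies of the two enums and the handler interface so that it compiles on its own. The class names `HandlerSketch` and `ExamplePolicyHandler`, and the policy itself (replace the thread on a timeout, shut down the whole application on a JVM `Error`, otherwise shut down the client), are assumptions made for this example and are not part of the proposal.

```java
import java.util.concurrent.TimeoutException;

public class HandlerSketch {

    // Local copies of the proposed types, so the sketch is self-contained.
    public enum StreamsUncaughtExceptionHandlerResponse {
        SHUTDOWN_STREAM_THREAD,
        REPLACE_STREAM_THREAD,
        SHUTDOWN_KAFKA_STREAMS_CLIENT,
        SHUTDOWN_KAFKA_STREAMS_APPLICATION
    }

    public enum StreamsUncaughtExceptionHandlerResponseGlobalThread {
        SHUTDOWN_KAFKA_STREAMS_CLIENT
    }

    public interface StreamsUncaughtExceptionHandler {
        StreamsUncaughtExceptionHandlerResponse handleUncaughtException(Throwable exception);
        StreamsUncaughtExceptionHandlerResponseGlobalThread handleExceptionInGlobalThread(Throwable exception);
    }

    // Hypothetical policy, chosen only to show all the moving parts.
    public static final class ExamplePolicyHandler implements StreamsUncaughtExceptionHandler {

        @Override
        public StreamsUncaughtExceptionHandlerResponse handleUncaughtException(final Throwable exception) {
            if (exception instanceof TimeoutException) {
                // Assumed to be transient: start a fresh stream thread.
                return StreamsUncaughtExceptionHandlerResponse.REPLACE_STREAM_THREAD;
            }
            if (exception instanceof Error) {
                // Assumed to be unrecoverable everywhere: stop all clients.
                return StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_KAFKA_STREAMS_APPLICATION;
            }
            // Anything else: stop only this client.
            return StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_KAFKA_STREAMS_CLIENT;
        }

        @Override
        public StreamsUncaughtExceptionHandlerResponseGlobalThread handleExceptionInGlobalThread(final Throwable exception) {
            // The global-thread enum offers a single option.
            return StreamsUncaughtExceptionHandlerResponseGlobalThread.SHUTDOWN_KAFKA_STREAMS_CLIENT;
        }
    }

    public static void main(final String[] args) {
        final StreamsUncaughtExceptionHandler handler = new ExamplePolicyHandler();
        System.out.println(handler.handleUncaughtException(new TimeoutException("simulated")));
        System.out.println(handler.handleExceptionInGlobalThread(new RuntimeException("simulated")));
    }
}
```

With the proposed setter, such a handler would be registered through `setUncaughtExceptionHandler` while the `KafkaStreams` instance is still in state CREATED.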

...

  • All Stream Threads in the client are shut down and transit to state DEAD.
  • The Kafka Streams client transits to state ERRORNOT_RUNNING.

  • The State directory cleaner thread will stop.
  • The RocksDB metrics recording thread will stop.

...

  • The shutdown is communicated to the other Kafka Streams clients through the rebalance protocol.

  • All Stream Threads across the entire application are shut down and transit to state DEAD.

  • All Kafka Streams clients, i.e., the entire Kafka Streams application, are shut down.

  • All Kafka Streams clients transit to state ERROR.

  • The State directory cleaner thread will stop.
  • The RocksDB metrics recording thread will stop.

SHUTDOWN_KAFKA_STREAMS_CLIENT (for global thread)

  • All Stream Threads in the client are shut down and transit to state DEAD.
  • The Kafka Streams client transits to state ERROR.

  • The State directory cleaner thread will stop.
  • The RocksDB metrics recording thread will stop.


In order to communicate the shutdown request from one client to the others, we propose to update SubscriptionInfoData to include a short field that encodes an error code. The error will be propagated through the metadata during a rejoin event via the assignor. The actual shutdown will be handled by the StreamsRebalanceListener, which is also where the INCOMPLETE_SOURCE_TOPIC_METADATA error can be handled.
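
The propagation path described above could look roughly like the following sketch. It is not the Kafka Streams implementation: the class `ShutdownPropagationSketch`, its three methods, the `SHUTDOWN_REQUESTED` code, and all numeric values are hypothetical, and the subscription payload is reduced to the single short field for brevity. Only the idea of a short error code and the name INCOMPLETE_SOURCE_TOPIC_METADATA come from the text.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

public class ShutdownPropagationSketch {

    // Hypothetical numeric codes; the proposal only states that the field is a short.
    public static final short NONE = 0;
    public static final short INCOMPLETE_SOURCE_TOPIC_METADATA = 1;
    public static final short SHUTDOWN_REQUESTED = 2;

    // Client side: write the error code into the subscription payload
    // (here the payload holds nothing but the code).
    public static ByteBuffer encodeSubscription(final short errorCode) {
        final ByteBuffer buffer = ByteBuffer.allocate(Short.BYTES);
        buffer.putShort(errorCode);
        buffer.flip();
        return buffer;
    }

    // Assignor side: if any member requested a shutdown, hand that code to every member.
    public static short errorCodeForAssignment(final List<ByteBuffer> subscriptions) {
        for (final ByteBuffer subscription : subscriptions) {
            // duplicate() leaves the caller's buffer position untouched.
            if (subscription.duplicate().getShort() == SHUTDOWN_REQUESTED) {
                return SHUTDOWN_REQUESTED;
            }
        }
        return NONE;
    }

    // Rebalance-listener side: decide whether this client should shut down.
    public static boolean shouldShutDown(final short assignmentErrorCode) {
        return assignmentErrorCode == SHUTDOWN_REQUESTED;
    }

    public static void main(final String[] args) {
        final List<ByteBuffer> subscriptions = Arrays.asList(
            encodeSubscription(NONE),
            encodeSubscription(SHUTDOWN_REQUESTED));
        final short code = errorCodeForAssignment(subscriptions);
        System.out.println("shut down every client: " + shouldShutDown(code));
    }
}
```

Carrying the code in the subscription and echoing it in the assignment means no extra communication channel is needed: every client learns about the request in the same rebalance that the requesting client triggers.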

...