...
Code Block | ||
---|---|---|
| ||
package org.apache.kafka.streams; public enum StreamsUncaughtExceptionHandlerResponse { SHUTDOWN_STREAM_THREAD, REPLACE_STREAM_THREAD, SHUTDOWN_KAFKA_STREAMS_CLIENT, SHUTDOWN_KAFKA_STREAMS_APPLICATION; } public enum StreamsUncaughtExceptionHandlerResponseGlobalThread { SHUTDOWN_KAFKA_STREAMS_CLIENT; } public interface StreamsUncaughtExceptionHandler { UncaughtExceptionHandlerResponse handleUncaughtException(Throwable exception); StreamsUncaughtExceptionHandlerResponseGlobalThread handleExceptionInGlobalThread(final Throwable exception); } KafkaStreams.java/** * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly * terminates due to an uncaught exception. * * @param eh the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. */public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler eh) ; |
...
- All Stream Threads in the client are shutdown and they transit to state DEAD
The Kafka Streams client transits to state ERRORNOT_RUNNING.
- The State directory cleaner thread will stop
The RocksDB metrics recording thread will stop.
...
The shutdown is communicated to the other Kafka Streams clients through the rebalance protocol.
All Stream Threads across the entire application are shutdown and they transit to state DEAD
All Kafka Streams clients, i.e., the entire Kafka Streams application, is shutdown.
All Kafka Streams clients transit to state ERROR.
- The State directory cleaner thread stop
- The RocksDB metrics recording thread will stop.
SHUTDOWN_KAFKA_STREAMS_CLIENT (for global thread)
- All Stream Threads in the client are shutdown and they transit to state DEAD
The Kafka Streams client transits to state ERROR.
- The State directory cleaner thread will stop
The RocksDB metrics recording thread will stop.
In order to communicate the shutdown request from one client to the others we propose to update the SubcriptionInfoData to include a short field which will encode an error code. The error will be propagated through the metadata during a rejoin event via the assignor. The actual shutdown will be handled by the StreamsRebalnceListener, this is where the INCOMPLETE_SOURCE_TOPIC_METADATA error can also be handled.
...