Status

Current state: Accepted

Discussion thread: here 

JIRA: here or here. The replace thread is 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently there is no native way to shut down an entire Kafka Streams application from a StreamThread. Adding the handler improves how Streams handles errors and helps prevent corruption: processing can be halted immediately, before more data is corrupted. The shutdown is best effort. It will shut down every thread in the client, but it may not work correctly in certain network partition scenarios.

This would help recover from errors or problems such as the following:

Public Interfaces

KafkaStreams.java

package org.apache.kafka.streams;
 
public enum StreamsUncaughtExceptionResponse {
    REPLACE_THREAD,
    SHUTDOWN_CLIENT,
    SHUTDOWN_APPLICATION;
}

 
public interface StreamsUncaughtExceptionHandler {
    StreamsUncaughtExceptionResponse handle(final Throwable exception);
}

KafkaStreams.java

/**
 * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly
 * terminates due to an uncaught exception.
 *
 * @param eh the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler
 * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
 */
public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler eh);
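As an illustration of how an application might implement the proposed interface, the sketch below chooses a response based on the exception type. The enum and interface are redeclared locally so the snippet compiles without the Kafka Streams dependency. The names `HandlerSketch`, `classify`, and `HANDLER`, and the mapping from exception types to responses, are assumptions made for this example and are not part of the proposal.

```java
import java.util.concurrent.TimeoutException;

final class HandlerSketch {
    // Local stand-ins that mirror the types listed under Public Interfaces.
    enum StreamsUncaughtExceptionResponse {
        REPLACE_THREAD,
        SHUTDOWN_CLIENT,
        SHUTDOWN_APPLICATION
    }

    interface StreamsUncaughtExceptionHandler {
        StreamsUncaughtExceptionResponse handle(final Throwable exception);
    }

    // Hypothetical policy: which exception maps to which response is an
    // assumption for illustration only.
    static StreamsUncaughtExceptionResponse classify(final Throwable exception) {
        if (exception instanceof TimeoutException) {
            // Treated here as transient, so only the failed thread is replaced.
            return StreamsUncaughtExceptionResponse.REPLACE_THREAD;
        }
        if (exception instanceof IllegalStateException) {
            // Treated here as a sign of possible corruption, so every client stops.
            return StreamsUncaughtExceptionResponse.SHUTDOWN_APPLICATION;
        }
        // Anything else stops only this client, matching the proposed default.
        return StreamsUncaughtExceptionResponse.SHUTDOWN_CLIENT;
    }

    // The interface has a single method, so a method reference can serve as the handler.
    static final StreamsUncaughtExceptionHandler HANDLER = HandlerSketch::classify;
}
```

Under the proposed API, a handler like this would be passed to setUncaughtExceptionHandler while the KafkaStreams instance is still in the CREATED state, since the method throws IllegalStateException otherwise.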


Proposed Changes


We propose to add a new streams specific uncaught exception handler that will do the following:

REPLACE_THREAD:

SHUTDOWN_CLIENT (default)

SHUTDOWN_APPLICATION 

If the old handler is set and the new one is not, the behavior will not change. Otherwise the new handler will take precedence.

To communicate the shutdown request from one client to the others, we propose to update SubscriptionInfoData to include a short field that encodes an error code. The error will be propagated through the metadata during a rejoin event via the assignor. The actual shutdown will be handled by the StreamsRebalanceListener, which is also where the INCOMPLETE_SOURCE_TOPIC_METADATA error can be handled.
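The propagation path described above can be pictured with a small in-memory simulation. This is only a sketch of the idea, not the actual assignor or listener code: `ShutdownPropagationSketch`, `Subscription`, `Client`, `assign`, `rebalance`, and the numeric values of the error codes are all hypothetical names and values chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;

final class ShutdownPropagationSketch {
    // Hypothetical error code values; the real encoding is not specified here.
    static final short NONE = 0;
    static final short SHUTDOWN_REQUESTED = 1;

    // Stand-in for the subscription metadata a client sends when it rejoins.
    static final class Subscription {
        final String clientId;
        final short errorCode;

        Subscription(final String clientId, final short errorCode) {
            this.clientId = clientId;
            this.errorCode = errorCode;
        }
    }

    // Stand-in for one Streams client in the application.
    static final class Client {
        final String id;
        short pendingError = NONE; // set when the handler asks for an application shutdown
        boolean running = true;

        Client(final String id) {
            this.id = id;
        }

        Subscription subscribe() {
            return new Subscription(id, pendingError);
        }

        // Stand-in for the rebalance listener reacting to the assigned error code.
        void onAssignment(final short errorCode) {
            if (errorCode == SHUTDOWN_REQUESTED) {
                running = false;
            }
        }
    }

    // Stand-in for the assignor: the first non-zero error code found in any
    // subscription is the code handed back to every client.
    static short assign(final List<Subscription> subscriptions) {
        for (final Subscription subscription : subscriptions) {
            if (subscription.errorCode != NONE) {
                return subscription.errorCode;
            }
        }
        return NONE;
    }

    // One simulated rejoin: collect subscriptions, assign, notify every client.
    static void rebalance(final List<Client> clients) {
        final List<Subscription> subscriptions = new ArrayList<>();
        for (final Client client : clients) {
            subscriptions.add(client.subscribe());
        }
        final short errorCode = assign(subscriptions);
        for (final Client client : clients) {
            client.onAssignment(errorCode);
        }
    }
}
```

In this simulation, setting `pendingError` on a single client and calling `rebalance` stops every client in the list, which is the effect SHUTDOWN_APPLICATION is meant to have across the group.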



Compatibility, Deprecation, and Migration Plan

SubscriptionInfoData will be upgraded to version 8 because we are adding a field for an error code to be propagated through the application.

Deprecate the old handler.

Rejected Alternatives