...

  • Source/sink topic deleted. A handler for source topic errors will be exposed to users in KIP-662.
  • Serialization or deserialization failures. This KIP can add another option to prevent rolling thread death.
    • Currently users have the option of returning a Continue or Fail enum value. In the future we can expand this to include Shutdown and then throw ShutdownRequestedException in response.
  • Add another option in KIP-399 if failing only the affected thread is not comprehensive enough.
  • User-specific cases such as KAFKA-4748.
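
The handler idea in the list above can be sketched as follows. This is only an illustration under stated assumptions: `HandlerResponse`, `HandlerResponseSketch`, and the simplified `ShutdownRequestedException` below are hypothetical stand-ins and not the real Kafka Streams types, and the `SHUTDOWN` value is the possible future addition mentioned above, not an existing option.

```java
// Hypothetical stand-ins for illustration; these are not the real Kafka Streams types.
enum HandlerResponse {
    CONTINUE,  // skip the bad record and keep processing
    FAIL,      // fail the current stream thread
    SHUTDOWN   // assumed future option: stop the whole application
}

// Simplified stand-in: the proposed class extends StreamsException instead.
class ShutdownRequestedException extends RuntimeException {
    ShutdownRequestedException(final String message, final Throwable cause) {
        super(message, cause);
    }
}

final class HandlerResponseSketch {
    /**
     * Translates a handler response into an action.
     * Returns true when the record should be skipped; throws otherwise.
     */
    static boolean apply(final HandlerResponse response, final Exception cause) {
        switch (response) {
            case CONTINUE:
                return true;
            case SHUTDOWN:
                throw new ShutdownRequestedException("Handler requested application shutdown", cause);
            case FAIL:
            default:
                throw new IllegalStateException("Handler requested thread failure", cause);
        }
    }

    public static void main(final String[] args) {
        final Exception bad = new IllegalArgumentException("malformed record");
        System.out.println(apply(HandlerResponse.CONTINUE, bad));
        try {
            apply(HandlerResponse.SHUTDOWN, bad);
        } catch (final ShutdownRequestedException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Keeping the decision in an enum would let existing handlers that return Continue or Fail keep working unchanged, while only the new value triggers the application-wide path.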

Public Interfaces

/**
 * Should be thrown in reaction to an event that necessitates the closure of the entire Streams application.
 */
public class ShutdownRequestedException extends StreamsException {
    
    public ShutdownRequestedException(final String message) {
        super(message);
    }

    public ShutdownRequestedException(final String message, final Throwable throwable) {
        super(message, throwable);
    }

    public ShutdownRequestedException(final Throwable throwable) {
        super(throwable);
    }
}

KafkaStreams.java

/**
 * Attempts to shut down the application.
 * Succeeds if there is at least one alive StreamThread; fails otherwise.
 *
 * @return true if the shutdown was initiated, false if it could not be initiated
 */
public boolean shutdownApplication();

...

Proposed Changes

We propose to add a new method that will cause all clients in the application to be shut down when called. We achieve this by adding shutdownApplication to `KafkaStreams.java`, which will search for an alive stream thread to start the shutdown. If there is no alive stream thread, the method will return false, indicating that the shutdown was not initiated.
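
A minimal sketch of that search, assuming a simplified thread model: `StreamThreadSketch` and `KafkaStreamsSketch` are hypothetical names used only for illustration and do not mirror the real `KafkaStreams` internals.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a stream thread; not the real StreamThread class.
final class StreamThreadSketch {
    private final boolean alive;
    private boolean shutdownRequested = false;

    StreamThreadSketch(final boolean alive) {
        this.alive = alive;
    }

    boolean isAlive() {
        return alive;
    }

    // In the proposal, this is where the thread would start the application-wide shutdown.
    void requestShutdown() {
        shutdownRequested = true;
    }

    boolean isShutdownRequested() {
        return shutdownRequested;
    }
}

// Hypothetical stand-in for the client object that owns the threads.
final class KafkaStreamsSketch {
    private final List<StreamThreadSketch> threads;

    KafkaStreamsSketch(final List<StreamThreadSketch> threads) {
        this.threads = threads;
    }

    /** Returns true if an alive thread was found to start the shutdown, false otherwise. */
    boolean shutdownApplication() {
        for (final StreamThreadSketch thread : threads) {
            if (thread.isAlive()) {
                thread.requestShutdown();
                return true;
            }
        }
        return false;
    }

    public static void main(final String[] args) {
        final KafkaStreamsSketch streams = new KafkaStreamsSketch(
            Arrays.asList(new StreamThreadSketch(false), new StreamThreadSketch(true)));
        System.out.println("shutdown initiated: " + streams.shutdownApplication());
    }
}
```

Callers would be expected to check the boolean result, since a client whose threads have all died cannot start the shutdown itself.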

In order to communicate the shutdown request from one client to the others, we propose to update SubscriptionInfoData to include a short field that encodes an error code. The error will be propagated through the metadata during a rejoin event via the assignor. The actual shutdown will be handled by the StreamsRebalanceListener, which is also where the INCOMPLETE_SOURCE_TOPIC_METADATA error can be handled.
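
The propagation path described above might look roughly like the following. This is a sketch under assumptions, not the actual assignor code: all class names are hypothetical, the numeric error code values are illustrative, and the real subscription and assignment encoding is omitted.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative error codes; the numeric values are assumptions made for this sketch.
final class ErrorCodes {
    static final short NONE = 0;
    static final short INCOMPLETE_SOURCE_TOPIC_METADATA = 1;
    static final short SHUTDOWN_REQUESTED = 2;
}

// Hypothetical stand-in for the subscription metadata a client sends when it rejoins.
final class SubscriptionSketch {
    final String clientId;
    final short errorCode;

    SubscriptionSketch(final String clientId, final short errorCode) {
        this.clientId = clientId;
        this.errorCode = errorCode;
    }
}

// Hypothetical assignor step: pick the error code to send back to every member.
final class AssignorSketch {
    static short propagateError(final List<SubscriptionSketch> subscriptions) {
        for (final SubscriptionSketch subscription : subscriptions) {
            if (subscription.errorCode != ErrorCodes.NONE) {
                return subscription.errorCode; // first reported error wins in this sketch
            }
        }
        return ErrorCodes.NONE;
    }
}

// Hypothetical listener: reacts to the error code received with the assignment.
final class RebalanceListenerSketch {
    private boolean shutdownTriggered = false;

    void onAssignment(final short errorCode) {
        if (errorCode == ErrorCodes.SHUTDOWN_REQUESTED) {
            shutdownTriggered = true; // the real listener would close the client here
        }
    }

    boolean isShutdownTriggered() {
        return shutdownTriggered;
    }

    public static void main(final String[] args) {
        final short code = AssignorSketch.propagateError(Arrays.asList(
            new SubscriptionSketch("client-a", ErrorCodes.NONE),
            new SubscriptionSketch("client-b", ErrorCodes.SHUTDOWN_REQUESTED)));
        final RebalanceListenerSketch listener = new RebalanceListenerSketch();
        listener.onAssignment(code);
        System.out.println("shutdown triggered: " + listener.isShutdownTriggered());
    }
}
```

Because the code travels with the rebalance metadata, a single client requesting shutdown is enough for every member that receives an assignment to see the request.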


Compatibility, Deprecation, and Migration Plan

...

  • Two paths: an internal error via an exception, and a request method for users to call
  • Add a config option to shut down whenever a user error is thrown: not flexible enough
  • Throwing an exception instead of calling shutdownApplication