...

In Kafka Streams, a method KafkaStreams#isConnected() will be added. If the connection to the broker is lost unexpectedly, KafkaStreams#isConnected() will return false. Note the difference between DISCONNECTED and DEAD: in the DEAD state, KafkaStreams is no longer running, whereas in the DISCONNECTED state it is still alive but cannot connect to the broker. We would add a new state so that getState() returns State.DISCONNECTED whenever the consumer is disconnected from the broker.

The enum State found in KafkaStreams would be modified as follows:

public enum State
    /**
     * Kafka Streams states are the possible states that a Kafka Streams instance can be in.
     * An instance must only be in one state at a time.
     * The expected state transition with the following defined states is:
     *
     * <pre>
     *                 +--------------+
     *         +<----- | Created (0)  |----------------+
     *         |       +-----+--------+                |
     *         |             |                         |
     *         |             v                         v
     *         |       +----+--+------+           +----------------+
     *         |       | Re-          |           |     Dis-       |
     *         +<----- | Balancing (1)| <-------->|  connected(6)  |
     *         |       +-----+-+------+           +----------------+
     *         |             | ^                       ^
     *         |             v |                       |
     *         |       +--------------+                |
     *         |       | Running (2)  | <--------------+
     *         |       +------+-------+          
     *         |              |                  
     *         |              v                  
     *         |       +------+-------+     +----+-------+
     *         +-----> | Pending      |<--- | Error (5)  |
     *                 | Shutdown (3) |     +------------+
     *                 +------+-------+
     *                        |
     *                        v
     *                 +------+-------+
     *                 | Not          |
     *                 | Running (4)  |
     *                 +--------------+
     *
     *
     * </pre>
     * Note the following:
     * - RUNNING state will transit to REBALANCING if any of its threads is in PARTITION_REVOKED state
     * - REBALANCING state will transit to RUNNING if all of its threads are in RUNNING state
     * - DISCONNECTED state will indicate that all threads are disconnected from their corresponding broker
     * - Any state except NOT_RUNNING can go to PENDING_SHUTDOWN (whenever close is called)
     * - Of special importance: If the global stream thread dies, or all stream threads die (or both) then
     *   the instance will be in the ERROR state. The user will need to close it.
     */
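    public enum State {
        // Declaration order must match the ordinals used in the transition lists and in the diagram above:
        // CREATED(0), REBALANCING(1), RUNNING(2), PENDING_SHUTDOWN(3), NOT_RUNNING(4), ERROR(5), DISCONNECTED(6).
        // DISCONNECTED may return to REBALANCING (1) or RUNNING (2), per the bidirectional arrows in the diagram.
        CREATED(1, 3, 6), REBALANCING(2, 3, 5, 6), RUNNING(1, 3, 5, 6), PENDING_SHUTDOWN(4), NOT_RUNNING, ERROR(3), DISCONNECTED(1, 2, 3);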

        private final Set<Integer> validTransitions = new HashSet<>();

        State(final Integer... validTransitions) {
            this.validTransitions.addAll(Arrays.asList(validTransitions));
        }

        public boolean isRunning() {
            // DISCONNECTED means the KafkaStreams threads are still alive, although the connection to the broker is lost
            return equals(RUNNING) || equals(REBALANCING) || equals(DISCONNECTED);
        }

        public boolean isValidTransition(final State newState) {
            return validTransitions.contains(newState.ordinal());
        }
    }
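
The transition table can be checked in isolation. The following is a minimal sketch, not the actual Kafka source: StateTransitionSketch is a hypothetical stand-alone class, and it assumes the ordinals follow the numbering in the diagram (ERROR = 5, DISCONNECTED = 6) and that DISCONNECTED can return to either REBALANCING or RUNNING.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-alone copy of the proposed enum, used only to exercise the transition table.
class StateTransitionSketch {

    enum State {
        // Declaration order fixes the ordinals: CREATED=0 ... ERROR=5, DISCONNECTED=6 (assumed from the diagram).
        CREATED(1, 3, 6),
        REBALANCING(2, 3, 5, 6),
        RUNNING(1, 3, 5, 6),
        PENDING_SHUTDOWN(4),
        NOT_RUNNING,
        ERROR(3),
        DISCONNECTED(1, 2, 3);

        private final Set<Integer> validTransitions = new HashSet<>();

        State(final Integer... validTransitions) {
            this.validTransitions.addAll(Arrays.asList(validTransitions));
        }

        boolean isValidTransition(final State newState) {
            return validTransitions.contains(newState.ordinal());
        }
    }

    // Prints every transition that the table allows, one per line.
    public static void main(final String[] args) {
        for (final State from : State.values()) {
            for (final State to : State.values()) {
                if (from.isValidTransition(to)) {
                    System.out.println(from + " -> " + to);
                }
            }
        }
    }
}
```

Because the transition lists refer to states by ordinal, the position of each constant in the declaration matters: inserting a new constant in the middle of the list silently changes the meaning of every later number.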


This also requires a new method on KafkaConsumer so that the StreamThread can query the health of the connection.

KafkaConsumer#isConnected()
/**
 * @return whether or not the connection is alive
 */
public boolean isConnected();



Proposed Changes

We would query each StreamThread for its status and update the state of the KafkaStreams instance accordingly.
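
One way this aggregation could look is sketched below. It is an illustration under stated assumptions, not the proposed implementation: ConnectionStateSketch, ThreadStatus, allDisconnected and nextState are hypothetical names, the enum is reduced to two states, and the choice to move back to RUNNING once any thread reconnects is an assumption (the proposal also allows a transition to REBALANCING).

```java
import java.util.Arrays;
import java.util.List;

class ConnectionStateSketch {

    // Hypothetical stand-in for a StreamThread that reports its consumer's connection health.
    interface ThreadStatus {
        boolean isConnected();
    }

    // Reduced state set for illustration only.
    enum State { RUNNING, DISCONNECTED }

    // True only when there is at least one thread and every thread has lost its connection.
    static boolean allDisconnected(final List<ThreadStatus> threads) {
        if (threads.isEmpty()) {
            return false; // assumption: with no threads there is nothing to report as disconnected
        }
        for (final ThreadStatus thread : threads) {
            if (thread.isConnected()) {
                return false;
            }
        }
        return true;
    }

    // Derives the instance state from the per-thread connection status.
    static State nextState(final State current, final List<ThreadStatus> threads) {
        if (allDisconnected(threads)) {
            return State.DISCONNECTED;
        }
        // Assumption: as soon as one thread is connected again, the instance leaves DISCONNECTED.
        return current == State.DISCONNECTED ? State.RUNNING : current;
    }

    public static void main(final String[] args) {
        final ThreadStatus up = () -> true;
        final ThreadStatus down = () -> false;
        System.out.println(nextState(State.RUNNING, Arrays.asList(down, down)));
        System.out.println(nextState(State.RUNNING, Arrays.asList(up, down)));
        System.out.println(nextState(State.DISCONNECTED, Arrays.asList(up, down)));
    }
}
```

Requiring all threads to be disconnected follows the note in the State Javadoc that DISCONNECTED indicates all threads are disconnected from their corresponding broker.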

...