Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
themeEclipse
titlepublic enum State
    /**
     * KafkaStream Streamsthread states are the possible statestates that a Kafkastream Streams instancethread can be in.
     * AnA instancethread must only be in one state at a time.
     * The expected state transitiontransitions with the following defined states is:
     *
     * <pre>
     *                 +--------------+
     *          +<----- | Created (0) |
     *          |     +-----+-----------+
     *          |           |
     *          |           v
     *          |     +-----+--------+
     *          +<--- | Starting (1)|<-----------+
     *          |     +-----+-------+            |
     *          |           |                    v
     *          |           |             v+---------------+
     *          |           v            |DISCONNECTED(7)|
     *          |       +-----+--+------+           +----------------+    
     *         | +<--- |   Partitions  | Re-     ^    ^ |   ^     
   |  *   Dis-       |
     *| Revoked (2) |      +<----- | Balancing (1)| <-------->|    |  connected(6)  |
     *          |       +-----+-------+<------+    |    |
   +----------------+
  *       *   |      |     |        | ^          |    |         ^
     *          |             v |                  |     |
     *          |       +-------+-------+            |    |
     *          |     | Partitions  | Running (2)  |            v    |
     *          +<--- | Assigned (3)| <-----------+>+    |
     *          |       +------+-------+                |
     *          |           |      |                  |
     *          |           v    v                    |
     *          |       +------+-------+                 +v
     *          |     | Running (4) | <------+------->+
     *          |     +-----+-------> | Pending+
     *          |<--- | Error (5)           |
     *          |            | Shutdown (3)v
     *          |     +-----+-------+
     *          +---> | Pending     |
     *                | Shutdown (5)|
     *                +------+-------+
     *                        |
     *                        v
     *                 +------+-------+
     *                | Dead (6)    | Not
     *                +-------------+
     * </pre>
    *
     * Note |the following:
     * <ul>
     *     <li>Any state can go to PENDING_SHUTDOWN. That is because streams  | Running (4)  |can be closed at any time.</li>
     *     <li>
     *         State PENDING_SHUTDOWN may want to transit to some other states other  +--------------+than DEAD,
     *         in the corner case when the shutdown is triggered while the thread is still in the rebalance loop.
     *         In this case we will forbid the transition but will not treat as an error.
     *     </pre>li>
     *   Note the following:<li>
     * - RUNNING state will         State PARTITIONS_REVOKED may want transit to REBALANCING if any of its threads is in PARTITION_REVOKED state itself indefinitely, in the corner case when
     *         the coordinator repeatedly fails in-between revoking partitions and assigning new partitions.
     *         Also during streams instance start up PARTITIONS_REVOKED may want to transit to itself as well.
     * - REBALANCING state will transit to RUNNING if all of its threads are in RUNNING state        In this case we will forbid the transition but will not treat as an error.
     *  - DISCONNECTED state will</li>
 indicate that all threads are* disconnected from their corresponding broker<li>
     * -         Any state except NOT_RUNNING can go to PENDING_SHUTDOWN (whenever close is called)which is considered running (i.e. STARTING, RUNNING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED) could transition
     * - Of special importance: If the global stream thread dies, or all stream threads die (or both) then
     *   the instance will be in the ERROR state. The user will need to close it.         to and from DISCONNECTED status. This state indicates that the StreamThread is alive and well,  but the connection to 
     *         broker does not exist.        
     *     </li>
     * </ul>
     */
    public enum State implements ThreadStateTransitionValidator {
        CREATED(1, 5), STARTING(2, 5, 6), PARTITIONS_REVOKED(3, 5, 6), REBALANCINGPARTITIONS_ASSIGNED(2, 34, 5, 6), RUNNING(1, 32, 5, 6), PENDING_SHUTDOWN(46), NOT_RUNNING, DISCONNECTED(2, 3,4,5), ERROR(3)DEAD;

        private final Set<Integer> validTransitions = new HashSet<>();

        State(final Integer... validTransitions) {
            this.validTransitions.addAll(Arrays.asList(validTransitions));
        }

        public boolean isRunning() {
            // technically DISCONNECTED means the KafkaStream threads are alive, although the connection is dead
            return equals(RUNNINGreturn equals(RUNNING) || equals(STARTING) || equals(PARTITIONS_REVOKED) || equals(REBALANCINGPARTITIONS_ASSIGNED) || equals(DISCONNECTED);
        }

        @Override
        public boolean isValidTransition(final StateThreadStateTransitionValidator newState) {
            final State tmpState = (State) newState;
            return validTransitions.contains(newStatetmpState.ordinal());
        }
    }

    


This would also mean that a new method would be added to KafkaConsumer to allow the StreamThread to query the health of the connection.

...