Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleNodeState
package org.apache.kafka.streams.processor.assignment;

/**
 * A read-only metadata class representing the current state of each Streams client node with at least one StreamThread participating in this rebalance
 */
public interface NodeState {       
  
  /**
   * @return a new {@link NodeAssignment} that can be used to assign tasks for this node
   */
  NodeAssignment newAssignmentForNode();

  /**
   * @return the processId of the application instance running on this node
   */
  UUID processId();

  /**
   * Returns the number of StreamThreads on this node, which is equal to the number of main consumers
   * and represents its overall capacity.
   * <p>
   * NOTE: this is actually the "minimum capacity" of a node, or the minimum number of assigned
   * active tasks below which the node will have been over-provisioned and unable to give every
   * available StreamThread an active task assignment
   *
   * @return the number of consumers on this node
   */
  int numStreamThreads();

  /**
   * @return the set of consumer client ids for all StreamThreads on the given node
   */
  SortedSet<String> consumers();

  /**
   * @return the consumer on this node that previously owned this partition in the previous rebalance
   */
  String previousOwnerForPartition(final TopicPartition topicPartition);

  /**
   * @return the set of all active tasks owned by consumers on this node since the previous rebalance
   */
  SortedSet<TaskId> previousActiveTasks();

  /**
   * @return the set of all standby tasks owned by consumers on this node since the previous rebalance
   */
  SortedSet<TaskId> previousStandbyTasks();

  /**
   * Returns the total lag across all logged stores in the task. Equal to the end offset sum if this client
   * did not have any state for this task on disk.
   *
   * @return end offset sum - offset sum
   *          Task.LATEST_OFFSET if this was previously an active running task on this client
   */
  long lagFor(final TaskId task);

  /**
   * @return the previous tasks assigned to this consumer ordered by lag, filtered for any tasks that don't exist in this assignment
   */
  SortedSet<TaskId> prevTasksByLag(final String consumer);

  /**
   * Returns a collection containing all (and only) stateful tasks in the topology by {@link TaskId},
   * mapped to its "offset lag sum". This is computed as the difference between the changelog end offset
   * and the current offset, summed across all logged state stores in the task.
   *
   * @return a map from all stateful tasks to their lag sum
   */
  Map<TaskId, Long> statefulTasksToLagSums();

  /**
   * The {@link HostInfo} of this node, if set via the
   * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_SERVER_CONFIG application.server} config
   *
   * @return the host info for this node if configured, else {@code Optional.empty()}
   */
  Optional<HostInfo> hostInfo();

  /**
   * The client tags for this client node, if set any have been via configs using the
   * {@link org.apache.kafka.streams.StreamsConfig#clientTagPrefix}
   * <p>
   * Can be used however you want, or passed in to enable the rack-aware standby task assignor.
   *
   * @return all the client tags found in this node's {@link org.apache.kafka.streams.StreamsConfig}
   */
  Map<String, String> clientTags();
 }

The NodeState will be wrapped up along with the other inputs to the assignor (such as the configuration and set of tasks to be assigned, as well as various utilities that may be useful) in the final new interface, the ApplicationMetadata:

...