Comment: Adapt the KIP to the latest feedback from Guozhang

...

Code Block
languagejava
titleorg.apache.kafka.streams.ThreadMetadata
linenumberstrue
/**
 * Represents the state of a single thread running within a {@link KafkaStreams} application.
 */
public interface ThreadMetadata {


    /**
     * @return the state of the Thread
     */
    String threadState();

    /**
     * @return the name of the Thread
     */
    String threadName();

    /**
     * This function will return the set of the {@link TaskMetadata} for the current active tasks
     */
    Set<TaskMetadata> activeTasks();

    /**
     * This function will return the set of the {@link TaskMetadata} for the current standby tasks
     */
    Set<TaskMetadata> standbyTasks();

    /**
     * @return the consumer Client Id
     */
    String consumerClientId();

    /**
     * @return the restore consumer Client Id
     */
    String restoreConsumerClientId();

    /**
     * This function will return the set of Client Ids for the producers
     */
    Set<String> producerClientIds();

    /**
     * @return the admin Client Id
     */
    String adminClientId();

    /**
     * Compares the specified object with this ThreadMetadata. Returns {@code true} if and only if the specified object is
     * also a ThreadMetadata and both {@code threadName()} are equal, {@code threadState()} are equal, {@code activeTasks()} contain the same
     * elements, {@code standbyTasks()} contain the same elements, {@code consumerClientId()} are equal, {@code restoreConsumerClientId()}
     * are equal, {@code producerClientIds()} contain the same elements, and {@code adminClientId()} are equal.
     */
    boolean equals(Object o);

    /**
     * Returns the hash code value for this ThreadMetadata. The hash code is defined to be the result of the following calculation:
     * <pre>
     * {@code
     * Objects.hash(
     *             threadName,
     *             threadState,
     *             activeTasks,
     *             standbyTasks,
     *             mainConsumerClientId,
     *             restoreConsumerClientId,
     *             producerClientIds,
     *             adminClientId);
     * }
     * </pre>
     */
    int hashCode();
}
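
The equals/hashCode contract documented above can be illustrated with a small, self-contained sketch. Everything in it is an assumption made for illustration: `ThreadMetadataSketch` is a reduced stand-in for the interface (the task sets are left out), and `ThreadMetadataSketchImpl` is a hypothetical implementation, not the class proposed by this KIP.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Hypothetical, reduced stand-in for the ThreadMetadata interface (task sets omitted for brevity). */
interface ThreadMetadataSketch {
    String threadName();
    String threadState();
    String consumerClientId();
    String restoreConsumerClientId();
    Set<String> producerClientIds();
    String adminClientId();
}

/** Hypothetical implementation; the class name and constructor are assumptions, not part of the KIP. */
final class ThreadMetadataSketchImpl implements ThreadMetadataSketch {
    private final String threadName;
    private final String threadState;
    private final String mainConsumerClientId;
    private final String restoreConsumerClientId;
    private final Set<String> producerClientIds;
    private final String adminClientId;

    ThreadMetadataSketchImpl(final String threadName,
                             final String threadState,
                             final String mainConsumerClientId,
                             final String restoreConsumerClientId,
                             final Set<String> producerClientIds,
                             final String adminClientId) {
        this.threadName = threadName;
        this.threadState = threadState;
        this.mainConsumerClientId = mainConsumerClientId;
        this.restoreConsumerClientId = restoreConsumerClientId;
        // Defensive, read-only copy so callers cannot mutate the metadata afterwards.
        this.producerClientIds = Collections.unmodifiableSet(new HashSet<>(producerClientIds));
        this.adminClientId = adminClientId;
    }

    @Override public String threadName() { return threadName; }
    @Override public String threadState() { return threadState; }
    @Override public String consumerClientId() { return mainConsumerClientId; }
    @Override public String restoreConsumerClientId() { return restoreConsumerClientId; }
    @Override public Set<String> producerClientIds() { return producerClientIds; }
    @Override public String adminClientId() { return adminClientId; }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final ThreadMetadataSketchImpl that = (ThreadMetadataSketchImpl) o;
        // Field-by-field comparison; Set.equals compares elements regardless of order.
        return Objects.equals(threadName, that.threadName)
            && Objects.equals(threadState, that.threadState)
            && Objects.equals(mainConsumerClientId, that.mainConsumerClientId)
            && Objects.equals(restoreConsumerClientId, that.restoreConsumerClientId)
            && Objects.equals(producerClientIds, that.producerClientIds)
            && Objects.equals(adminClientId, that.adminClientId);
    }

    @Override
    public int hashCode() {
        // Same fields as equals, so equal objects always share a hash code.
        return Objects.hash(threadName, threadState, mainConsumerClientId,
            restoreConsumerClientId, producerClientIds, adminClientId);
    }
}
```

Documenting the calculation on the interface, as the Javadoc above does, lets alternative implementations stay consistent with each other when instances are stored in hash-based collections.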

UPDATE

During the vote thread discussion it was asked whether this KIP could be aligned with KAFKA-12370. This seems doable; only one of the changes raises some concerns. Discussion: https://lists.apache.org/x/thread.html/rfaaa8c40903eca9ba2c8b406c486e6beee3fa916942c28446358c9b1@%3Cdev.kafka.apache.org%3E

The first one is converting StreamsMetadata into an interface under org.apache.kafka.streams with an internal implementation:

Code Block
languagejava
titleorg.apache.kafka.streams.StreamsMetadata
linenumberstrue
public interface StreamsMetadata {

    /**
     * The value of {@link StreamsConfig#APPLICATION_SERVER_CONFIG} configured for the streams
     * instance, which is typically host/port
     *
     * @return {@link HostInfo} corresponding to the streams instance
     */
    HostInfo hostInfo();

    /**
     * State stores owned by the instance as an active replica
     *
     * @return set of active state store names
     */
    Set<String> stateStoreNames();

    /**
     * Topic partitions consumed by the instance as an active replica
     *
     * @return set of active topic partitions
     */
    Set<TopicPartition> topicPartitions();

    /**
     * (Source) Topic partitions for which the instance acts as standby.
     *
     * @return set of standby topic partitions
     */
    Set<TopicPartition> standbyTopicPartitions();

    /**
     * State stores owned by the instance as a standby replica
     *
     * @return set of standby state store names
     */
    Set<String> standbyStateStoreNames();

    /**
     * This method is equivalent to calling {@code StreamsMetadata.hostInfo().host();}
     */
    String host();

    /**
     * This method is equivalent to calling {@code StreamsMetadata.hostInfo().port();}
     */
    int port();

    /**
     * Compares the specified object with this StreamsMetadata. Returns {@code true} if and only if the specified object is
     * also a StreamsMetadata, both {@code hostInfo()} are equal, and {@code stateStoreNames()}, {@code topicPartitions()},
     * {@code standbyStateStoreNames()}, and {@code standbyTopicPartitions()} contain the same elements.
     */
    boolean equals(Object o);

    /**
     * Returns the hash code value for this StreamsMetadata. The hash code is defined to be the result of the following calculation:
     * <pre>
     * {@code
     * Objects.hash(hostInfo(), stateStoreNames(), topicPartitions(), standbyStateStoreNames(), standbyTopicPartitions());
     * }
     * </pre>
     */
    int hashCode();

}

Along with this change, the org.apache.kafka.streams.state.StreamsMetadata is deprecated:

Code Block
languagejava
titleorg.apache.kafka.streams.state.StreamsMetadata
linenumberstrue
/**
 * Represents the state of an instance (process) in a {@link KafkaStreams} application.
 * It contains the user supplied {@link HostInfo} that can be used by developers to build
 * APIs and services to connect to other instances, the Set of state stores available on
 * the instance and the Set of {@link TopicPartition}s available on the instance.
 * NOTE: This is a point in time view. It may change when rebalances happen.
 * @deprecated since 3.0.0 use {@link org.apache.kafka.streams.StreamsMetadata}
 */
@Deprecated
public class StreamsMetadata {
...
}


Additionally, the KafkaStreams class needs to be adapted to return the new ThreadMetadata interface, with the old method (returning the old class) deprecated as follows:


Code Block
languagejava
titleorg.apache.kafka.streams.KafkaStreams
linenumberstrue
/**
 * Returns runtime information about the local threads of this {@link KafkaStreams} instance.
 *
 * @return the set of {@link org.apache.kafka.streams.processor.ThreadMetadata}.
 * @deprecated since 3.0 use {@link #threadsMetadata()}
 */
@Deprecated
public Set<org.apache.kafka.streams.processor.ThreadMetadata> localThreadsMetadata() {
    return threadsMetadata().stream().map(threadMetadata -> new org.apache.kafka.streams.processor.ThreadMetadata(
            threadMetadata.threadName(),
            threadMetadata.threadState(),
            threadMetadata.consumerClientId(),
            threadMetadata.restoreConsumerClientId(),
            threadMetadata.producerClientIds(),
            threadMetadata.adminClientId(),
            threadMetadata.activeTasks().stream().map(taskMetadata -> new TaskMetadata(
                    taskMetadata.taskId().toString(),
                    taskMetadata.topicPartitions(),
                    taskMetadata.committedOffsets(),
                    taskMetadata.endOffsets(),
                    taskMetadata.timeCurrentIdlingStarted())
            ).collect(Collectors.toSet()),
            threadMetadata.standbyTasks().stream().map(taskMetadata -> new TaskMetadata(
                    taskMetadata.taskId().toString(),
                    taskMetadata.topicPartitions(),
                    taskMetadata.committedOffsets(),
                    taskMetadata.endOffsets(),
                    taskMetadata.timeCurrentIdlingStarted())
            ).collect(Collectors.toSet())))
            .collect(Collectors.toSet());
}
 
/**
 * Returns runtime information about the local threads of this {@link KafkaStreams} instance.
 *
 * @return the set of {@link ThreadMetadata}.
 */
public Set<ThreadMetadata> threadsMetadata() {
    final Set<ThreadMetadata> threadMetadata = new HashSet<>();
    processStreamThread(thread -> {
        synchronized (thread.getStateLock()) {
            if (thread.state() != StreamThread.State.DEAD) {
                threadMetadata.add(thread.getThreadMetadata());
            }
        }
    });
    return threadMetadata;
}
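
The bridging approach used by the deprecated method can be shown in isolation. The sketch below is only an illustration under stated assumptions: `ThreadView`, `SimpleThreadView`, `LegacyThreadView` and `StreamsFacade` are hypothetical stand-ins for the new interface, its implementation, the deprecated class and `KafkaStreams`, and only two fields are carried over.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical stand-in for the new ThreadMetadata interface. */
interface ThreadView {
    String threadName();
    String threadState();
}

/** Hypothetical internal implementation of the new interface. */
final class SimpleThreadView implements ThreadView {
    private final String name;
    private final String state;

    SimpleThreadView(final String name, final String state) {
        this.name = name;
        this.state = state;
    }

    @Override public String threadName() { return name; }
    @Override public String threadState() { return state; }
}

/** Hypothetical stand-in for the deprecated concrete class that old callers still expect. */
final class LegacyThreadView {
    final String name;
    final String state;

    LegacyThreadView(final String name, final String state) {
        this.name = name;
        this.state = state;
    }

    @Override
    public boolean equals(final Object o) {
        if (!(o instanceof LegacyThreadView)) {
            return false;
        }
        final LegacyThreadView that = (LegacyThreadView) o;
        return Objects.equals(name, that.name) && Objects.equals(state, that.state);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, state);
    }
}

/** Hypothetical stand-in for KafkaStreams, exposing both the new and the deprecated accessor. */
final class StreamsFacade {
    private final Set<ThreadView> threads;

    StreamsFacade(final Set<ThreadView> threads) {
        this.threads = new HashSet<>(threads);
    }

    /** New accessor: returns the interface type. */
    public Set<ThreadView> threadsMetadata() {
        return Collections.unmodifiableSet(threads);
    }

    /** Old accessor: kept for compatibility, implemented by converting the new objects. */
    @Deprecated
    public Set<LegacyThreadView> localThreadsMetadata() {
        return threadsMetadata().stream()
            .map(t -> new LegacyThreadView(t.threadName(), t.threadState()))
            .collect(Collectors.toSet());
    }
}
```

Because the deprecated accessor only converts what the new one returns, there is a single source of truth and the old method can later be removed without touching the new code path.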




For this change, the KafkaStreams class needs to be adapted to return the new StreamsMetadata interface, while the old methods are deprecated:


Code Block
languagejava
titleorg.apache.kafka.streams.KafkaStreams
linenumberstrue
    /**
     * Find all currently running {@code KafkaStreams} instances (potentially remotely) that use the same
     * {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all instances that belong to
     * the same Kafka Streams application) and return {@link StreamsMetadata} for each discovered instance.
     * <p>
     * Note: this is a point in time view and it may change due to partition reassignment.
     *
     * @return {@link StreamsMetadata} for each {@code KafkaStreams} instance of this application
     * @deprecated since 3.0.0 use {@link KafkaStreams#allRunningMetadata} instead
     */
    @Deprecated
    public Collection<org.apache.kafka.streams.state.StreamsMetadata> allMetadata() {
        validateIsRunningOrRebalancing();
        return streamsMetadataState.getAllMetadata().stream().map(streamsMetadata ->
                new org.apache.kafka.streams.state.StreamsMetadata(streamsMetadata.hostInfo(),
                        streamsMetadata.stateStoreNames(),
                        streamsMetadata.topicPartitions(),
                        streamsMetadata.standbyStateStoreNames(),
                        streamsMetadata.standbyTopicPartitions()))
                .collect(Collectors.toSet());
    }

    /**
     * Find all currently running {@code KafkaStreams} instances (potentially remotely) that use the same
     * {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all instances that belong to
     * the same Kafka Streams application) and return {@link StreamsMetadata} for each discovered instance.
     * <p>
     * Note: this is a point in time view and it may change due to partition reassignment.
     *
     * @return {@link StreamsMetadata} for each {@code KafkaStreams} instance of this application
     */
    public Collection<StreamsMetadata> allRunningMetadata() {
        validateIsRunningOrRebalancing();
        return streamsMetadataState.getAllMetadata();
    }

    /**
     * Find all currently running {@code KafkaStreams} instances (potentially remotely) that
     * <ul>
     *   <li>use the same {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all
     *       instances that belong to the same Kafka Streams application)</li>
     *   <li>and that contain a {@link StateStore} with the given {@code storeName}</li>
     * </ul>
     * and return {@link StreamsMetadata} for each discovered instance.
     * <p>
     * Note: this is a point in time view and it may change due to partition reassignment.
     *
     * @param storeName the {@code storeName} to find metadata for
     * @return {@link StreamsMetadata} for each {@code KafkaStreams} instance with the provided {@code storeName} of
     * this application
     * @deprecated since 3.0.0 use {@link KafkaStreams#allMetadataForGivenStore} instead
     */
    @Deprecated
    public Collection<org.apache.kafka.streams.state.StreamsMetadata> allMetadataForStore(final String storeName) {
        validateIsRunningOrRebalancing();
        return streamsMetadataState.getAllMetadataForStore(storeName).stream().map(streamsMetadata ->
                new org.apache.kafka.streams.state.StreamsMetadata(streamsMetadata.hostInfo(),
                        streamsMetadata.stateStoreNames(),
                        streamsMetadata.topicPartitions(),
                        streamsMetadata.standbyStateStoreNames(),
                        streamsMetadata.standbyTopicPartitions()))
                .collect(Collectors.toSet());
    }

    /**
     * Find all currently running {@code KafkaStreams} instances (potentially remotely) that
     * <ul>
     *   <li>use the same {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all
     *       instances that belong to the same Kafka Streams application)</li>
     *   <li>and that contain a {@link StateStore} with the given {@code storeName}</li>
     * </ul>
     * and return {@link StreamsMetadata} for each discovered instance.
     * <p>
     * Note: this is a point in time view and it may change due to partition reassignment.
     *
     * @param storeName the {@code storeName} to find metadata for
     * @return {@link StreamsMetadata} for each {@code KafkaStreams} instance with the provided {@code storeName} of
     * this application
     */
    public Collection<StreamsMetadata> allMetadataForGivenStore(final String storeName) {
        validateIsRunningOrRebalancing();
        return streamsMetadataState.getAllMetadataForStore(storeName);
    }

With this last change of the KIP to align with KAFKA-12370, the KafkaStreams class needs to be adapted so that the new equivalent of localThreadsMetadata returns the new StreamsMetadata interface instead, with the old method (returning the old class) deprecated as follows. This is the change I am not so sure about, because what the user can do with the returned object differs substantially:

Code Block
languagejava
titleorg.apache.kafka.streams.KafkaStreams.java
linenumberstrue
    /**
     * Returns runtime information about the local threads of this {@link KafkaStreams} instance.
     *
     * @return the set of {@link org.apache.kafka.streams.processor.ThreadMetadata}.
     * @deprecated since 3.0 use {@link #threadsMetadata()}
     */
    @Deprecated
    public Set<org.apache.kafka.streams.processor.ThreadMetadata> localThreadsMetadata() {
        final Set<ThreadMetadata> threadsMetadata = new HashSet<>();
        processStreamThread(thread -> {
            synchronized (thread.getStateLock()) {
                if (thread.state() != StreamThread.State.DEAD) {
                    threadsMetadata.add(thread.getThreadMetadata());
                }
            }
        });
        return threadsMetadata.stream().map(threadMetadata -> new org.apache.kafka.streams.processor.ThreadMetadata(
                threadMetadata.threadName(),
                threadMetadata.threadState(),
                threadMetadata.consumerClientId(),
                threadMetadata.restoreConsumerClientId(),
                threadMetadata.producerClientIds(),
                threadMetadata.adminClientId(),
                threadMetadata.activeTasks().stream().map(taskMetadata -> new TaskMetadata(
                        taskMetadata.taskId().toString(),
                        taskMetadata.topicPartitions(),
                        taskMetadata.committedOffsets(),
                        taskMetadata.endOffsets(),
                        taskMetadata.timeCurrentIdlingStarted())
                ).collect(Collectors.toSet()),
                threadMetadata.standbyTasks().stream().map(taskMetadata -> new TaskMetadata(
                        taskMetadata.taskId().toString(),
                        taskMetadata.topicPartitions(),
                        taskMetadata.committedOffsets(),
                        taskMetadata.endOffsets(),
                        taskMetadata.timeCurrentIdlingStarted())
                ).collect(Collectors.toSet())))
                .collect(Collectors.toSet());
    }

    /**
     * Returns runtime information about the local threads of this {@link KafkaStreams} instance.
     *
     * @return the set of {@link ThreadMetadata}.
     */
    public Set<StreamsMetadata> threadsMetadata() {
        return ...
    }

Proposed Changes

By introducing a new TaskMetadata interface in a different package, we can keep the same name (no need to come up with a forced name). We are also freed from carrying over changes like the one introduced in https://github.com/apache/kafka/pull/10755, where a method getTaskId was introduced (going against the Kafka naming convention) because taskId was already taken and needed to be deprecated.
Together with this change, a new class TaskMetadataImpl will be created under org.apache.kafka.streams.processor.internals. It will implement the aforementioned interface using the implementation currently present in org.apache.kafka.streams.processor.TaskMetadata. The rest of the Kafka code base that uses the newly deprecated TaskMetadata will be migrated to the new classes.
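
The naming conflict described above can be made concrete with a small sketch. All type names here (`TaskIdSketch`, `LegacyTaskMetadataSketch`, `TaskMetadataSketch`, `TaskMetadataSketchImpl`) are hypothetical stand-ins chosen for illustration, and the `topicGroupId_partition` string form is an assumption of the sketch rather than something this page specifies.

```java
/** Hypothetical stand-in for a typed task identifier. */
final class TaskIdSketch {
    final int topicGroupId;
    final int partition;

    TaskIdSketch(final int topicGroupId, final int partition) {
        this.topicGroupId = topicGroupId;
        this.partition = partition;
    }

    @Override
    public String toString() {
        // Assumed string form, used only for this illustration.
        return topicGroupId + "_" + partition;
    }
}

/**
 * Stand-in for the old class: taskId() already returns a String, so a typed accessor
 * had to be added under a different name (getTaskId) until taskId() could be removed.
 */
final class LegacyTaskMetadataSketch {
    private final TaskIdSketch id;

    LegacyTaskMetadataSketch(final TaskIdSketch id) {
        this.id = id;
    }

    /** Old accessor that occupies the preferred name. */
    @Deprecated
    public String taskId() {
        return id.toString();
    }

    /** Typed accessor forced onto a getter-style name. */
    public TaskIdSketch getTaskId() {
        return id;
    }
}

/** Stand-in for the new interface in a different package: the name taskId() is free again. */
interface TaskMetadataSketch {
    TaskIdSketch taskId();
}

/** Stand-in for the internal implementation of the new interface. */
final class TaskMetadataSketchImpl implements TaskMetadataSketch {
    private final TaskIdSketch id;

    TaskMetadataSketchImpl(final TaskIdSketch id) {
        this.id = id;
    }

    @Override
    public TaskIdSketch taskId() {
        return id;
    }
}
```

Starting from a fresh interface avoids the intermediate getter-style name entirely, which is the benefit argued for in the paragraph above.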

...

Additionally, this change forced us to keep the method `getTaskId` for a while, until we could deprecate and remove the current `taskId` method.


The inclusion of the more complex changes mentioned in KAFKA-12370, like changing the return type of localThreadsMetadata to be a Set of StreamsMetadata (instead of a Set of ThreadMetadata), was rejected because the idea came late in the process and the community would not have had enough time to give its opinion on this more significant change.