Status

Current state: Under Discussion

Discussion thread: https://lists.apache.org/x/thread.html/r1d20fb6dbd6b01bb84cbb17e992f4d08308980dfc5f2e0a68d674413@%3Cdev.kafka.apache.org%3E

JIRA: KAFKA-12849

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP can be seen as a spin-off of KIP-740: Clean up public API in TaskId and fix TaskMetadata#taskId(). During review of one of the pull requests implementing that KIP, it became apparent that TaskMetadata is not a class users ever need to instantiate, but rather an API for exposing metadata. For this reason, it makes sense to separate the public interface from the internal implementation, hence this proposal to create the TaskMetadata interface. See https://github.com/apache/kafka/pull/10755#discussion_r639338584 for further context on this topic.

It is also important to note that this KIP must remain consistent with the goal of KIP-740, which was to avoid exposing TaskId as a String in the public APIs.

For reasons described in further detail under "Public Interfaces", this KIP should be part of release 3.0.0, so we can keep the APIs consistent without waiting for a full deprecation cycle.

Public Interfaces

Create a new interface named TaskMetadata in package org.apache.kafka.streams, containing all methods of the existing class that are not currently deprecated:

org.apache.kafka.streams.TaskMetadata
/**
 * Represents the state of a single task running within a {@link KafkaStreams} application.
 */
public interface TaskMetadata {

    /**
     * @return the basic task metadata such as subtopology and partition id
     */
    TaskId taskId();

    /**
     * This function will return a set of the current TopicPartitions
     */
    Set<TopicPartition> topicPartitions();

    /**
     * This function will return a map of TopicPartitions and the highest committed offset seen so far
     */
    Map<TopicPartition, Long> committedOffsets();

    /**
     * This function will return a map of TopicPartitions and the highest offset seen so far in the Topic
     */
    Map<TopicPartition, Long> endOffsets();

    /**
     * This function will return the time at which the task started idling; if the task is not currently idling, it will return empty
     */
    Optional<Long> timeCurrentIdlingStarted();
}
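As a rough illustration of how a caller might consume the two offset maps, the sketch below computes a per-partition lag as end offset minus committed offset. The class and method names (TaskLagSketch, lagPerPartition) are hypothetical and not part of this KIP, and the type parameter P stands in for TopicPartition so the snippet compiles without Kafka on the classpath. It assumes both maps hold plain non-negative offsets; this KIP does not specify how unknown offsets are represented.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the proposed API.
final class TaskLagSketch {

    // P stands in for org.apache.kafka.common.TopicPartition.
    static <P> Map<P, Long> lagPerPartition(final Map<P, Long> committedOffsets,
                                            final Map<P, Long> endOffsets) {
        final Map<P, Long> lag = new HashMap<>();
        for (final Map.Entry<P, Long> end : endOffsets.entrySet()) {
            final Long committed = committedOffsets.get(end.getKey());
            // Skip partitions that have no committed offset yet.
            if (committed != null) {
                // Clamp at zero in case the end offset is stale.
                lag.put(end.getKey(), Math.max(0L, end.getValue() - committed));
            }
        }
        return lag;
    }
}
```

With the proposed interface, a call would presumably look like TaskLagSketch.lagPerPartition(task.committedOffsets(), task.endOffsets()).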


Additionally, the TaskMetadata class in org.apache.kafka.streams.processor will be deprecated:

org.apache.kafka.streams.processor.TaskMetadata
/**
 * Represents the state of a single task running within a {@link KafkaStreams} application.
 * @deprecated since 3.0, use {@link org.apache.kafka.streams.TaskMetadata} instead
 */
@Deprecated
public class TaskMetadata {
...
}

Furthermore, the method getTaskId, newly introduced in KIP-740 but not yet released, will be removed, as it will no longer be needed. For this reason, it makes sense to include this KIP in release 3.0.0, so that we avoid a full deprecation cycle for this newly introduced method.

org.apache.kafka.streams.processor.TaskMetadata
// This method will be removed in this KIP
/**
 * @return the basic task metadata such as subtopology and partition id
 */
public TaskId getTaskId() {
   return taskId;
}

One additional change is needed in ThreadMetadata. Currently, this class takes the newly deprecated TaskMetadata class in its constructor and returns it from the activeTasks() and standbyTasks() methods. We should deprecate those and add a new constructor and new methods that reference the new interface:

org.apache.kafka.streams.processor.ThreadMetadata
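import org.apache.kafka.streams.TaskMetadata;
import org.apache.kafka.streams.processor.internals.TaskMetadataImpl;

import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;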

public class ThreadMetadata {

    ...

    /**
     * @deprecated since 3.0 use {@link #ThreadMetadata(String, String, String, String, Set, Set, Set, String)} instead
     */
    @Deprecated
    public ThreadMetadata(final String threadName,
                          final String threadState,
                          final String mainConsumerClientId,
                          final String restoreConsumerClientId,
                          final Set<String> producerClientIds,
                          final String adminClientId,
                          final Set<org.apache.kafka.streams.processor.TaskMetadata> activeTasks,
                          final Set<org.apache.kafka.streams.processor.TaskMetadata> standbyTasks) {
        this.mainConsumerClientId = mainConsumerClientId;
        this.restoreConsumerClientId = restoreConsumerClientId;
        this.producerClientIds = producerClientIds;
        this.adminClientId = adminClientId;
        this.threadName = threadName;
        this.threadState = threadState;
        this.activeTasks = Collections.unmodifiableSet(activeTasks.stream().map(metadata -> new TaskMetadataImpl(TaskId.parse(metadata.taskId()),
                metadata.topicPartitions(),
                metadata.committedOffsets(),
                metadata.endOffsets(),
                metadata.timeCurrentIdlingStarted())).collect(Collectors.toSet()));
        this.standbyTasks = Collections.unmodifiableSet(standbyTasks.stream().map(metadata -> new TaskMetadataImpl(TaskId.parse(metadata.taskId()),
                metadata.topicPartitions(),
                metadata.committedOffsets(),
                metadata.endOffsets(),
                metadata.timeCurrentIdlingStarted())).collect(Collectors.toSet()));
    }

    public ThreadMetadata(final String threadName,
                          final String threadState,
                          final String mainConsumerClientId,
                          final String restoreConsumerClientId,
                          final Set<String> producerClientIds,
                          final Set<TaskMetadata> activeTasks,
                          final Set<TaskMetadata> standbyTasks,
                          final String adminClientId) {
        this.mainConsumerClientId = mainConsumerClientId;
        this.restoreConsumerClientId = restoreConsumerClientId;
        this.producerClientIds = producerClientIds;
        this.adminClientId = adminClientId;
        this.threadName = threadName;
        this.threadState = threadState;
        this.activeTasks = Collections.unmodifiableSet(activeTasks);
        this.standbyTasks = Collections.unmodifiableSet(standbyTasks);
    }

    public Set<TaskMetadata> getActiveTasks() {
        return activeTasks;
    }

    public Set<TaskMetadata> getStandbyTasks() {
        return standbyTasks;
    }

    /**
     * @deprecated since 3.0 use {@link #getActiveTasks()}
     */
    @Deprecated
    public Set<org.apache.kafka.streams.processor.TaskMetadata> activeTasks() {
        return activeTasks.stream().map(metadata ->
            new org.apache.kafka.streams.processor.TaskMetadata(metadata.taskId().toString(),
                    metadata.topicPartitions(),
                    metadata.committedOffsets(),
                    metadata.endOffsets(), metadata.timeCurrentIdlingStarted())).collect(Collectors.toSet());
    }

    /**
     * @deprecated since 3.0 use {@link #getStandbyTasks()}
     */
    @Deprecated
    public Set<org.apache.kafka.streams.processor.TaskMetadata> standbyTasks() {
        return standbyTasks.stream().map(metadata ->
                new org.apache.kafka.streams.processor.TaskMetadata(metadata.taskId().toString(),
                        metadata.topicPartitions(),
                        metadata.committedOffsets(),
                        metadata.endOffsets(), metadata.timeCurrentIdlingStarted())).collect(Collectors.toSet());
    }

    ...
}


This last change could be avoided if we allow ourselves to break source compatibility for this particular section. Otherwise, activeTasks() and standbyTasks() need to go through a deprecation cycle, after which getActiveTasks() and getStandbyTasks(), whose names do not follow the Kafka naming convention, can be moved back to the proper naming scheme.


Proposed Changes

By introducing a new interface in a different package, we can keep the same name (no need to come up with a forced one), and we are freed from carrying over changes like the one in https://github.com/apache/kafka/pull/10755, where a method getTaskId was introduced (going against the Kafka naming convention) because taskId was already taken and needed to be deprecated.
Together with this change, a new class TaskMetadataImpl will be created under org.apache.kafka.streams.processor.internals. It will implement the aforementioned interface using the implementation currently present in org.apache.kafka.streams.processor.TaskMetadata. The rest of the Kafka code base that uses the newly deprecated TaskMetadata will be migrated to the new classes.

Also, by introducing a new interface altogether, we ease the migration for anyone currently using this class, even though it was meant for internal use only.
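The split between a public interface and an internal implementation can be sketched roughly as follows. This is a trimmed-down illustration under stated assumptions, not the actual TaskMetadataImpl: the names SketchTaskMetadata and SketchTaskMetadataImpl are hypothetical, only two of the five accessors are shown, and String stands in for TopicPartition so the snippet has no Kafka dependency.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Public side: a read-only view that callers program against (hypothetical name).
interface SketchTaskMetadata {
    Set<String> topicPartitions();              // String stands in for TopicPartition
    Optional<Long> timeCurrentIdlingStarted();
}

// Internal side: the only place where instances are constructed (hypothetical name).
final class SketchTaskMetadataImpl implements SketchTaskMetadata {
    private final Set<String> topicPartitions;
    private final Optional<Long> timeCurrentIdlingStarted;

    SketchTaskMetadataImpl(final Set<String> topicPartitions,
                           final Optional<Long> timeCurrentIdlingStarted) {
        // Copy, then wrap, so later changes to the caller's set are not visible.
        this.topicPartitions = Collections.unmodifiableSet(new HashSet<>(topicPartitions));
        this.timeCurrentIdlingStarted = timeCurrentIdlingStarted;
    }

    @Override
    public Set<String> topicPartitions() {
        return topicPartitions;
    }

    @Override
    public Optional<Long> timeCurrentIdlingStarted() {
        return timeCurrentIdlingStarted;
    }
}
```

Because callers only see the interface type, the constructor signature of the implementation can change later without breaking user code, which is the property this proposal is after.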

Compatibility, Deprecation, and Migration Plan

Because we create a new interface rather than changing the existing class, these changes are fully backward compatible. Users who currently use org.apache.kafka.streams.processor.TaskMetadata should migrate to org.apache.kafka.streams.TaskMetadata.

The class org.apache.kafka.streams.processor.TaskMetadata will be deprecated and then removed in a future release.

Rejected Alternatives

One rejected alternative was to directly transform the existing org.apache.kafka.streams.processor.TaskMetadata class into an interface. This was seen as a breaking change: the current constructor would no longer be present, so any code calling it would fail to compile.

Additionally, that alternative would have forced us to keep the method `getTaskId` for a while, until we could deprecate and remove the current `taskId` method.

