...

Hence, the discovery API is part of the KafkaStreams instance. The API will provide four methods:

  • Collection<KafkaStreamsInstance> KafkaStreams.getAllInstances() where KafkaStreamsInstance has fields such as the list of assigned partitions, the list of state store names, and a HostInfo that includes hostname / port, etc. The port is the listening port of a user-defined listener that users provide to listen for queries (e.g., using REST APIs). More on the user-defined agent below.

  • Collection<KafkaStreamsInstance> KafkaStreams.getAllInstancesWithStore(String /* storeName */) would return only the KafkaStreamsInstances that include the given store name.

  • KafkaStreamsInstance KafkaStreams.getInstanceWithKey(String storeName, K key, Serializer<K> serializer) would return the KafkaStreamsInstance that has the store that might have the key (the key might not exist in any store). The default StreamPartitioner will be used for key partitioning.

  • KafkaStreamsInstance KafkaStreams.getInstanceWithKey(String storeName, K key, StreamPartitioner<K, ?> partitioner) is the same as above but will use the provided StreamPartitioner.
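To make the key lookup concrete, here is a minimal sketch of how an instance might be resolved from a store name and a key. It does not use the Kafka classes: `KeyLookupSketch`, its nested `HostInfo` and `Instance` records, `partitionFor`, and the `numPartitions` argument are all hypothetical stand-ins. The hash is also an assumption; `Arrays.hashCode` is used only to keep the example self-contained and is not the hash used by the default partitioner.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-ins for the proposed types; these are not the Kafka classes.
final class KeyLookupSketch {
    record HostInfo(String hostname, int port) {}

    // Partitions are plain ints here; the proposal tracks TopicPartitions.
    record Instance(HostInfo hostInfo, Set<String> stateStores, Set<Integer> partitions) {}

    // Assumption: a simple stand-in hash over the serialized key bytes.
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    // Returns the instance that hosts the store and owns the key's partition, if any.
    // The key itself may still be absent from the store.
    static Optional<Instance> instanceWithKey(List<Instance> all, String storeName,
                                              String key, int numPartitions) {
        int partition = partitionFor(key.getBytes(StandardCharsets.UTF_8), numPartitions);
        return all.stream()
                .filter(i -> i.stateStores().contains(storeName))
                .filter(i -> i.partitions().contains(partition))
                .findFirst();
    }
}
```

The lookup only answers where the key would live if it existed, which matches the note above that the key might not exist in any store.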

The individual KafkaStreams instances will know about all mappings between state store names, keys and KafkaStreams instances. We propose that they keep track of this information by piggybacking on the consumer group membership protocol. In particular, we propose to add the above mapping Map<HostInfo, Set<TopicPartition>> to StreamPartitionAssignor so that each consumer knows about all the other tasks' metadata and host states in the system.
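As an illustration of what an instance could do with the distributed mapping, the sketch below derives the hosts that serve a given store from a Map<HostInfo, Set<TopicPartition>>. It is not the proposed implementation. `AssignmentIndexSketch`, the local `HostInfo` and `TopicPartition` records, and especially the `storeToSourceTopics` map (store name to the topics that feed it) are assumptions introduced for the example.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-ins; these are not the Kafka classes.
final class AssignmentIndexSketch {
    record HostInfo(String hostname, int port) {}
    record TopicPartition(String topic, int partition) {}

    // Returns every host that owns at least one partition of a topic feeding the store.
    // Assumption: each instance also knows which source topics back each store.
    static Set<HostInfo> hostsWithStore(Map<HostInfo, Set<TopicPartition>> assignment,
                                        Map<String, Set<String>> storeToSourceTopics,
                                        String storeName) {
        Set<String> topics = storeToSourceTopics.getOrDefault(storeName, Set.of());
        Set<HostInfo> hosts = new HashSet<>();
        for (Map.Entry<HostInfo, Set<TopicPartition>> e : assignment.entrySet()) {
            boolean ownsSourcePartition =
                    e.getValue().stream().anyMatch(tp -> topics.contains(tp.topic()));
            if (ownsSourcePartition) {
                hosts.add(e.getKey());
            }
        }
        return hosts;
    }
}
```

Because the mapping is refreshed only on rebalance, any result computed this way is a point-in-time view.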

...

 Exposed APIs from Kafka Streams:

Code Block
public class HostInfo {
   String hostname;    /* hostname for instance that contains state store */
   int port;           /* listening port for instance that contains state store */
}
public class KafkaStreamsInstance {
    private final HostInfo hostInfo;                    /* hostInfo from above */
    private final Set<String> stateStores;              /* state stores on this instance */
    private final Set<TopicPartition> topicPartitions;  /* TopicPartitions on this instance */
}

/**
 * Find all of the instances of {@link KafkaStreamsInstance} in a {@link KafkaStreams application}
 * Note: this is a point in time view and it may change due to partition reassignment.
 * @return collection containing all instances of {@link KafkaStreamsInstance} in this application
 */
public Collection<KafkaStreamsInstance> getAllInstances();

/**
 * Find the instances {@link KafkaStreamsInstance} for a given storeName
 * Note: this is a point in time view and it may change due to partition reassignment.
 * @param storeName the storeName to find metadata for
 * @return  A collection containing instances of {@link KafkaStreamsInstance} that have the provided storeName
 */
public Collection<KafkaStreamsInstance> getAllInstancesWithStore(final String storeName);

/**
 * Find the {@link KafkaStreamsInstance} for a given storeName and key.
 * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore},
 * this method provides a way of finding which host it would exist on.
 * Note: this is a point in time view and it may change due to partition reassignment.
 * @param storeName         Name of the store
 * @param key               Key to use for partitioning
 * @param keySerializer     Serializer for the key
 * @param <K>               key type
 * @return  The {@link KafkaStreamsInstance} for the storeName and key
 */
public <K> KafkaStreamsInstance getInstanceWithKey(final String storeName,
                                                   final K key,
                                                   final Serializer<K> keySerializer);

/**
 * Find the {@link KafkaStreamsInstance} for a given storeName and key.
 * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore},
 * this method provides a way of finding which host it would exist on.
 * Note: this is a point in time view and it may change due to partition reassignment.
 * @param storeName         Name of the store
 * @param key               Key to use for partitioning
 * @param partitioner       Partitioner for the store
 * @param <K>               key type
 * @return  The {@link KafkaStreamsInstance} for the storeName and key
 */
public <K> KafkaStreamsInstance getInstanceWithKey(final String storeName,
                                                   final K key,
                                                   final StreamPartitioner<K, ?> partitioner);
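
A caller would typically compare the HostInfo returned by getInstanceWithKey with its own HostInfo and either query the local store or forward the request to the user-defined listener. The sketch below shows one way that decision might look. `QueryRoutingSketch`, the local `HostInfo` record, `remoteUriFor`, and the `/state/<store>/<key>` URL layout are all hypothetical; the proposal leaves the listener protocol to the user.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Optional;

// Hypothetical helper; the REST path layout is an assumption, not part of the proposal.
final class QueryRoutingSketch {
    record HostInfo(String hostname, int port) {}

    // Empty result: the key belongs to this instance, so query the local store.
    // Otherwise: the URI of the remote instance's user-defined listener.
    static Optional<URI> remoteUriFor(HostInfo self, HostInfo owner,
                                      String storeName, String key) {
        if (self.equals(owner)) {
            return Optional.empty();
        }
        try {
            // The multi-argument URI constructor quotes characters that are illegal in a path.
            // A '/' inside the key is not escaped and would need separate handling.
            String path = "/state/" + storeName + "/" + key;
            return Optional.of(
                    new URI("http", null, owner.hostname(), owner.port(), path, null, null));
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot build query URI for " + owner, e);
        }
    }
}
```

Since the returned metadata is a point-in-time view, a caller should be prepared for the remote instance to no longer own the key after a rebalance and repeat the lookup in that case.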
Proposed implementation outline

...