Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
final ReadOnlyKeyValueStore<String, Long>
   myCount = kafkaStreams.store("my-count", QueryableStoreTypes.<String, Long>keyValueStore());
final ReadOnlyWindowStore<String, String>
   joinOther =
   kafkaStreams.store("join-other", QueryableStoreTypes.<String, String>windowStore());

Discovery API

 Exposed APIs from Kafka Streams:

Code Block
/**
 * A new config will be added to StreamsConfig
 * A user defined endpoint that can be used to connect to remote KafkaStreams instances. 
 * Should be in the format host:port
 */
public static final String USER_ENDPOINT_CONFIG = "user.endpoint";


 
public class HostInfo {
   String hostname;    /* hostname for instance that contains state store */
   int port;           /* listening port for instance that contains state store */
}
public class KafkaStreamsInstance {
    private final HostInfo hostInfo; /* hostInfo from above */
    private final Set<String> stateStores; /* state stores on this instance */
    private final Set<TopicPartition> topicPartitions; /* TopicPartitions on this instance */
}

/**
 * Find all of the instances of {@link KafkaStreamsInstance} in a {@link KafkaStreams application}
 * Note: this is a point in time view and it may change due to partition reassignment.
 * @return collection containing all instances of {@link KafkaStreamsInstance} in this application
 */
public Collection<KafkaStreamsInstance> allInstances();

/**
 * Find the instances {@link KafkaStreamsInstance} for a given storeName
 * Note: this is a point in time view and it may change due to partition reassignment.
 * @param storeName the storeName to find metadata for
 * @return  A collection containing instances of {@link KafkaStreamsInstance} that have the provided storeName
 */
public Collection<KafkaStreamsInstance> allInstancesWithStore(final String storeName);

/**
 * Find the {@link KafkaStreamsInstance} for a given storeName and key.
 * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore},
 * this method provides a way of finding which host it would exist on.
 * Note: this is a point in time view and it may change due to partition reassignment.
 * @param storeName         Name of the store
 * @param key               Key to use to for partition
 * @param keySerializer     Serializer for the key
 * @param <K>               key type
 * @return  The {@link KafkaStreamsInstance} for the storeName and key
 */
public <K> KafkaStreamsInstance instanceWithKey(final String storeName, 
												   final K key, 
                                                   final Serializer<K> keySerializer);

/**
 * Find the {@link KafkaStreamsInstance} for a given storeName and key.
 * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore},
 * this method provides a way of finding which host it would exist on.
 * Note: this is a point in time view and it may change due to partition reassignment.
 * @param storeName         Name of the store
 * @param key               Key to use to for partition
 * @param partitioner       Partitioner for the store
 * @param <K>               key type
 * @return  The {@link KafkaStreamsInstance} for the storeName and key
 */
public <K> KafkaStreamsInstance instanceWithKey(final String storeName,
                                                   final K key,
                                                   final StreamPartitioner<K, ?> partitioner);

...