THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
// Example: look up two state stores by name on the kafkaStreams instance --
// "my-count" as a key-value store typed <String, Long> and "join-other" as a window store typed <String, String>.
final ReadOnlyKeyValueStore<String, Long> myCount = kafkaStreams.store("my-count", QueryableStoreTypes.<String, Long>keyValueStore()); final ReadOnlyWindowStore<String, String> joinOther = kafkaStreams.store("join-other", QueryableStoreTypes.<String, String>windowStore()); |
Discovery API
Exposed APIs from Kafka Streams:
Code Block |
---|
/**
* Name of a new config that will be added to StreamsConfig.
* Its value is a user-defined endpoint that can be used to connect to remote KafkaStreams instances.
* The value should be in the format {@code host:port}.
*/
public static final String USER_ENDPOINT_CONFIG = "user.endpoint";
/**
* Hostname and listening port of an instance that contains a state store.
*/
public class HostInfo {
String hostname; /* hostname of the instance that contains the state store */
int port; /* listening port of the instance that contains the state store */
}
/**
* Describes a single instance: its {@link HostInfo}, the state stores on it,
* and the TopicPartitions on it.
*/
public class KafkaStreamsInstance {
private final HostInfo hostInfo; /* host and port of this instance (see HostInfo) */
private final Set<String> stateStores; /* state stores on this instance */
private final Set<TopicPartition> topicPartitions; /* TopicPartitions on this instance */
}
/**
* Find all of the {@link KafkaStreamsInstance}s in a {@link KafkaStreams application}.
* Note: this is a point-in-time view and it may change due to partition reassignment.
* @return collection containing all instances of {@link KafkaStreamsInstance} in this application
*/
public Collection<KafkaStreamsInstance> allInstances();
/**
* Find the {@link KafkaStreamsInstance}s that have a given storeName.
* Note: this is a point-in-time view and it may change due to partition reassignment.
* @param storeName the storeName to find metadata for
* @return a collection containing the instances of {@link KafkaStreamsInstance} that have the provided storeName
*/
public Collection<KafkaStreamsInstance> allInstancesWithStore(final String storeName);
/**
* Find the {@link KafkaStreamsInstance} for a given storeName and key, given a serializer for the key.
* Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore};
* this method provides a way of finding which host it would exist on.
* Note: this is a point-in-time view and it may change due to partition reassignment.
* @param storeName Name of the store
* @param key Key used to determine the partition
* @param keySerializer Serializer for the key
* @param <K> key type
* @return The {@link KafkaStreamsInstance} for the storeName and key
*/
public <K> KafkaStreamsInstance instanceWithKey(final String storeName,
final K key,
final Serializer<K> keySerializer);
/**
* Find the {@link KafkaStreamsInstance} for a given storeName and key, given a partitioner for the store.
* Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore};
* this method provides a way of finding which host it would exist on.
* Note: this is a point-in-time view and it may change due to partition reassignment.
* @param storeName Name of the store
* @param key Key used to determine the partition
* @param partitioner Partitioner for the store
* @param <K> key type
* @return The {@link KafkaStreamsInstance} for the storeName and key
*/
public <K> KafkaStreamsInstance instanceWithKey(final String storeName,
final K key,
final StreamPartitioner<K, ?> partitioner);
|
...