THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
public static void main(String[] args) throws Exception { Properties streamsConfiguration = new Properties(); ... /** * To use the Discovery API the developer must provide an host:port pair that * maps to an embedded service listening on this address. i.e., * it is up to the developer to define the protocol and create the service */ streamsConfiguration.put(StreamsConfig.USER_ENDPOINT_CONFIG, "localhost:7070"); ... KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); streams.start(); /** * Start your embedded service listening on the endpoint * provided above */ new QueryableStateProxy(streams).start(7070); } /** * Example Rest Proxy that uses the Discovery API to locate the * KafkaStreamsInstances StateStores are running on. A developer would first connectconnect * to a well-known instance * to find where a particular store, or store with key, * is located. They'd then use the returned KafkaStreamsInstances * to connect to the appropriate instances and perform queries, i.e, : * KafkaStreamsInstance instance = http.get("http://well-known-host:8080/state/instance/my-store/my-key"); * Long result = http.get("http://" + instance.host() + ":" + instance.port() + "/state/stores/my-store/my-key"); */ @Path("state") public class QueryableStateProxy { /** * The KafkaStreams instance knows about all of the other instances * in the application. This is maintained by the StreamPartitionAssignor * on partition assignments (rebalances) */ private final KafkaStreams streams; public QueryableStateProxy(final KafkaStreams streams) { this.streams = streams; } @GET() @Path("/instances") public Response streamsInstances() { // get the current collection of KafkaStreamsInstances final Collection<KafkaStreamsInstance> instances = streams.allInstances(); return respondWithInstances(streams.allInstances()); } @GET() @Path("/instances/{storeName}") public Response streamsInstancesForStore(@PathParam("storeName") String store) { // find all the instances that have the provided store final Collection<KafkaStreamsInstance> instances = streams.allInstancesWithStore(store); return respondWithInstances(instances); } @GET() @Path("/instance/{storeName}/{key}") public Response streamsInstancesForStoreAndKey(@PathParam("storeName") String store, @PathParam("key") String key) { // locate the instance that would have the store with the provided key (if the key exists) final KafkaStreamsInstance instance = streams.instanceWithKey(store, key, new StringSerializer()); if (instance == null) { return Response.noContent().build(); } return Response.ok(instance.toString()).build(); } @GET() @Path("/stores/{storeName}/{key}") public Response byKey(@PathParam("storeName") String storeName, @PathParam("id") String key) { // Get a handle on the Store for the provides storeName final ReadOnlyKeyValueStore<String, Long> store = streams.getStore(storeName, QueryableStoreTypes.keyValueStore()); // store may not exist or might not exist yet, i.e, if partitions haven't been assigned or // a rebalance is in process if (store == null) { return Response.noContent().build(); } // we have a store so we can get the result final Long result = store.get(key); if (result == null) { return Response.noContent().build(); } return Response.ok(result).build(); } public void start(int port) { // start your favourite http server ... } } |
Proposed implementation outline
...