...

    @Evolving
    public class KeyQuery<K, V> implements Query<V> {
      // static factory to create a new KeyQuery, given a key
      public static <K, V> KeyQuery<K, V> withKey(final K key);

      // getter for the key
      public K getKey();
    }

    // ======================================
    // example usage in IQv2:

    Integer key = 1;

    // note that "mystore" is a KeyValueStore<Integer, ValueAndTimestamp<Integer>>,
    // hence the result type
    InteractiveQueryRequest<ValueAndTimestamp<Integer>> query =
      inStore("mystore")
        .withQuery(KeyQuery.withKey(key));

    // run the query
    InteractiveQueryResult<ValueAndTimestamp<Integer>> result = kafkaStreams.query(query);

    // In this example, it doesn't matter which partition ran the query, so we just
    // grab the result from the partition that returned something
    // (this is what IQ currently does under the covers).
    Integer value = result.getOnlyPartitionResult().getResult().value();

    // ======================================
    // For comparison, here is how the query would look in current IQ:
    // (note, this glosses over many of the problems described above.
    //  the example is here to illustrate the different ergonomics
    //  of the two APIs.)

    // create the query parameters
    StoreQueryParameters<ReadOnlyKeyValueStore<Integer, ValueAndTimestamp<Integer>>> storeQueryParameters =
      StoreQueryParameters.fromNameAndType(
        "mystore",
        QueryableStoreTypes.timestampedKeyValueStore()
      );

    // get a store handle to run the query
    ReadOnlyKeyValueStore<Integer, ValueAndTimestamp<Integer>> store =
      kafkaStreams.store(storeQueryParameters);

    // query the store
    Integer value1 = store.get(key).value();

We propose to add KeyQuery as part of the implementation of this KIP, in order to have at least one query available to flesh out tests, etc., for the framework. Other queries are deferred to later KIPs.
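To make the shape of the proposed class more concrete, here is a minimal, self-contained sketch of how a `KeyQuery` with a static factory and a getter might be implemented. It is only an illustration, not the implementation this KIP commits to: the empty `Query<R>` interface is a hypothetical stand-in for the real one, the eager null check is an assumption, and `KeyQuerySketch` exists only to show a call site.

```java
import java.util.Objects;

// Hypothetical stand-in for the Query<R> interface proposed by this KIP;
// the real interface may declare additional methods.
interface Query<R> {
}

// Minimal sketch of KeyQuery: an immutable holder for the key to look up.
// K is the key type, V is the type of the result the query produces.
final class KeyQuery<K, V> implements Query<V> {
    private final K key;

    private KeyQuery(final K key) {
        // Assumption: null keys are rejected eagerly rather than at query time.
        this.key = Objects.requireNonNull(key, "key cannot be null");
    }

    // static factory to create a new KeyQuery, given a key
    public static <K, V> KeyQuery<K, V> withKey(final K key) {
        return new KeyQuery<>(key);
    }

    // getter for the key
    public K getKey() {
        return key;
    }
}

// Hypothetical driver class, present only to demonstrate usage.
class KeyQuerySketch {
    public static void main(final String[] args) {
        // The result type V is inferred from the declared type of the variable.
        final KeyQuery<Integer, String> query = KeyQuery.withKey(1);
        System.out.println(query.getKey());
    }
}
```

Keeping the constructor private and exposing only `withKey` mirrors the factory-style signature shown in the proposal and leaves room to change the internal representation later without breaking callers.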
...