...
Code Block |
---|
final Instant timeTo = Instant.now(); final Instant timeFrom = timeTo.minusSeconds(60); final WindowKeyQuery<GenericKey, ValueAndTimestamp<GenericRow>> query = WindowKeyQuery.withKeyAndWindowStartRange(key, timeFrom, timeTo); final StateQueryRequest<WindowStoreIterator<ValueAndTimestamp<GenericRow>>> request = inStore("rocksdb-window-store").withQuery(query); final StateQueryResult<WindowStoreIterator<ValueAndTimestamp<GenericRow>>> result = kafkaStreams.query(request); final WindowStoreIterator<ValueAndTimestamp<GenericRow>> iterator = result.getGlobalResult().getResult(); |
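
The window-start range passed to `withKeyAndWindowStartRange` above can be pictured with a small, store-free model. The sketch below is not Kafka Streams code: `WindowStartRangeSketch` and its `fetch` method are hypothetical names, a `TreeMap` keyed by window start (in epoch milliseconds) stands in for the windows of a single key, and both bounds are assumed to be inclusive, as they are for `WindowStore.fetch`.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for the windows of ONE key; not part of Kafka Streams.
final class WindowStartRangeSketch {

    // Returns the values of all windows whose start time lies in
    // [timeFrom, timeTo], ordered by window start.
    // Assumption: both bounds are inclusive.
    static List<String> fetch(final NavigableMap<Long, String> windowsByStartMs,
                              final Instant timeFrom,
                              final Instant timeTo) {
        return new ArrayList<>(windowsByStartMs
            .subMap(timeFrom.toEpochMilli(), true, timeTo.toEpochMilli(), true)
            .values());
    }

    public static void main(final String[] args) {
        // Fixed instants instead of Instant.now() so the result is reproducible.
        final Instant timeTo = Instant.ofEpochMilli(120_000L);
        final Instant timeFrom = timeTo.minusSeconds(60); // 60_000 ms

        final NavigableMap<Long, String> windows = new TreeMap<>();
        windows.put(30_000L, "a");  // starts before the range
        windows.put(60_000L, "b");  // starts exactly at timeFrom
        windows.put(90_000L, "c");  // starts inside the range
        windows.put(120_000L, "d"); // starts exactly at timeTo
        windows.put(150_000L, "e"); // starts after the range

        System.out.println(fetch(windows, timeFrom, timeTo)); // prints [b, c, d]
    }
}
```

Only the window start is compared against the range, so a window that starts before `timeFrom` is skipped in this model even if it is still open at `timeFrom`.
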
The following example uses the WindowRangeQuery class to query a window store for all windows, across keys, whose start time falls within a given time range.
Code Block |
---|
final Instant timeTo = Instant.now(); final Instant timeFrom = timeTo.minusSeconds(60); final WindowRangeQuery<GenericKey, ValueAndTimestamp<GenericRow>> query = WindowRangeQuery.withWindowStartRange(timeFrom, timeTo); final StateQueryRequest<KeyValueIterator<Windowed<GenericKey>, ValueAndTimestamp<GenericRow>>> request = inStore("inmemory-window-store").withQuery(query); final StateQueryResult<KeyValueIterator<Windowed<GenericKey>, ValueAndTimestamp<GenericRow>>> result = kafkaStreams.query(request); final KeyValueIterator<Windowed<GenericKey>, ValueAndTimestamp<GenericRow>> iterator = result.getGlobalResult().getResult(); |
...