...
Code Block |
---|
final Instant upper = Instant.now(); final Instant lower = upper.minusSeconds(60); final WindowKeyQuery<GenericKey, ValueAndTimestamp<GenericRow>> query = WindowKeyQuery.withKeyAndWindowBounds(key, lower, upper); final StateQueryRequest<WindowStoreIterator<ValueAndTimestamp<GenericRow>>> request = inStore("rocksdb-window-store").withQuery(query); final StateQueryResult<WindowStoreIterator<ValueAndTimestamp<GenericRow>>> result = stateStore.getKafkaStreams().query(request); final WindowStoreIterator<ValueAndTimestamp<GenericRow>> iterator = result.getGlobalResult().getResult(); |
The following example illustrates the use of the WindowRangeQuery class.
Code Block |
---|
final Instant upper = Instant.now(); final Instant lower = upper.minusSeconds(60); final WindowRangeQuery<GenericKey, ValueAndTimestamp<GenericRow>> query = WindowRangeQuery.withWindowRange(lower, upper); final StateQueryRequest<KeyValueIterator<Windowed<GenericKey>, ValueAndTimestamp<GenericRow>>> request = inStore(stateStore.getStateStoreName()).withQuery(query); final StateQueryResult<KeyValueIterator<Windowed<GenericKey>, ValueAndTimestamp<GenericRow>>> result = stateStore.getKafkaStreams().query(request); final KeyValueIterator<Windowed<GenericKey>, ValueAndTimestamp<GenericRow>> iterator = result.getGlobalResult().getResult(); |
...