...
- Utilizing the existing
WindowRangeQuery
class, we can make some modifications to realize the concepts ofKeyValueIterator<Windowed<K>, V> fetch(K keyFrom, K keyTo, Instant timeFrom, Instant timeTo)
. - We want to deprecate
WindowKeyQuery
class
Examples
The following example illustrates the use of the WindowRangeQuery class to query a kv-store or ts kv-store.
withWindowKeyRange(2, 2, time=2023-01-01T10:05:00.00Z, time=2023-01-01T10:10:00.00Z), the result is (4, 5)
Code Block | ||||
---|---|---|---|---|
| ||||
public <V> void shouldHandleWindowRangeQuery(
final Integer keyFrom,
final Integer keyTo,
final Instant timeFrom,
final Instant timeTo,
final Function<V, Integer> valueExtactor,
final Set<Integer> expectedValues) {
final WindowRangeQuery<Integer, V> query = WindowRangeQuery.withWindowKeyRange(keyFrom, keyTo, timeFrom, timeTo);
final StateQueryRequest<KeyValueIterator<Windowed<Integer>, V>> request =
inStore(STORE_NAME)
.withQuery(query)
.withPartitions(mkSet(0, 1))
.withPositionBound(PositionBound.at(INPUT_POSITION));
final StateQueryResult<KeyValueIterator<Windowed<Integer>, V>> result =
IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
if (result.getGlobalResult() != null) {
fail("global tables aren't implemented");
} else {
final Set<Integer> actualValues = new HashSet<>();
final Map<Integer, QueryResult<KeyValueIterator<Windowed<Integer>, V>>> queryResult = result.getPartitionResults();
for (final int partition : queryResult.keySet()) {
final boolean failure = queryResult.get(partition).isFailure();
if (failure) {
throw new AssertionError(queryResult.toString());
}
assertThat(queryResult.get(partition).isSuccess(), is(true));
assertThrows(
IllegalArgumentException.class,
queryResult.get(partition)::getFailureReason
);
assertThrows(
IllegalArgumentException.class,
queryResult.get(partition)::getFailureMessage
);
try (final KeyValueIterator<Windowed<Integer>, V> iterator = queryResult.get(partition).getResult()) {
while (iterator.hasNext()) {
actualValues.add(valueExtactor.apply(iterator.next().value));
}
}
assertThat(queryResult.get(partition).getExecutionInfo(), is(empty()));
}
assertThat("Result:" + result, actualValues, is(expectedValues));
assertThat("Result:" + result, result.getPosition(), is(INPUT_POSITION));
}
} |
Test Plan
With the introduction of the new keyTo
argument to the WindowRangeQuery
class, it is necessary to update all associated tests. Since SessionRangeQuery
and WindowKeyQuery
also utilize methods from WindowRangeQuery
, we must ensure that the tests covering SessionRangeQuery
and WindowKeyQuery
are revised accordingly.
...