...
JIRA:
Jira | ||||||
---|---|---|---|---|---|---|
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Proposed Changes
We need to introduce several additional arguments, upper, key, newTimeFrom, and newTimeTo, in the WindowRangeQuery class, and subsequently update the methods within this class to accommodate these new parameters.
We also add several new methods, withAllKey(), fromTime(), toTime(), and withKeyRange(), to this class.
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Sketch of the proposed {@code WindowRangeQuery} API surface.
 *
 * <p>Only the members relevant to this proposal are listed; the {@code ...} below stands for
 * the parts of the class that are elided here (fields, constructor, existing factories).
 *
 * <p>From the calls below, the seven-argument constructor takes {@code lower} and {@code upper}
 * in its first two positions and {@code newTimeFrom} and {@code newTimeTo} in its last two.
 * NOTE(review): the three middle positions are presumably {@code key}, {@code timeFrom} and
 * {@code timeTo} -- confirm against the elided constructor.
 */
public class WindowRangeQuery<K, V> implements Query<KeyValueIterator<Windowed<K>, V>> {

    ...

    // newly added
    /**
     * Creates a query in which all seven constructor arguments are empty, i.e. no key,
     * key-range, or time bound is set.
     *
     * @return a new query with no bounds set
     */
    public static <K, V> WindowRangeQuery<K, V> withAllKey() {
        return new WindowRangeQuery<>(
            Optional.empty(),
            Optional.empty(),
            Optional.empty(),
            Optional.empty(),
            Optional.empty(),
            Optional.empty(),
            Optional.empty());
    }

    // newly added
    /**
     * Returns a new query that carries over this query's {@code lower}, {@code upper} and
     * {@code newTimeTo}, and sets the new lower time bound to {@code timeFrom}. The three
     * middle constructor arguments are left empty.
     *
     * <p>This method declares its own {@code <K, V>}, which shadow the class type parameters;
     * that is why the result needs an unchecked cast.
     *
     * @param timeFrom the lower time bound; {@code null} yields an empty bound
     * @return a new query with the lower time bound applied
     */
    @SuppressWarnings("unchecked")
    public <K, V> WindowRangeQuery<K, V> fromTime(final Instant timeFrom) {
        return (WindowRangeQuery<K, V>) new WindowRangeQuery<>(
            lower,
            upper,
            Optional.empty(),
            Optional.empty(),
            Optional.empty(),
            Optional.ofNullable(timeFrom),
            newTimeTo);
    }

    // newly added
    /**
     * Returns a new query that carries over this query's {@code lower}, {@code upper} and
     * {@code newTimeFrom}, and sets the new upper time bound to {@code timeTo}. The three
     * middle constructor arguments are left empty.
     *
     * <p>This method declares its own {@code <K, V>}, which shadow the class type parameters;
     * that is why the result needs an unchecked cast.
     *
     * @param timeTo the upper time bound; {@code null} yields an empty bound
     * @return a new query with the upper time bound applied
     */
    @SuppressWarnings("unchecked")
    public <K, V> WindowRangeQuery<K, V> toTime(final Instant timeTo) {
        return (WindowRangeQuery<K, V>) new WindowRangeQuery<>(
            lower,
            upper,
            Optional.empty(),
            Optional.empty(),
            Optional.empty(),
            newTimeFrom,
            Optional.ofNullable(timeTo));
    }

    // newly added
    /**
     * Creates a query bounded by a key range and with no time bounds set.
     *
     * @param lower the lower key bound; {@code null} yields an empty bound
     * @param upper the upper key bound; {@code null} yields an empty bound
     * @return a new query carrying only the given key bounds
     */
    public static <K, V> WindowRangeQuery<K, V> withKeyRange(final K lower, final K upper) {
        return new WindowRangeQuery<>(
            Optional.ofNullable(lower),
            Optional.ofNullable(upper),
            Optional.empty(),
            Optional.empty(),
            Optional.empty(),
            Optional.empty(),
            Optional.empty());
    }

    // newly added
    /** @return the lower key bound, or empty if none was set */
    public Optional<K> lowerKeyBound() {
        return lower;
    }

    // newly added
    /** @return the upper key bound, or empty if none was set */
    public Optional<K> upperKeyBound() {
        return upper;
    }

    /** @return the value of the {@code timeFrom} field */
    @Deprecated
    public Optional<Instant> getTimeFrom() {
        return timeFrom;
    }

    /** @return the value of the {@code timeTo} field */
    @Deprecated
    public Optional<Instant> getTimeTo() {
        return timeTo;
    }

    // newly added
    /** @return the value of the {@code timeFrom} field */
    public Optional<Instant> timeFrom() {
        return timeFrom;
    }

    // newly added
    /** @return the value of the {@code timeTo} field */
    public Optional<Instant> timeTo() {
        return timeTo;
    }

    // newly added
    /** @return the lower time bound set through {@link #fromTime(Instant)}, or empty */
    public Optional<Instant> newTimeFrom() {
        return newTimeFrom;
    }

    // newly added
    /** @return the upper time bound set through {@link #toTime(Instant)}, or empty */
    public Optional<Instant> newTimeTo() {
        return newTimeTo;
    }

    // newly added
    /** @return the value of the {@code key} field */
    public Optional<K> key() {
        return key;
    }
} |
Compatibility, Deprecation, and Migration Plan
...
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Runs a {@code WindowRangeQuery} bounded by a key range and a time range against
 * {@code STORE_NAME} on partitions 0 and 1, and checks the values returned.
 *
 * <p>Every partition result must be a success with empty execution info. The values extracted
 * from all partitions must equal {@code expectedValues}, and the result position must equal
 * {@code INPUT_POSITION}.
 *
 * @param keyFrom        lower key bound passed to {@code withKeyRange}
 * @param keyTo          upper key bound passed to {@code withKeyRange}
 * @param timeFrom       lower time bound passed to {@code fromTime}
 * @param timeTo         upper time bound passed to {@code toTime}
 * @param valueExtactor  maps each returned store value to the integer that is compared
 * @param expectedValues the exact set of extracted integers expected across all partitions
 */
public <V> void shouldHandleWindowRangeQuery(
        final Integer keyFrom,
        final Integer keyTo,
        final Instant timeFrom,
        final Instant timeTo,
        final Function<V, Integer> valueExtactor,
        final Set<Integer> expectedValues) {

    // Build the query with the new fluent API: key range first, then the time bounds.
    final WindowRangeQuery<Integer, V> query =
        WindowRangeQuery.withKeyRange(keyFrom, keyTo).fromTime(timeFrom).toTime(timeTo);

    final StateQueryRequest<KeyValueIterator<Windowed<Integer>, V>> request =
        inStore(STORE_NAME)
            .withQuery(query)
            .withPartitions(mkSet(0, 1))
            .withPositionBound(PositionBound.at(INPUT_POSITION));

    final StateQueryResult<KeyValueIterator<Windowed<Integer>, V>> result =
        IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);

    if (result.getGlobalResult() != null) {
        fail("global tables aren't implemented");
    } else {
        final Set<Integer> actualValues = new HashSet<>();
        final Map<Integer, QueryResult<KeyValueIterator<Windowed<Integer>, V>>> queryResult =
            result.getPartitionResults();

        for (final int partition : queryResult.keySet()) {
            // Look the partition result up once instead of on every use.
            final QueryResult<KeyValueIterator<Windowed<Integer>, V>> partitionResult =
                queryResult.get(partition);

            if (partitionResult.isFailure()) {
                throw new AssertionError(queryResult.toString());
            }
            assertThat(partitionResult.isSuccess(), is(true));

            // The test expects the failure accessors to throw on a successful result.
            assertThrows(
                IllegalArgumentException.class,
                partitionResult::getFailureReason
            );
            assertThrows(
                IllegalArgumentException.class,
                partitionResult::getFailureMessage
            );

            // The iterator is closed by try-with-resources once it has been drained.
            try (final KeyValueIterator<Windowed<Integer>, V> iterator = partitionResult.getResult()) {
                while (iterator.hasNext()) {
                    actualValues.add(valueExtactor.apply(iterator.next().value));
                }
            }
            assertThat(partitionResult.getExecutionInfo(), is(empty()));
        }
        assertThat("Result:" + result, actualValues, is(expectedValues));
        assertThat("Result:" + result, result.getPosition(), is(INPUT_POSITION));
    }
} |
...