Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

JIRA:

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-15795
 [Change the link from KAFKA-1 to your own ticket]

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Proposed Changes

We need to introduce several additional arguments, upper, key, newTimeFrom, and newTimeTo, in the WindowRangeQuery class, and subsequently update the methods within this class to accommodate these new parameters.
We also add several new methods, withAllKey(), fromTime(), toTime(), and withKeyRange(), to this class.

Code Block
languagejava
titleWindowRangeQuery
public class WindowRangeQuery<K, V> implements Query<KeyValueIterator<Windowed<K>, V>> {
	...

       private  final  Optional<K>  keyTo;
    
    
	// newly added
	public static <K, V> WindowRangeQuery<K, V> withWindowKeyRangewithAllKey(final K keyFrom,) {
        return new WindowRangeQuery<>(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
    }

    @SuppressWarnings("unchecked")
    public <K, V> WindowRangeQuery<K, V> fromTime(final Instant timeFrom) {
        return (WindowRangeQuery<K, V>) new WindowRangeQuery<>(lower, upper, Optional.empty(), Optional.empty(), Optional.empty(), Optional.ofNullable(timeFrom), newTimeTo);
    }

 	// newly added
  final K keyTo, @SuppressWarnings("unchecked")
    public <K, V> WindowRangeQuery<K, V> toTime(final Instant timeTo) {
        return (WindowRangeQuery<K, V>) new WindowRangeQuery<>(lower, upper, Optional.empty(), Optional.empty(), Optional.empty(), newTimeFrom, Optional.ofNullable(timeTo));
    }
 	
	// newly added
    public static <K, V> WindowRangeQuery<K, V> withKeyRange(final K lower, final K upper) {
        return  final Instant timeFrom,
    new WindowRangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
    }

    // newly added
    public Optional<K> lowerKeyBound() {
        return lower;
    }
 
    // newly added
    public Optional<K> upperKeyBound() {
        return upper;
    }

  final Instant timeTo)@Deprecated

    public Optional<K>Optional<Instant> lowerKeyBoundgetTimeFrom() {
        return timeFrom;
    }

    @Deprecated
    public Optional<K>Optional<Instant> upperKeyBoundgetTimeTo() {
        return timeTo;
    }

	@Deprecated// newly added
    public Optional<Instant> getTimeFromtimeFrom() {
        return timeFrom;
    }
	
    // newly added
    public Optional<Instant> timeFromtimeTo() {
        return timeTo;
  	  }

    // newly @Deprecatedadded
    public Optional<Instant> getTimeTonewTimeFrom() {
        return newTimeFrom;
    }

 	    // newly added
    public Optional<Instant> timeTonewTimeTo() {
        return newTimeTo;
    @Override}
	
    // newly added
    public StringOptional<K> toStringkey() {
        return key;
    }
 }


Compatibility, Deprecation, and Migration Plan

...

Code Block
languagejava
titleshouldHandleWindowKeyQuery
 public <V> void shouldHandleWindowRangeQuery(
        final Integer keyFrom,
        final Integer keyTo,
        final Instant timeFrom,
        final Instant timeTo,
        final Function<V, Integer> valueExtactor,
        final Set<Integer> expectedValues) {

    	    final WindowRangeQuery<Integer, V> query = WindowRangeQuery.withWindowKeyRangewithKeyRange(keyFrom, keyTo, timeFrom, ).fromTime(timeFrom).toTime(timeTo);

        final StateQueryRequest<KeyValueIterator<Windowed<Integer>, V>> request =
            inStore(STORE_NAME)
                .withQuery(query)
                .withPartitions(mkSet(0, 1))
                .withPositionBound(PositionBound.at(INPUT_POSITION));

        final StateQueryResult<KeyValueIterator<Windowed<Integer>, V>> result =
            IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);

        if (result.getGlobalResult() != null) {
            fail("global tables aren't implemented");
        } else {
            final Set<Integer> actualValues = new HashSet<>();
            final Map<Integer, QueryResult<KeyValueIterator<Windowed<Integer>, V>>> queryResult = result.getPartitionResults();
            for (final int partition : queryResult.keySet()) {
                final boolean failure = queryResult.get(partition).isFailure();
                if (failure) {
                    throw new AssertionError(queryResult.toString());
                }
                assertThat(queryResult.get(partition).isSuccess(), is(true));

                assertThrows(
                    IllegalArgumentException.class,
                    queryResult.get(partition)::getFailureReason
                );
                assertThrows(
                    IllegalArgumentException.class,
                    queryResult.get(partition)::getFailureMessage
                );

                try (final KeyValueIterator<Windowed<Integer>, V> iterator = queryResult.get(partition).getResult()) {
                    while (iterator.hasNext()) {
                        actualValues.add(valueExtactor.apply(iterator.next().value));
                    }
                }
                assertThat(queryResult.get(partition).getExecutionInfo(), is(empty()));
            }
            assertThat("Result:" + result, actualValues, is(expectedValues));
            assertThat("Result:" + result, result.getPosition(), is(INPUT_POSITION));
        }
    }  

...