Status
Current state: Accepted
Discussion thread: here
...
Utilizing these, the query returns results ordered based on the serialized byte[] of the keys, not the 'logical' key order.
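To illustrate how byte order can differ from logical order, here is a minimal sketch. It assumes keys are integers encoded as 4 big-endian bytes and that the store compares serialized keys with an unsigned lexicographic byte comparison; both are assumptions made for illustration, and `ByteOrderDemo` and its methods are hypothetical names that are not part of this KIP.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ByteOrderDemo {

    // Encode an int as 4 big-endian bytes (assumed encoding for this sketch).
    static byte[] serialize(final int key) {
        return ByteBuffer.allocate(4).putInt(key).array();
    }

    // Sort keys by unsigned lexicographic comparison of their serialized bytes.
    static List<Integer> sortBySerializedBytes(final List<Integer> keys) {
        final List<Integer> sorted = new ArrayList<>(keys);
        sorted.sort((a, b) -> Arrays.compareUnsigned(serialize(a), serialize(b)));
        return sorted;
    }

    public static void main(final String[] args) {
        final List<Integer> keys = Arrays.asList(2, -1, 0, 1);
        // Logical ascending order would be [-1, 0, 1, 2].
        // Byte order places -1 (0xFFFFFFFF) after all non-negative keys.
        System.out.println(sortBySerializedBytes(keys)); // prints [0, 1, 2, -1]
    }
}
```

Under these assumptions, a query that returns keys in "ascending" byte order would return the negative key last, which is why the order is described in terms of the serialized byte[] rather than the logical key.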
Take IQv2StoreIntegrationTest as an example: we have two partitions with four key-value pairs:
...
Proposed Changes
According to KIP-968, this KIP introduces the public enum ResultOrder to determine whether keys are sorted in ascending, descending, or unordered order. Order is based on the serialized byte[] of the keys, not the 'logical' key order. It employs the withDescendingKeys() and withAscendingKeys() methods to specify that the keys should be returned in descending or ascending sequence, and the resultOrder() method to retrieve the ResultOrder enum value. I've incorporated these variables and methods into the RangeQuery class and modified some method inputs. As a result, we can now use withDescendingKeys() to obtain results in reverse order and withAscendingKeys() to obtain results in ascending order.
Code Block |
---|
/**
 * Interactive query for issuing range queries and scans over KeyValue stores.
 * <p>
 * A range query retrieves a set of records, specified using an upper and/or lower bound on the keys.
 * <p>
 * A scan query retrieves all records contained in the store.
 * <p>
 */
@Evolving
public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {
    ...

    /**
     * Determines whether results are returned in ascending, descending, or unordered order
     * of the serialized byte[] of the keys.
     * Order is based on the serialized byte[] of the keys, not the 'logical' key order.
     * @return the order of returned records based on the serialized byte[] of the keys
     *         (can be unordered, or in ascending or in descending order).
     */
    public ResultOrder resultOrder()

    /**
     * Set the query to return the serialized byte[] of the keys in descending order.
     * Order is based on the serialized byte[] of the keys, not the 'logical' key order.
     * @return a new RangeQuery instance with descending flag set.
     */
    public RangeQuery<K, V> withDescendingKeys()

    /**
     * Set the query to return the serialized byte[] of the keys in ascending order.
     * Order is based on the serialized byte[] of the keys, not the 'logical' key order.
     * @return a new RangeQuery instance with ascending flag set.
     */
    public RangeQuery<K, V> withAscendingKeys()

    /**
     * Interactive range query using an upper bound to filter the keys returned.
     * If both <K,V> are null, RangeQuery returns a full range scan.
     * @param upper The key that specifies the upper bound of the range
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> RangeQuery<K, V> withUpperBound(final K upper)

    /**
     * Interactive range query using a lower bound to filter the keys returned.
     * @param lower The key that specifies the lower bound of the range
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> RangeQuery<K, V> withLowerBound(final K lower)

    /**
     * Interactive scan query that returns all records in the store.
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> RangeQuery<K, V> withNoBounds()

    ...
} |
According to KIP-968, we introduce a public enum ResultOrder.
ResultOrder enum
It specifies the order of the results returned by the query.
Code Block |
---|
package org.apache.kafka.streams.query;
public enum ResultOrder {
ANY,
ASCENDING,
DESCENDING
} |
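As a rough illustration of how the ordering methods and the enum fit together, the sketch below uses a simplified stand-in class rather than the real RangeQuery. `ResultOrderSketch` and `SketchRangeQuery` are hypothetical names, the class carries only the bounds and the order, and the choice of `ANY` as the default order for a freshly built query is an assumption of this sketch.

```java
import java.util.Optional;

class ResultOrderSketch {

    // Mirrors the enum proposed above.
    enum ResultOrder { ANY, ASCENDING, DESCENDING }

    // Hypothetical, simplified stand-in for RangeQuery (no store access).
    static final class SketchRangeQuery<K> {
        private final Optional<K> lower;
        private final Optional<K> upper;
        private final ResultOrder order;

        private SketchRangeQuery(final Optional<K> lower,
                                 final Optional<K> upper,
                                 final ResultOrder order) {
            this.lower = lower;
            this.upper = upper;
            this.order = order;
        }

        // Assumption: a new query starts with no ordering requirement.
        static <K> SketchRangeQuery<K> withRange(final K lower, final K upper) {
            return new SketchRangeQuery<>(
                Optional.ofNullable(lower), Optional.ofNullable(upper), ResultOrder.ANY);
        }

        // Each "with" method returns a new instance and leaves this one unchanged.
        SketchRangeQuery<K> withDescendingKeys() {
            return new SketchRangeQuery<>(lower, upper, ResultOrder.DESCENDING);
        }

        SketchRangeQuery<K> withAscendingKeys() {
            return new SketchRangeQuery<>(lower, upper, ResultOrder.ASCENDING);
        }

        ResultOrder resultOrder() {
            return order;
        }
    }

    public static void main(final String[] args) {
        final SketchRangeQuery<Integer> base = SketchRangeQuery.withRange(1, 3);
        final SketchRangeQuery<Integer> reversed = base.withDescendingKeys();
        System.out.println(base.resultOrder());     // prints ANY
        System.out.println(reversed.resultOrder()); // prints DESCENDING
    }
}
```

Returning a new instance from each ordering method keeps the query object immutable, so a base query can be reused to derive both ascending and descending variants.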
Test Plan
This time, our goal is to implement reverseRange and reverseAll functionalities. While these terms are used for clarity, in practice, they correspond to RangeQuery.withRange().withDescendingKeys() and RangeQuery.withNoBounds().withDescendingKeys(), respectively. To ensure the accurate retrieval of results for both functionalities, adjustments to IQv2StoreIntegrationTest are required. In our previous approach, we stored query results in a set, which doesn't maintain order. I've transitioned to using a list for storing query results, enabling us to distinguish between rangeQuery and reverseQuery. Here, rangeQuery refers to standard queries (those not using withDescendingKeys()) such as withRange(), withLowerBound(), withUpperBound(), and withNoBounds(). In contrast, reverseQuery denotes queries that employ the withDescendingKeys() method.
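The reason for moving from a set to a list can be shown with a small, self-contained sketch. It is not taken from IQv2StoreIntegrationTest; `OrderAssertionDemo` and its helper methods are hypothetical names used only to show that a set-based comparison cannot tell an ascending result from a descending one.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

class OrderAssertionDemo {

    // Set-based comparison: only checks that the same elements are present.
    static boolean sameAsSet(final List<Integer> expected, final List<Integer> actual) {
        return new HashSet<>(expected).equals(new HashSet<>(actual));
    }

    // List-based comparison: checks elements and their positions.
    static boolean sameAsList(final List<Integer> expected, final List<Integer> actual) {
        return expected.equals(actual);
    }

    public static void main(final String[] args) {
        final List<Integer> ascending = Arrays.asList(0, 1, 2, 3);
        final List<Integer> descending = Arrays.asList(3, 2, 1, 0);

        // A set-based check would wrongly accept a descending result
        // where an ascending one was expected.
        System.out.println(sameAsSet(ascending, descending));  // prints true
        System.out.println(sameAsList(ascending, descending)); // prints false
    }
}
```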
...