Table of Contents |
---|
Status
Current state: Under Discussion Accepted
Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]
Voting thread: https://lists.apache.org/thread/xxyb5yyqrsdxsyxxbjhvnlxw5fl8xd0c
JIRA:
Jira | ||||||
---|---|---|---|---|---|---|
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Update an implementation of the Query interface, introduced in KIP-796: Interactive Query v2 , to support The concepts of reverseRange
and reverseAll
.
Use bounded query to achieve reverseRange and use unbounded query to achieve reverseAll.
Proposed Changes
are not tied to any specific method or class. Rather, they represent functionalities we wish to achieve. Currently, with RangeQuery
, we can use methods like withRange()
, withLowerBound()
, withUpperBound()
, and withNoBounds()
.
Utilizing these, the query results are ordered based on the serialized byte[] of the keys, not the 'logical' key order.
Take IQv2
StoreIntegrationTest
as an example: we have two partitions with four key-value pairs:
<0,0>
in Partition0<1,1>
in Partition1<2,2>
in Partition0<3,3>
in Partition1
When we use RangeQuery.withRange(1,3)
, the returned result is:
- Partition0:
[2]
- Partition1:
[1, 3]
To achieve the functionalities of To achieve reverseRange
and reverseAll
, we can generate a new class like below in Rejected Alternatives part, we can also reuse lots of code from RangeQuery
, to simplify the code we choose reuse the RangeQuery
code.
We add a variable reverse
in RangeQuery
, the default value is false, we want do reverseRange or reverseAll, we can set it to true.
Then we generate two public methods:
The first one is isReverse()
, if this method return true, do reverseQuery otherwise do rangeQuery
the second method is setReverse()
, if we want the query do reverseQuery, we can use this method to set the reverse
to true, so this time, rangeQuery Stand for reverseQuery.
Use bounded reverseQuery to achieve reverseRange and use unbounded reverseQuery to achieve reverseAll.
introduce a method named withDescendingKeys()
for reversed queries. For example, by using RangeQuery.withRange(1,3).withDescendingKeys()
, the expected result would be:
- Partition0:
[2]
- Partition1:
[3, 1]
This means the results are in the reverse order of their keys.
To ensure that we can achieve this functionality, the keys in both RocksDB
and InMemoryKeyValueStore
should be sorted. We know that RocksDB
keys are inherently sorted. After investigation, we found that InMemoryKeyValueStore
uses a TreeMap
, implying its keys are also sorted. Therefore, performing the aforementioned queries is feasible.
Proposed Changes
According to KIP-968, this KIP introduces the public enum ResultOrder to determine whether keys are sorted in ascending or descending or unordered order. Order is based on the serialized byte[] of the keys, not the 'logical' key order. employs the withDescendingKeys() and withAscendingKeys()
methods to specify that the keys should be ordered in descending or ascending or unordered sequence, and the resultOrder() method to retrieve the value of enum value in ResultOrder. I've incorporated these variables and methods into the RangeQuery
class and modified some method inputs. As a result, we can now use withDescendingKeys()
to obtain results in reverse order and use withAscendingKeys to obtain the result in ascending order.
Code Block | ||||
---|---|---|---|---|
Code Block | ||||
| ||||
/** * Interactive query for issuing range queries and scans over KeyValue stores. * <p> * A range query retrieves a set of records, specified using an upper and/or lower bound on the keys. * <p> * A scan query retrieves all records contained in the store. * <p> * If the reverse is false, do rangeQuery. If the reverse is true do reverseQuery */ @Evolving public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> { ... private boolean reverse; / @Evolving public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> { ... /** * Determines if the serialized byte[] of the keys in ascending or descending or unordered order. * Order is based on the serialized byte[] of the keys, not the 'logical' key order. * @return return the order of returned records based on the serialized byte[] of the keys (can be unordered, or in ascending or in descending order). */ public ResultOrder resultOrder() /** * Check whether the Query is reverseQuery Set the query to return the serialized byte[] of the keys in descending order. * Order is based on the serialized byte[] of the keys, not the 'logical' key order. * @return a new RangeQuery instance with descending flag set. */ public RangeQuery<K, booleanV> isReversewithDescendingKeys() { /** * Set the query to return reverse; } the serialized byte[] of the keys in ascending order. /*** Order is based on the serialized byte[] of the keys, not the 'logical' key order. * Set the Query to reverseQuery @return a new RangeQuery instance with ascending flag set. */ public RangeQuery<K, voidV> withDescandingOrder() { this.reverse = true;withAscendingKeys() ... } |
According to KIP-968, we introduce a public enum ResultOrder.
ResultOrder enum
It helps with specifying the order of the returned results by the query.
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.streams.query; public enum ResultOrder { ANY, ASCENDING, } ...DESCENDING } |
Test Plan
This time, we aim our goal is to implement reverseRange
and reverseAll
functionalities. While these terms are used for clarity, in practice, they correspond to RangeQuery.withRange().withDescendingKeys()
and RangeQuery.withNoBounds().withDescendingKeys()
, respectively. To ensure the accuracy accurate retrieval of the results for both functionalities, modifications adjustments to IQv2StoreIntegrationTest
are necessary. Previouslyrequired. In our previous approach, we stored query results in a set, which doesn't retain maintain order. I've since switched transitioned to using a list to store the query results. This allows us to discern the differences between rangeQuery and reverseQuery.for storing query results, enabling us to distinguish between rangeQuery
and reverseQuery
. Here, rangeQuery
refers to standard queries (those not using withDescendingKeys()
) such as withRange()
, withLowerBound()
, withUpperBound()
, and withNoBounds()
. In contrast, reverseQuery
denotes queries that employ the withDescendingKeys()
method.
We've transitioned the expectedValue
from a Set
to a List
and arranged the partition numbers in order. This organization assists us in predicting the results. If the partition numbers were random, predicting the outcome would be challenging. Ultimately, this enables us to obtain and store the answer in the expectedValue
. Consequently, the results between rangeQuery
and reverseQuery
will differ.
Code Block | ||||
---|---|---|---|---|
| ||||
public class IQv2StoreIntegrationTest {
...
@SuppressWarnings("unchecked")
public <V> void shouldHandleRangeQuery(
final Optional<Integer> lower,
final Optional<Integer> upper,
final boolean isKeyAscending,
final Function<V, Integer> valueExtactor,
final List<Integer> expectedValue) {
final RangeQuery<Integer, V> query;
if (isKeyAscending) {
query = RangeQuery.withRange(lower.orElse(null), upper.orElse(null));
} else {
query = (RangeQuery<Integer, V>) RangeQuery.withRange(lower.orElse(null), upper.orElse(null)).withDescendingKeys();
}
...
} else {
final List<Integer> actualValue = new ArrayList<>();
...
final List<Integer> partitions = new ArrayList<>(queryResult.keySet());
partitions.sort(null);
for (final int partition : partitions) {
...
}
...
}
|
Compatibility, Deprecation, and Migration Plan
- Because we have already have
RangeQuery
Utilizing the existingRangeQuery
class, so we can update make some code to achieve reverseRange and reverseAllmodifications to realize the concepts ofreverseRange
andreverseAll
. To reiterate,reverseRange
andreverseAll
are not classes or methods but merely concepts. - Since nothing is deprecated in this KIP, users have no need to migrate unless they want to.
Rejected Alternatives
At first, we planned to implement ReverseRangeQuery
from scratch. However, after discussions, we decided to reuse some of the RangeQuery
code, allowing RangeQuery
to possess the reverseQuery functionality. Therefore, in the end, we chose not to add a new ReverseRangeQuery
class but instead to enhance the RangeQuery
with new featuresAfter initial plans to create a ReverseRangeQuery
from the ground up, we opted to leverage existing code from the RangeQuery
class following further discussions.
Code Block | ||||
---|---|---|---|---|
| ||||
@Evolving public final class ReverseRangeQuery<K, V> implements Query<KeyValueIterator<K, V>> { private final Optional<K> lower; private final Optional<K> upper; private ReverseRangeQuery(final Optional<K> lower, final Optional<K> upper) { this.lower = lower; this.upper = upper; } /** * Interactive range query using a lower and upper bound to filter the keys returned. * @param lower The key that specifies the lower bound of the range * @param upper The key that specifies the upper bound of the range * @param <K> The key type * @param <V> The value type */ public static <K, V> ReverseRangeQuery<K, V> withRange(final K lower, final K upper) { return new ReverseRangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper)); } /** * Interactive range query using an upper bound to filter the keys returned. * If both <K,V> are null, RangQuery returns a full range scan. * @param upper The key that specifies the upper bound of the range * @param <K> The key type * @param <V> The value type */ public static <K, V> ReverseRangeQuery<K, V> withUpperBound(final K upper) { return new ReverseRangeQuery<>(Optional.empty(), Optional.of(upper)); } /** * Interactive range query using a lower bound to filter the keys returned. * @param lower The key that specifies the lower bound of the range * @param <K> The key type * @param <V> The value type */ public static <K, V> ReverseRangeQuery<K, V> withLowerBound(final K lower) { return new ReverseRangeQuery<>(Optional.of(lower), Optional.empty()); } /** * Interactive scan query that returns all records in the store. * @param <K> The key type * @param <V> The value type */ public static <K, V> ReverseRangeQuery<K, V> withNoBounds() { return new ReverseRangeQuery<>(Optional.empty(), Optional.empty()); } /** * The lower bound of the query, if specified. */ public Optional<K> getLowerBound() { return lower; } /** * The upper bound of the query, if specified */ public Optional<K> getUpperBound() { return upper; } } |
...