Status

Current state: Accepted

Discussion thread: here

...

Utilizing these, the query results are ordered based on the serialized byte[] of the keys, not the 'logical' key order.
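To illustrate the difference between byte order and 'logical' order, here is a minimal sketch. It assumes integer keys are serialized as 4 big-endian bytes and that the store compares serialized keys as unsigned bytes, lexicographically. The class and method names (ByteOrderDemo, serialize, sortBySerializedBytes) are hypothetical and not part of Kafka Streams.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class; not part of Kafka Streams.
class ByteOrderDemo {

    // Serialize an int as 4 big-endian bytes (assumed serialization format).
    static byte[] serialize(final int key) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(key).array();
    }

    // Sort keys by unsigned lexicographic comparison of their serialized bytes
    // (assumed comparison rule of the underlying store).
    static List<Integer> sortBySerializedBytes(final List<Integer> keys) {
        final List<Integer> sorted = new ArrayList<>(keys);
        sorted.sort((a, b) -> Arrays.compareUnsigned(serialize(a), serialize(b)));
        return sorted;
    }

    public static void main(final String[] args) {
        final List<Integer> keys = Arrays.asList(2, -1, 0, 1, -2);
        // Negative ints start with 0xFF bytes, so they sort after the non-negative ones.
        System.out.println(sortBySerializedBytes(keys)); // prints [0, 1, 2, -2, -1]
    }
}
```

Under these assumptions, an "ascending" result for signed integer keys places the negative keys last, which is not what the logical integer order would give.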

Take IQv2StoreIntegrationTest as an example: we have two partitions with four key-value pairs:

...

Proposed Changes

According to KIP-968, this KIP introduces the public enum ResultOrder to determine whether keys are sorted in ascending, descending, or unordered order. Order is based on the serialized byte[] of the keys, not the 'logical' key order. It employs the withDescendingKeys() and withAscendingKeys() methods to specify that the keys should be ordered in descending or ascending sequence, and the resultOrder() method to retrieve the ResultOrder enum value. I've incorporated these variables and methods into the RangeQuery class and modified some method inputs. As a result, we can now use withDescendingKeys() to obtain results in reverse order and withAscendingKeys() to obtain results in ascending order.

Code Block
/**
 * Interactive query for issuing range queries and scans over KeyValue stores.
 * <p>
 *  A range query retrieves a set of records, specified using an upper and/or lower bound on the keys.
 * <p>
 * A scan query retrieves all records contained in the store.
 * <p>
 */
@Evolving
public final class RangeQuery<K, V> implements Query<KeyValueIterator<K, V>> {


    ...  

    /**
     * Determines whether the serialized byte[] of the keys are in ascending, descending, or unordered order.
     * Order is based on the serialized byte[] of the keys, not the 'logical' key order.
     * @return the order of returned records based on the serialized byte[] of the keys (can be unordered, or in ascending or in descending order).
     */
    public ResultOrder resultOrder()

    /**
     * Set the query to return the serialized byte[] of the keys in descending order.
     * Order is based on the serialized byte[] of the keys, not the 'logical' key order.
     * @return a new RangeQuery instance with descending flag set.
     */
    public RangeQuery<K, V> withDescendingKeys()

    /**
     * Set the query to return the serialized byte[] of the keys in ascending order.
     * Order is based on the serialized byte[] of the keys, not the 'logical' key order.
     * @return a new RangeQuery instance with ascending flag set.
     */
    public RangeQuery<K, V> withAscendingKeys()

    /**
     * Interactive range query using an upper bound to filter the keys returned.
     * If both <K,V> are null, RangeQuery returns a full range scan.
     * @param upper The key that specifies the upper bound of the range
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> RangeQuery<K, V> withUpperBound(final K upper) {
        return new RangeQuery<>(Optional.empty(), Optional.of(upper), true);
    }

    /**
     * Interactive range query using a lower bound to filter the keys returned.
     * @param lower The key that specifies the lower bound of the range
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> RangeQuery<K, V> withLowerBound(final K lower) {
        return new RangeQuery<>(Optional.of(lower), Optional.empty(), true);
    }

    /**
     * Interactive scan query that returns all records in the store.
     * @param <K> The key type
     * @param <V> The value type
     */
    public static <K, V> RangeQuery<K, V> withNoBounds() {
        return new RangeQuery<>(Optional.empty(), Optional.empty(), true);
    }

    ...
}

According to KIP-968, we introduce a public enum ResultOrder.

ResultOrder enum
It helps with specifying the order of the returned results by the query.

Code Block
package org.apache.kafka.streams.query;
 
public enum ResultOrder {
    ANY,
    ASCENDING,
    DESCENDING
}
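The following is a simplified, self-contained sketch of how such an enum can be combined with immutable "with" methods on a query object. It is not the Kafka Streams implementation: ResultOrderSketch, Order, and Query are hypothetical stand-ins, and the choice of ANY as the default order for a freshly built query is an assumption.

```java
import java.util.Optional;

// Hypothetical, simplified stand-in; not the Kafka Streams RangeQuery class.
class ResultOrderSketch {

    // Mirrors the three values of the proposed enum.
    enum Order { ANY, ASCENDING, DESCENDING }

    static final class Query<K> {
        private final Optional<K> lower;
        private final Optional<K> upper;
        private final Order order;

        private Query(final Optional<K> lower, final Optional<K> upper, final Order order) {
            this.lower = lower;
            this.upper = upper;
            this.order = order;
        }

        // Assumption: a new query starts with no ordering requirement (ANY).
        static <K> Query<K> withRange(final K lower, final K upper) {
            return new Query<>(Optional.ofNullable(lower), Optional.ofNullable(upper), Order.ANY);
        }

        // Each "with" method returns a new instance and leaves the original untouched.
        Query<K> withDescendingKeys() {
            return new Query<>(lower, upper, Order.DESCENDING);
        }

        Query<K> withAscendingKeys() {
            return new Query<>(lower, upper, Order.ASCENDING);
        }

        Order resultOrder() {
            return order;
        }
    }

    public static void main(final String[] args) {
        final Query<Integer> base = Query.withRange(1, 3);
        final Query<Integer> descending = base.withDescendingKeys();
        System.out.println(base.resultOrder());       // prints ANY
        System.out.println(descending.resultOrder()); // prints DESCENDING
    }
}
```

Returning a new instance from each "with" method keeps the query object immutable, so a base query can be reused to derive both ascending and descending variants.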


Test Plan

The goal is to implement reverseRange and reverseAll functionality. These names are used here for clarity; in practice they correspond to RangeQuery.withRange().withDescendingKeys() and RangeQuery.withNoBounds().withDescendingKeys(), respectively. To verify that both return correct results, IQv2StoreIntegrationTest needs to be adjusted. Previously, query results were stored in a set, which does not maintain order. I've switched to a list, which lets us distinguish between rangeQuery and reverseQuery. Here, rangeQuery refers to standard queries (those not using withDescendingKeys()) such as withRange(), withLowerBound(), withUpperBound(), and withNoBounds(), while reverseQuery denotes queries that use the withDescendingKeys() method.
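The reason a set cannot tell a forward query from a reverse one can be sketched as follows. This is an illustrative example only, not code from IQv2StoreIntegrationTest; the class and method names (OrderedResultCheck, reversed, equalAsSets, equalAsLists) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;

// Hypothetical demo class; not taken from the Kafka test suite.
class OrderedResultCheck {

    // Return a reversed copy, standing in for the result of a descending query.
    static List<Integer> reversed(final List<Integer> ascending) {
        final List<Integer> copy = new ArrayList<>(ascending);
        Collections.reverse(copy);
        return copy;
    }

    // Set comparison ignores the order in which results were returned.
    static boolean equalAsSets(final List<Integer> a, final List<Integer> b) {
        return new HashSet<>(a).equals(new HashSet<>(b));
    }

    // List comparison is sensitive to the order of results.
    static boolean equalAsLists(final List<Integer> a, final List<Integer> b) {
        return a.equals(b);
    }

    public static void main(final String[] args) {
        final List<Integer> ascending = Arrays.asList(1, 2, 3);
        final List<Integer> descending = reversed(ascending);
        System.out.println(equalAsSets(ascending, descending));  // prints true
        System.out.println(equalAsLists(ascending, descending)); // prints false
    }
}
```

A test that collects results into a set would pass for both orders, whereas comparing against an expected list fails when the order is wrong.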

...