...

TopK is a very commonly supported feature in SQL-like queries and is widely used in tech and non-tech industries alike. For example, a blog site owner might want to know the top 5 most active authors:

SELECT * FROM authors ORDER BY num_posts DESC LIMIT 5

Similarly, a professor wants to know the top 10 students in the final exam, a salesperson wants to know the best-selling products, and so on. Traditional databases already satisfy these needs. In streaming SQL, it is equally valuable to solve this problem natively on streams, so that KSQL can support `ORDER BY` and `LIMIT` semantics directly instead of relying on customized implementations.

...

```java
// KStream.java
<VO, P> KTable<Integer, VO> top(final Ranker<? super K, ? super V, VO, P> ranker);
```


```java
// KTable.java
<VO, P> KTable<Integer, VO> top(final Ranker<? super K, ? super V, VO, P> ranker);
```


Here, `Ranker` is a generic interface to be implemented by the user. It defines everything the TopK operator needs:

  1. The ranking order (ascending | descending)
  2. The limit, i.e. the number of retained elements
  3. The comparison between two records
  4. The partition key (if needed)
  5. The output value type


```java
// Ranker.java
/**
 * A helper that defines how the TopK operator ranks streamed events or table updates.
 *
 * @param <K>  input record key
 * @param <V>  input record value
 * @param <VO> the output value type
 * @param <P>  input partition key
 */
public interface Ranker<K, V, VO, P> {

    enum Order {
        ASCENDING,
        DESCENDING
    }

    /**
     * The order of ranking, either ascending or descending.
     *
     * @return the ranking order
     */
    Order order();

    /**
     * The number of records retained in the given order.
     *
     * @return the total number of top records to be preserved
     */
    int limit();

    /**
     * Compares the rank of two key-value pairs.
     *
     * @param key1   the key of the first record
     * @param value1 the value of the first record
     * @param key2   the key of the second record
     * @param value2 the value of the second record
     * @return 0 if the records are equal, a negative value if record 1 < record 2,
     *         or a positive value if record 1 > record 2
     */
    int compare(final K key1, final V value1, final K key2, final V value2);

    /**
     * Gets the partition key that separates different ranking categories.
     *
     * @param key   key of the record
     * @param value value of the record
     * @return the partition key
     */
    P partition(final K key, final V value);

    /**
     * Converts a record into the output value saved in the top ranking result.
     *
     * @param key   record key
     * @param value record value
     * @return the output value
     */
    VO output(final K key, final V value);
}
```
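To make the contract concrete, here is a minimal sketch of a `Ranker` for the "top 5 most active authors" example, assuming the key is the author name and the value is the number of posts. `ActiveAuthorRanker`, `RankerDemo` and `rankSnapshot` are hypothetical names for illustration only; `rankSnapshot` ranks a static snapshot of a single partition (it ignores `partition()`), so it only shows the shape of the proposed `KTable<Integer, VO>` result, not how the streaming operator maintains it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Compact copy of the proposed Ranker interface so this sketch compiles on its own.
interface Ranker<K, V, VO, P> {
    enum Order { ASCENDING, DESCENDING }
    Order order();
    int limit();
    int compare(K key1, V value1, K key2, V value2);
    P partition(K key, V value);
    VO output(K key, V value);
}

// Hypothetical ranker: key = author name, value = number of posts.
class ActiveAuthorRanker implements Ranker<String, Integer, String, String> {
    @Override public Order order() { return Order.DESCENDING; }
    @Override public int limit() { return 5; }
    @Override public int compare(String key1, Integer value1, String key2, Integer value2) {
        // Compare by post count only.
        return Integer.compare(value1, value2);
    }
    // One global ranking: every record falls into the same partition.
    @Override public String partition(String key, Integer value) { return "all-authors"; }
    // Emit only the author name as the ranked output value.
    @Override public String output(String key, Integer value) { return key; }
}

class RankerDemo {
    // Ranks a snapshot (assumed to be one partition) and returns rank (1-based) -> output value.
    static <K, V, VO, P> Map<Integer, VO> rankSnapshot(Map<K, V> snapshot, Ranker<K, V, VO, P> ranker) {
        List<Map.Entry<K, V>> entries = new ArrayList<>(snapshot.entrySet());
        entries.sort((a, b) -> ranker.order() == Ranker.Order.DESCENDING
                // Swapping the arguments reverses the order without negating the result.
                ? ranker.compare(b.getKey(), b.getValue(), a.getKey(), a.getValue())
                : ranker.compare(a.getKey(), a.getValue(), b.getKey(), b.getValue()));
        Map<Integer, VO> result = new LinkedHashMap<>();
        for (int i = 0; i < Math.min(ranker.limit(), entries.size()); i++) {
            Map.Entry<K, V> e = entries.get(i);
            result.put(i + 1, ranker.output(e.getKey(), e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> posts = new LinkedHashMap<>();
        posts.put("alice", 42);
        posts.put("bob", 17);
        posts.put("carol", 58);
        posts.put("dave", 3);
        posts.put("erin", 25);
        posts.put("frank", 11);
        System.out.println(rankSnapshot(posts, new ActiveAuthorRanker()));
        // prints {1=carol, 2=alice, 3=erin, 4=bob, 5=frank}
    }
}
```

Keeping the direction in `order()` separate from `compare()` lets one comparison function serve both ascending and descending rankings.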

The output is an ever-updating KTable whose key is the rank and whose value is the user-defined output type (`VO`). This would let the KSQL layer support more complex queries, such as:


SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS row_num
    FROM ShopSales)
WHERE row_num <= 5
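As a reference for what this query computes, here is a batch (non-streaming) sketch of partitioned top-N: group rows by category, sort each group by sales descending, and keep the rows whose `row_num` is at most N. The `Sale` row type and the `PartitionedTopK.topPerCategory` helper are hypothetical; the streaming operator would have to maintain this result incrementally rather than recompute it.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical row of the ShopSales table.
final class Sale {
    final String category;
    final String product;
    final long sales;

    Sale(String category, String product, long sales) {
        this.category = category;
        this.product = product;
        this.sales = sales;
    }
}

class PartitionedTopK {
    // Batch equivalent of ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) ... WHERE row_num <= n.
    static Map<String, List<String>> topPerCategory(List<Sale> rows, int n) {
        // PARTITION BY category; a TreeMap only makes the output order deterministic.
        Map<String, List<Sale>> byCategory = new TreeMap<>();
        for (Sale row : rows) {
            byCategory.computeIfAbsent(row.category, c -> new ArrayList<>()).add(row);
        }
        Map<String, List<String>> result = new TreeMap<>();
        for (Map.Entry<String, List<Sale>> partition : byCategory.entrySet()) {
            List<Sale> group = partition.getValue();
            // ORDER BY sales DESC; List.sort is stable, so ties keep their arrival order.
            group.sort(Comparator.comparingLong((Sale s) -> s.sales).reversed());
            List<String> ranked = new ArrayList<>();
            // WHERE row_num <= n, where row_num = list index + 1.
            for (int i = 0; i < Math.min(n, group.size()); i++) {
                ranked.add(group.get(i).product);
            }
            result.put(partition.getKey(), ranked);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Sale> rows = new ArrayList<>();
        rows.add(new Sale("books", "novel", 120));
        rows.add(new Sale("toys", "kite", 40));
        rows.add(new Sale("books", "atlas", 300));
        rows.add(new Sale("toys", "robot", 95));
        rows.add(new Sale("books", "comic", 75));
        System.out.println(topPerCategory(rows, 2));
        // prints {books=[atlas, novel], toys=[robot, kite]}
    }
}
```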

Proposed Changes

We expect to roll out this change in 3 phases:

  1. Core implementation: basic in-memory ranking store support. We would start by using an in-memory structure to maintain the ever-changing rank on a single node.
  2. Optimizations: two-level.
  3. Extension: support windowed table semantics.
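For the single-node, in-memory phase, one possible structure is a sorted set ordered by score, plus a key index so that a table update can remove a key's previous entry before inserting the new one. This is a minimal sketch, assuming `String` keys, `long` scores, descending order, and a `null` score acting as a tombstone; `RankedEntry` and `InMemoryRankingStore` are hypothetical names, not part of the proposal.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeSet;

// Immutable entry so its position in the TreeSet never changes while it is stored.
final class RankedEntry {
    final String key;
    final long score;

    RankedEntry(String key, long score) {
        this.key = key;
        this.score = score;
    }
}

class InMemoryRankingStore {
    // Highest score first; equal scores fall back to key order so distinct keys never collide.
    private static final Comparator<RankedEntry> RANK_ORDER =
            Comparator.comparingLong((RankedEntry e) -> e.score).reversed()
                    .thenComparing((RankedEntry e) -> e.key);

    private final int limit;
    private final Map<String, RankedEntry> byKey = new HashMap<>();
    private final TreeSet<RankedEntry> ranking = new TreeSet<>(RANK_ORDER);

    InMemoryRankingStore(int limit) {
        this.limit = limit;
    }

    // Applies a table update: replace the key's previous entry, or delete it when score is null.
    void update(String key, Long score) {
        RankedEntry previous = byKey.remove(key);
        if (previous != null) {
            ranking.remove(previous);
        }
        if (score != null) {
            RankedEntry current = new RankedEntry(key, score);
            byKey.put(key, current);
            ranking.add(current);
        }
    }

    // Current rank (1-based) -> key for the first `limit` entries.
    Map<Integer, String> topK() {
        Map<Integer, String> result = new LinkedHashMap<>();
        int rank = 1;
        for (RankedEntry e : ranking) {
            if (rank > limit) {
                break;
            }
            result.put(rank++, e.key);
        }
        return result;
    }

    public static void main(String[] args) {
        InMemoryRankingStore store = new InMemoryRankingStore(2);
        store.update("a", 10L);
        store.update("b", 20L);
        store.update("c", 15L);
        System.out.println(store.topK()); // prints {1=b, 2=c}
        store.update("b", 5L);
        System.out.println(store.topK()); // prints {1=c, 2=a}
        store.update("c", null);
        System.out.println(store.topK()); // prints {1=a, 2=b}
    }
}
```

This sketch keeps every key in memory rather than only the top N: with table updates, a value inside the top N can decrease, and a key that was outside the top N then has to be promoted (as happens to `a` in the example).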


To break it down, the TopK operator aims to globally sort all key-value pairs emitted by the upstream operator and retain only the top N values in its state as a KTable. We also see cases where the TopK query is partitioned, such as a salesperson who wants to know the best sellers in each category. Given the ever-updating nature of streams, it is also necessary to support windowed TopK.
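Windowed TopK over an append-only stream can be sketched with one bounded min-heap per window: each event is offered to its window's heap, and whenever the heap exceeds N the lowest score is evicted. This is a minimal sketch, assuming tumbling windows aligned to epoch 0, events ranked individually (not aggregated per key), and no window expiry or grace period; `ScoredEvent` and `WindowedTopK` are hypothetical names. Keeping only N entries per window works here only because stream events are never retracted; it does not hold for table updates, where a retained value can later decrease.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;

// Hypothetical stream event: a key with a score at an event timestamp in milliseconds.
final class ScoredEvent {
    final String key;
    final long score;
    final long timestampMs;

    ScoredEvent(String key, long score, long timestampMs) {
        this.key = key;
        this.score = score;
        this.timestampMs = timestampMs;
    }
}

class WindowedTopK {
    private static final Comparator<ScoredEvent> BY_SCORE = Comparator.comparingLong((ScoredEvent e) -> e.score);

    private final int limit;
    private final long windowSizeMs;
    // Window start -> min-heap of at most `limit` events; the heap root is the weakest survivor.
    private final Map<Long, PriorityQueue<ScoredEvent>> windows = new TreeMap<>();

    WindowedTopK(int limit, long windowSizeMs) {
        this.limit = limit;
        this.windowSizeMs = windowSizeMs;
    }

    void add(ScoredEvent event) {
        // Tumbling window start; floorMod keeps negative timestamps in the correct window.
        long windowStart = event.timestampMs - Math.floorMod(event.timestampMs, windowSizeMs);
        PriorityQueue<ScoredEvent> heap = windows.computeIfAbsent(windowStart, w -> new PriorityQueue<>(BY_SCORE));
        heap.offer(event);
        if (heap.size() > limit) {
            heap.poll(); // evict the current lowest score
        }
    }

    // Window start -> keys ordered from rank 1 downward.
    Map<Long, List<String>> result() {
        Map<Long, List<String>> out = new TreeMap<>();
        for (Map.Entry<Long, PriorityQueue<ScoredEvent>> window : windows.entrySet()) {
            List<ScoredEvent> ranked = new ArrayList<>(window.getValue());
            ranked.sort(BY_SCORE.reversed());
            List<String> keys = new ArrayList<>();
            for (ScoredEvent e : ranked) {
                keys.add(e.key);
            }
            out.put(window.getKey(), keys);
        }
        return out;
    }

    public static void main(String[] args) {
        WindowedTopK topK = new WindowedTopK(2, 1000);
        topK.add(new ScoredEvent("a", 5, 100));
        topK.add(new ScoredEvent("b", 9, 200));
        topK.add(new ScoredEvent("c", 7, 900));
        topK.add(new ScoredEvent("d", 1, 1100));
        topK.add(new ScoredEvent("e", 3, 1500));
        System.out.println(topK.result());
        // prints {0=[b, c], 1000=[e, d]}
    }
}
```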

...