...
TopK is a commonly supported feature in SQL-like query languages and is widely used both inside and outside the tech industry. For example, a blog site owner may want to know the five most active authors:
SELECT * FROM authors ORDER BY num_posts DESC LIMIT 5
Similarly, a professor may want to know the top 10 students in a final exam, a salesperson may want to know the best-selling products, and so on. Traditional databases already satisfy these needs. In streaming SQL it is equally valuable to solve this problem natively in Streams, so that KSQL can support `ORDER BY` and `LIMIT` semantics without relying on custom implementations.
...
```java
KTable<Integer, VO> top(Ranker<? super K, ? super V, VO, P> ranker);
```
Here, Ranker is a generic interface to be implemented by the user. It covers the definitions the topK operator needs:
- The ranking order (ascending | descending)
- The element limit
- The partition key (if needed)
- The output value type
```java
/**
 * The helper instance to create the topK ranking based on streaming in events or table updates.
 *
 * @param <K>  input record key
 * @param <V>  input record value
 * @param <VO> the output value type
 * @param <P>  input partition key
 */
public interface Ranker<K, V, VO, P> {

    enum Order {
        ASCENDING,
        DESCENDING
    }

    /**
     * The order of ranking, either ascending or descending.
     */
    Order order();

    /**
     * The number of retained elements in given order.
     *
     * @return the total number of top records to be preserved.
     */
    int limit();

    /**
     * Compare the two key-value pairs rank.
     *
     * @param key1   the key of the first record
     * @param value1 the value of the first record
     * @param key2   the key of the second record
     * @param value2 the value of the second record
     *
     * @return the comparison result, 0 means equal, negative means record 1 < record 2,
     *         positive means record 1 > record 2
     */
    int compare(final K key1, final V value1, final K key2, final V value2);

    /**
     * Get the partition key for different ranking categories
     *
     * @param key   key of the record
     * @param value value of the record
     * @return the partition key
     */
    P partition(final K key, final V value);

    /**
     * The output value type to be saved in the top ranking result.
     *
     * @param key   record key
     * @param value record value
     * @return output value
     */
    VO output(final K key, final V value);
}
```
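To make the interface above more concrete, here is a minimal sketch of one possible implementation that keeps the five best-selling products per category. The `Sale` value type, the `TopSalesRanker` class, and the `productId:sales` output format are assumptions made for illustration and are not part of the proposal; the interface is repeated in trimmed form only so that the example compiles on its own.

```java
// Trimmed copy of the proposed Ranker interface, repeated so the sketch is self-contained.
interface Ranker<K, V, VO, P> {
    enum Order { ASCENDING, DESCENDING }
    Order order();
    int limit();
    int compare(K key1, V value1, K key2, V value2);
    P partition(K key, V value);
    VO output(K key, V value);
}

// Hypothetical value type for illustration only.
final class Sale {
    final String category;
    final long sales;

    Sale(String category, long sales) {
        this.category = category;
        this.sales = sales;
    }
}

// Hypothetical ranker: record key is a product id, ranked by sales within each category.
final class TopSalesRanker implements Ranker<String, Sale, String, String> {

    @Override
    public Order order() {
        return Order.DESCENDING; // highest sales first
    }

    @Override
    public int limit() {
        return 5; // retain five records per partition
    }

    @Override
    public int compare(String key1, Sale value1, String key2, Sale value2) {
        int bySales = Long.compare(value1.sales, value2.sales);
        // Tie-break on the product id so that the ordering is deterministic.
        return bySales != 0 ? bySales : key1.compareTo(key2);
    }

    @Override
    public String partition(String key, Sale value) {
        return value.category; // one ranking per category
    }

    @Override
    public String output(String key, Sale value) {
        return key + ":" + value.sales; // assumed output format
    }
}
```

Under these assumptions, the ranker would be passed to the proposed operator as `top(new TopSalesRanker())`, yielding one ranked result set per category.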
The output is an ever-updating KTable whose key is the rank and whose value is the user-defined output type. This lets the KSQL layer support more complex queries, such as:
SELECT * FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num
FROM ShopSales)
WHERE row_num <= 5
Proposed Changes
We expect to roll out this change in three phases.
- Core implementation: basic in-memory ranking store support. We would start by using an in-memory structure to maintain the ever-changing rank on a single node.
- Optimization: two-level
- Extension: support windowed table semantics
To break this down, the TopK operator aims to globally sort all key-value pairs emitted by the upstream operator and retain only the top N values in its state as a KTable. There are also cases where the TopK query is partitioned, such as a salesperson who wants to know the best sellers in each category. Given the ever-updating nature of streams, windowed TopK should be supported as well.
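The retention behaviour described above can be sketched with a bounded, sorted in-memory structure per partition. This is only an illustration under stated assumptions, not the proposed store: the class name `InMemoryTopN`, its methods, and the use of a plain `long` score in place of a `Ranker` comparison are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical in-memory ranking store; names and shape are illustrative only.
final class InMemoryTopN<P, K extends Comparable<K>> {

    private static final class Entry<K> {
        final K key;
        final long score;

        Entry(K key, long score) {
            this.key = key;
            this.score = score;
        }
    }

    private final int limit;

    // Higher score ranks first; ties are broken by key to keep entries distinct.
    private final Comparator<Entry<K>> bestFirst = (a, b) -> {
        int byScore = Long.compare(b.score, a.score);
        return byScore != 0 ? byScore : a.key.compareTo(b.key);
    };

    // Per partition: retained entries in rank order.
    private final Map<P, TreeSet<Entry<K>>> ranked = new HashMap<>();
    // Per partition: the retained entry for each key, so updates can replace it.
    private final Map<P, Map<K, Entry<K>>> byKey = new HashMap<>();

    InMemoryTopN(int limit) {
        this.limit = limit;
    }

    // Apply one upstream record: replace the key's old entry, then trim to the limit.
    void update(P partition, K key, long score) {
        TreeSet<Entry<K>> set = ranked.computeIfAbsent(partition, p -> new TreeSet<>(bestFirst));
        Map<K, Entry<K>> index = byKey.computeIfAbsent(partition, p -> new HashMap<>());

        Entry<K> previous = index.remove(key);
        if (previous != null) {
            set.remove(previous);
        }

        Entry<K> current = new Entry<>(key, score);
        set.add(current);
        index.put(key, current);

        if (set.size() > limit) {
            Entry<K> evicted = set.pollLast(); // lowest-ranked entry
            index.remove(evicted.key);
        }
    }

    // Keys of the retained records for a partition, best first.
    List<K> top(P partition) {
        List<K> keys = new ArrayList<>();
        TreeSet<Entry<K>> set = ranked.get(partition);
        if (set != null) {
            for (Entry<K> e : set) {
                keys.add(e.key);
            }
        }
        return keys;
    }
}
```

One limitation of this sketch is that evicted records are forgotten: if a retained key is later updated to a lower score, a previously evicted record cannot re-enter the ranking until it is emitted again by the upstream operator.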
...