...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
TopN (also known as TopK) is a commonly supported feature in SQL-like query languages, and it is widely used in tech and non-tech industries alike. For example, a blog site owner may want to know the top 5 most active blog authors:
...
Similarly, a professor may want to know the top 10 students on the final exam, and a salesperson may want to know the best-selling products. Satisfying these needs is important in traditional databases, and it is just as valuable in streaming SQL: Kafka Streams should solve this problem natively so that KSQL can support `ORDER BY` and `LIMIT` semantics instead of relying on custom implementations.
Public Interfaces
Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.
A public interface is any change to the following:
- Binary log format
- The network protocol and API behavior
- Any class in the public packages under clients
  - Configuration, especially client configuration
  - org/apache/kafka/common/serialization
  - org/apache/kafka/common
  - org/apache/kafka/common/errors
  - org/apache/kafka/clients/producer
  - org/apache/kafka/clients/consumer (eventually, once stable)
- Monitoring
- Command line tools and arguments
- Anything else that will likely break existing users in some way when they upgrade
Proposed Changes
We propose a new operator, `top()`, on both the KStream and KTable interfaces. It creates a materialized view that globally ranks data records in either ascending or descending order and retains only a limited number of elements:
```java
// Proposed addition to KStream<K, V>
<P> KTable<K, V> top(final Ranker<K, V, P> ranker);
```

```java
// Proposed addition to KTable<K, V>
<P> KTable<K, V> top(final Ranker<K, V, P> ranker);
```
Here `Ranker` is a generic interface to be implemented by the caller:
```java
/**
 * Helper that defines how to build a TopN ranking over incoming stream events or table updates.
 *
 * @param <K> input record key type
 * @param <V> input record value type
 * @param <P> partition key type; records with the same partition key are ranked together
 */
public interface Ranker<K, V, P> {

    enum Order {
        ASCENDING,
        DESCENDING
    }

    /**
     * The order of the ranking.
     *
     * @return either {@link Order#ASCENDING} or {@link Order#DESCENDING}
     */
    Order order();

    /**
     * The number of elements to retain, in the given order.
     *
     * @return the total number of top records to be preserved
     */
    int limit();

    /**
     * Compares the rank of two key-value pairs.
     *
     * @param key1   the key of the first record
     * @param value1 the value of the first record
     * @param key2   the key of the second record
     * @param value2 the value of the second record
     * @return 0 if the records rank equally, a negative number if record 1 < record 2,
     *         or a positive number if record 1 > record 2
     */
    int compare(final K key1, final V value1, final K key2, final V value2);

    /**
     * Returns the partition key that assigns a record to a ranking category.
     *
     * @param key   the key of the record
     * @param value the value of the record
     * @return the partition key
     */
    P partition(final K key, final V value);
}
```
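To illustrate how the interface might be implemented, here is a minimal sketch for the best-selling-products use case. `ProductSales`, `BestSellerRanker`, and the limit of 3 are hypothetical names and values chosen for illustration; the `Ranker` interface is repeated (without Javadoc) so that the sketch compiles on its own. Records are keyed by a product id and partitioned by category, so each category gets its own ranking.

```java
import java.util.Objects;

// The proposed Ranker interface, repeated so that this sketch compiles on its own.
interface Ranker<K, V, P> {
    enum Order { ASCENDING, DESCENDING }
    Order order();
    int limit();
    int compare(K key1, V value1, K key2, V value2);
    P partition(K key, V value);
}

// Hypothetical value type: units sold for a product within a category.
final class ProductSales {
    final String category;
    final long unitsSold;

    ProductSales(final String category, final long unitsSold) {
        this.category = Objects.requireNonNull(category);
        this.unitsSold = unitsSold;
    }
}

// Hypothetical ranker: the 3 best-selling products (keyed by product id) in each category.
final class BestSellerRanker implements Ranker<String, ProductSales, String> {
    @Override
    public Order order() {
        return Order.DESCENDING; // the highest unitsSold ranks first
    }

    @Override
    public int limit() {
        return 3;
    }

    @Override
    public int compare(final String key1, final ProductSales value1,
                       final String key2, final ProductSales value2) {
        // Primary criterion: units sold. Tie-break on product id so distinct products never compare equal.
        final int bySales = Long.compare(value1.unitsSold, value2.unitsSold);
        return bySales != 0 ? bySales : key1.compareTo(key2);
    }

    @Override
    public String partition(final String key, final ProductSales value) {
        return value.category; // one independent ranking per category
    }
}
```

The tie-break in `compare` is a design choice rather than a requirement of the proposal: if the operator stores entries in a sorted structure that treats a result of 0 as "same element", two different products with equal sales could otherwise collapse into one.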
We expect to roll out this change in three phases:
1. Core implementation
2. Optimization
3. Extension
To break this down: the TopN operator aims to globally sort all key-value pairs emitted by the upstream operator and retain only the top N values in its state as a KTable. There are also cases where the TopN query is partitioned, for example when a salesperson wants to know the best sellers in each category. Because streams are continuously updated, windowed TopN needs to be supported as well.
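The retention semantics described above can be modeled in memory. The sketch below is a hypothetical `TopNModel` (not part of this proposal or of Kafka Streams) that keeps one bounded `java.util.TreeSet` per partition key, orders entries with the `Ranker`, and evicts the worst-ranked entry whenever a partition grows beyond `limit()`. It assumes insert-only input, where every record is a new entry and no earlier record is updated or deleted.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// The proposed Ranker interface, repeated so that this sketch compiles on its own.
interface Ranker<K, V, P> {
    enum Order { ASCENDING, DESCENDING }
    Order order();
    int limit();
    int compare(K key1, V value1, K key2, V value2);
    P partition(K key, V value);
}

// A retained record plus its arrival sequence number, used only to break ties.
final class RankedEntry<K, V> {
    final K key;
    final V value;
    final long seq;

    RankedEntry(final K key, final V value, final long seq) {
        this.key = key;
        this.value = value;
        this.seq = seq;
    }
}

// Hypothetical in-memory model of partitioned TopN retention for insert-only input.
final class TopNModel<K, V, P> {
    private final Ranker<K, V, P> ranker;
    private final Comparator<RankedEntry<K, V>> rankOrder; // first element = best-ranked
    private final Map<P, TreeSet<RankedEntry<K, V>>> partitions = new HashMap<>();
    private long nextSeq = 0;

    TopNModel(final Ranker<K, V, P> ranker) {
        this.ranker = ranker;
        final Comparator<RankedEntry<K, V>> ascending =
            (a, b) -> ranker.compare(a.key, a.value, b.key, b.value);
        final Comparator<RankedEntry<K, V>> byRank =
            ranker.order() == Ranker.Order.DESCENDING ? ascending.reversed() : ascending;
        // Earlier arrivals win ties, so records that compare equal are still kept as distinct entries.
        this.rankOrder = byRank.thenComparingLong(e -> e.seq);
    }

    // Inserts a record into its partition and evicts the worst-ranked entry once the limit is exceeded.
    void add(final K key, final V value) {
        final P partition = ranker.partition(key, value);
        final TreeSet<RankedEntry<K, V>> top =
            partitions.computeIfAbsent(partition, ignored -> new TreeSet<>(rankOrder));
        top.add(new RankedEntry<>(key, value, nextSeq++));
        if (top.size() > ranker.limit()) {
            top.pollLast(); // the last element is the worst-ranked under rankOrder
        }
    }

    // Returns the keys retained for one partition, best-ranked first (empty if the partition is unknown).
    List<K> topKeys(final P partition) {
        final List<K> keys = new ArrayList<>();
        final TreeSet<RankedEntry<K, V>> top = partitions.get(partition);
        if (top != null) {
            for (final RankedEntry<K, V> entry : top) {
                keys.add(entry.key);
            }
        }
        return keys;
    }
}
```

The insert-only assumption matters for KTable input: if an update lowers the value of a retained record, the record that should take its place may already have been evicted, so an operator on table updates has to keep more than N entries (or the full input) per partition. A windowed variant could, for instance, include the window in the partition key so that each window is ranked independently; this is one possible approach, not something the proposal specifies.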
Compatibility, Deprecation, and Migration Plan
...