...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

TopK is a commonly supported feature in SQL-like queries and is widely used across tech and non-tech industries alike. For example, a blog site owner may want to know the 5 most active authors posting blogs:

...

To break it down, the TopK operator aims to globally sort all key-value pairs emitted by the upstream operator and to retain only the top K values within its state as a KTable. There are also cases where the TopK query is partitioned, such as a salesperson who wants to know the best sellers in each category. Given the ever-updating nature of streams, it is also necessary to support windowed TopK.

Public Interfaces

We are proposing a new operator called top() on the KTable interface to create a materialized view for global ranking of data records in either ascending or descending order, retaining only a limited number of elements:

Code Block
languagejava
titleKTable.java
<P, VO> KTable<P, Map<Integer, VO>> top(Ranker<? super K, ? super V, P, VO> ranker); 

The reason for supporting only the KTable operator is that top() does not serve any aggregation purpose, so its comparison is purely record based. For a KStream instance, it is advised to aggregate first so that the changing aggregation result is available as a KTable.


The Ranker is a generic interface to be implemented. It covers the following definitions needed by the topK operator:

  1. The ranking order (ascending | descending)
  2. The element limit
  3. The partition key (if needed)
  4. The output value type

...

Code Block
languagejava
titleRanker
/**
 * The helper instance to create the topK ranking based on streaming in events or table updates.
 * 
 * @param <K> input record key
 * @param <V> input record value
 * @param <P> the input partition key
 * @param <VO> the output value type
 */
public interface Ranker<K, V, P, VO> {

    enum Order {
        ASCENDING,
        DESCENDING
    }

    /**
     * The order of ranking, either ascending or descending.
     */
    Order order();

    /**
     * The number of retained elements in given order.
     *
     * @return the total number of top records to be preserved.
     */
    int limit();

    /**
     * Compare the rank of two key-value pairs.
     *
     * @param key1       the key of the first record
     * @param value1     the value of the first record
     * @param key2       the key of the second record
     * @param value2     the value of the second record
     *
     * @return the comparison result, 0 means equal, negative means record 1 < record 2, positive means record 1 > record 2
     */
    int compare(final K key1, final V value1, final K key2, final V value2);

    /**
     * Get the partition key for different ranking categories
     *
     * @param key key of the record
     * @param value value of the record
     * @return the partition key
     */
    P partition(final K key, final V value);
 
    /**
     * The output value to be saved in the top ranking result.
     * 
     * @param key record key
     * @param value record value
     * @return output value
     */
    VO output(final K key, final V value);
}
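As a rough illustration of how an operator might drive a Ranker, the sketch below keeps the retained records per partition in memory and rebuilds the rank-to-output map on every table update. It is not the proposed implementation: TopKSketch and its update() method are hypothetical names, the Ranker interface is copied locally only so the block compiles on its own, and it assumes that DESCENDING places the record that compare() considers largest at rank 1.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Local copy of the proposed interface so the sketch is self-contained.
interface Ranker<K, V, P, VO> {
    enum Order { ASCENDING, DESCENDING }
    Order order();
    int limit();
    int compare(K key1, V value1, K key2, V value2);
    P partition(K key, V value);
    VO output(K key, V value);
}

// Hypothetical helper, not part of the KIP: an in-memory stand-in for the operator state.
final class TopKSketch<K, V, P, VO> {
    private final Ranker<K, V, P, VO> ranker;
    private final Map<P, List<Map.Entry<K, V>>> retained = new HashMap<>();

    TopKSketch(Ranker<K, V, P, VO> ranker) {
        this.ranker = ranker;
    }

    // Apply one table update and return the new ranking (1-based rank -> output value)
    // of the partition that the record belongs to.
    Map<Integer, VO> update(K key, V value) {
        P partition = ranker.partition(key, value);
        List<Map.Entry<K, V>> entries =
            retained.computeIfAbsent(partition, p -> new ArrayList<>());

        // Table semantics: a new value for a key replaces the previous one.
        entries.removeIf(e -> e.getKey().equals(key));
        entries.add(new AbstractMap.SimpleImmutableEntry<>(key, value));

        Comparator<Map.Entry<K, V>> cmp =
            (a, b) -> ranker.compare(a.getKey(), a.getValue(), b.getKey(), b.getValue());
        if (ranker.order() == Ranker.Order.DESCENDING) {
            cmp = cmp.reversed(); // assumption: largest record ranks first
        }
        entries.sort(cmp);

        // Retain only the first limit() records.
        if (entries.size() > ranker.limit()) {
            entries.subList(ranker.limit(), entries.size()).clear();
        }

        Map<Integer, VO> ranking = new LinkedHashMap<>();
        for (int i = 0; i < entries.size(); i++) {
            Map.Entry<K, V> e = entries.get(i);
            ranking.put(i + 1, ranker.output(e.getKey(), e.getValue()));
        }
        return ranking;
    }
}
```

Because the sketch discards everything beyond limit(), a record that was trimmed earlier cannot re-enter the ranking when a retained record later drops in value or is deleted, and a key whose partition changes leaves a stale entry in its old partition. A real operator would need to handle both cases.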

The output here is an ever-updating KTable. Following the top() signature above, it is keyed by the partition key, and each value maps the ranking number to the user-defined output type. Thus we could support fairly complex queries on the KSQL layer, such as:

Code Block
SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num
    FROM ShopSales)
WHERE row_num <= 5
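To make the semantics of this query concrete, here is a batch equivalent in plain Java: group the rows by category, order each group by sales from highest to lowest, and keep the first n rows. The Sale class, its item field, and the PartitionedTopN helper are hypothetical stand-ins for the ShopSales table. The streaming operator would need to maintain the same result incrementally rather than recomputing it.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical row type standing in for one row of ShopSales.
final class Sale {
    final String category;
    final String item;
    final long sales;

    Sale(String category, String item, long sales) {
        this.category = category;
        this.item = item;
        this.sales = sales;
    }
}

final class PartitionedTopN {
    // For each category, return the rows whose row number
    // (ordered by sales, highest first) is at most n.
    static Map<String, List<Sale>> topPerCategory(List<Sale> rows, int n) {
        // PARTITION BY category
        Map<String, List<Sale>> byCategory = new TreeMap<>();
        for (Sale row : rows) {
            byCategory.computeIfAbsent(row.category, c -> new ArrayList<>()).add(row);
        }
        for (List<Sale> group : byCategory.values()) {
            // ORDER BY sales DESC
            group.sort(Comparator.comparingLong((Sale s) -> s.sales).reversed());
            // WHERE row_num <= n
            if (group.size() > n) {
                group.subList(n, group.size()).clear();
            }
        }
        return byCategory;
    }
}
```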


A sample streaming query, here retaining 20 users per region ranked by their number of comments, would look like:

Code Block
languagejava
titleSampleStream.java
final StreamsBuilder builder = new StreamsBuilder();

final KTable<String, UserProfile> users = builder.table("streams-userprofile-input", Consumed.with(Serdes.String(), new JSONSerde<>()));

final KTable<String, UserAggStats> userActivities = users.groupBy(...).aggregate(...); // KTable exposes groupBy, not groupByKey; aggregates user comments, blog posts, etc.

userActivities.top(new Ranker<String, UserAggStats, String, UserProfile>() {
    @Override
    public Order order() {
        return Order.ASCENDING;
    }

    @Override
    public int limit() {
        return 20;
    }

    @Override
    public int compare(String key1, UserAggStats stats1, String key2, UserAggStats stats2) {
        return Long.compare(stats1.numComments, stats2.numComments); // avoids overflow from subtracting and truncating to int
    }

    @Override
    public String partition(String key, UserAggStats stat) {
        return stat.profile().region; // Partition users by region
    }

    @Override
    public UserProfile output(String key, UserAggStats stat) {
        return stat.profile();
    }
});

Proposed Changes

We expect to roll out this change in 3 phases. This KIP focuses on the design of phases 1 and 2.

...