Status

Current state: [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: here [Change the link from KAFKA-1 to your own ticket]

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

TopK is a commonly supported feature in SQL-like query languages and is widely used in tech and non-tech industries alike. For example, a blog site owner may want to know the five most active authors:

SELECT * FROM authors ORDER BY num_posts DESC LIMIT 5

Similarly, a professor may want to know the top 10 students in a final exam, a salesperson may want to know the best-selling products, and so on. Traditional databases already satisfy these needs. For streaming SQL, it is equally valuable to solve the problem natively in Streams, so that KSQL can support `ORDER BY` and `LIMIT` semantics without relying on custom implementations.

Public Interfaces

We propose a new operator called top() on both the KStream and KTable interfaces. It creates a materialized view that ranks data records globally in either ascending or descending order and retains only a limited number of elements:

KStream.java
KTable<K, V> top(Ranker ranker); 
KTable.java
KTable<K, V> top(Ranker ranker); 


where Ranker is a generic interface to be implemented:

Ranker
/**
 * Helper used to build the TopK ranking from incoming stream events or table updates.
 * 
 * @param <K> input record key
 * @param <V> input record value
 * @param <P> input partition key
 */
public interface Ranker<K, V, P> {

    enum Order {
        ASCENDING,
        DESCENDING
    }

    /**
     * The order of ranking, either ascending or descending.
     */
    Order order();

    /**
     * The number of elements retained in the given order.
     *
     * @return the total number of top records to be preserved.
     */
    int limit();

    /**
     * Compare the rank of two key-value pairs.
     *
     * @param key1       the key of the first record
     * @param value1     the value of the first record
     * @param key2       the key of the second record
     * @param value2     the value of the second record
     *
     * @return the comparison result, 0 means equal, negative means record 1 < record 2, positive means record 1 > record 2
     */
    int compare(final K key1, final V value1, final K key2, final V value2);

    /**
     * Get the partition key that assigns a record to its ranking category.
     *
     * @param key key of the record
     * @param value value of the record
     * @return the partition key
     */
    P partition(final K key, final V value);
}
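To make the contract concrete, here is a minimal sketch of a Ranker for the blog example from the Motivation section. It is only an illustration: `AuthorStats`, `TopAuthorsRanker`, and the `site` field are hypothetical names that are not part of this proposal, the interface is repeated so that the snippet compiles on its own, and the tie-break on the key is a choice made for this sketch rather than a requirement of the interface.

```java
class TopAuthorsExample {

    // Copy of the proposed interface, repeated only so the sketch compiles standalone.
    interface Ranker<K, V, P> {
        enum Order { ASCENDING, DESCENDING }
        Order order();
        int limit();
        int compare(K key1, V value1, K key2, V value2);
        P partition(K key, V value);
    }

    // Hypothetical value type: per-author statistics.
    static final class AuthorStats {
        final String site;
        final long numPosts;

        AuthorStats(String site, long numPosts) {
            this.site = site;
            this.numPosts = numPosts;
        }
    }

    // Hypothetical ranker: top 5 authors by post count, ranked separately per site.
    static final class TopAuthorsRanker implements Ranker<String, AuthorStats, String> {
        @Override public Order order() { return Order.DESCENDING; }

        @Override public int limit() { return 5; }

        @Override
        public int compare(String key1, AuthorStats value1, String key2, AuthorStats value2) {
            int byPosts = Long.compare(value1.numPosts, value2.numPosts);
            // Assumption of this sketch: tie-break on the key so distinct authors never compare as equal.
            return byPosts != 0 ? byPosts : key1.compareTo(key2);
        }

        @Override
        public String partition(String key, AuthorStats value) { return value.site; }
    }

    public static void main(String[] args) {
        TopAuthorsRanker ranker = new TopAuthorsRanker();
        AuthorStats alice = new AuthorStats("tech-blog", 42);
        AuthorStats bob = new AuthorStats("tech-blog", 17);
        // alice has more posts, so she compares greater than bob.
        System.out.println(ranker.compare("alice", alice, "bob", bob) > 0);
        System.out.println(ranker.partition("alice", alice));
    }
}
```

With the proposed API, such a ranker would presumably be passed as `authors.top(new TopAuthorsRanker())`.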

Proposed Changes

We expect to roll out this change in three phases:

  • Core implementation
  • Optimization
  • Extension


To break this down: the TopK operator aims to globally sort all key-value pairs emitted by the upstream operator and to retain only the top N values in its state as a KTable. We also see cases where the TopK query is partitioned, for example when a salesperson wants to know the best sellers in each category. Given the ever-updating nature of streams, it is also necessary to support windowed TopK.
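The partitioned, bounded ranking described above can be sketched in plain Java as follows. This is not the proposed Streams implementation: `PartitionedTopK`, `offer`, and `top` are hypothetical names, state lives in in-memory collections rather than a state store, and windowing is left out. The sketch also assumes that a key always maps to the same partition, and it cannot bring back an evicted record when a retained record later drops in rank.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.function.BiFunction;

// Hypothetical helper: keeps at most `limit` best entries per partition.
class PartitionedTopK<K, V, P> {
    private final int limit;
    // Must be a total order (tie-break on the key), best entry first.
    private final Comparator<Map.Entry<K, V>> bestFirst;
    private final BiFunction<K, V, P> partitioner;
    private final Map<P, TreeSet<Map.Entry<K, V>>> ranked = new HashMap<>();
    private final Map<P, Map<K, V>> retained = new HashMap<>();

    PartitionedTopK(int limit, Comparator<Map.Entry<K, V>> bestFirst,
                    BiFunction<K, V, P> partitioner) {
        this.limit = limit;
        this.bestFirst = bestFirst;
        this.partitioner = partitioner;
    }

    // Insert a new record, or update the value of a key that is currently retained.
    void offer(K key, V value) {
        P partition = partitioner.apply(key, value);
        TreeSet<Map.Entry<K, V>> set =
            ranked.computeIfAbsent(partition, p -> new TreeSet<>(bestFirst));
        Map<K, V> current = retained.computeIfAbsent(partition, p -> new HashMap<>());

        V old = current.put(key, value);
        if (old != null) {
            set.remove(new SimpleImmutableEntry<>(key, old)); // drop the stale ranking entry
        }
        set.add(new SimpleImmutableEntry<>(key, value));
        if (set.size() > limit) {
            Map.Entry<K, V> worst = set.pollLast();           // evict the lowest-ranked entry
            current.remove(worst.getKey());
        }
    }

    // Keys of the retained records in one partition, best first.
    List<K> top(P partition) {
        List<K> keys = new ArrayList<>();
        TreeSet<Map.Entry<K, V>> set = ranked.get(partition);
        if (set != null) {
            for (Map.Entry<K, V> e : set) {
                keys.add(e.getKey());
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        // Best sellers per category: higher sales first, ties broken by product key.
        Comparator<Map.Entry<String, Long>> bestFirst =
            Map.Entry.<String, Long>comparingByValue().reversed()
                .thenComparing(Map.Entry.<String, Long>comparingByKey());
        PartitionedTopK<String, Long, String> topK = new PartitionedTopK<>(
            2, bestFirst, (k, v) -> k.substring(0, k.indexOf('/')));

        topK.offer("books/a", 5L);
        topK.offer("books/b", 9L);
        topK.offer("books/c", 7L);
        topK.offer("toys/x", 1L);
        System.out.println(topK.top("books")); // [books/b, books/c]
    }
}
```

A sorted set per partition keeps eviction of the lowest-ranked entry cheap. Handling updates that push a retained record below previously evicted ones would need access to the full upstream data, which this sketch does not model.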

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
