...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

TopN is a very commonly supported feature in SQL-like queries and is widely used in tech and non-tech industries alike. For example, a blog site owner may want to know the 5 most active blog authors:

...

Similarly, a professor wants to know the top 10 students in the final exam, a salesperson wants to know the best-selling products, and so on. Traditional databases already satisfy these needs. In streaming SQL, the problem is just as valuable: solving it natively in Streams would let KSQL support `ORDER BY` and `LIMIT` semantics directly, instead of relying on customized implementations.

Public Interfaces

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

A public interface is any change to the following:

  • Binary log format

  • The network protocol and api behavior

  • Any class in the public packages under clients

    • org/apache/kafka/common/serialization

    • org/apache/kafka/common

    • org/apache/kafka/common/errors

    • org/apache/kafka/clients/producer

    • org/apache/kafka/clients/consumer (eventually, once stable)

  • Configuration, especially client configuration

  • Monitoring

  • Command line tools and arguments

  • Anything else that will likely break existing users in some way when they upgrade

Proposed Changes

We propose a new operator, top(), on both the KStream and KTable interfaces. It creates a materialized view that globally ranks data records in either ascending or descending order, retaining only a limited number of elements:


KStream.java

```java
KTable<K, V> top(final Ranker<K, V, ?> ranker);
```


KTable.java

```java
KTable<K, V> top(final Ranker<K, V, ?> ranker);
```


Here, Ranker is a generic interface to be implemented:


Ranker

```java
/**
 * A helper that defines the TopN ranking over streamed-in events or table updates.
 *
 * @param <K> input record key
 * @param <V> input record value
 * @param <P> partition key that separates independent ranking categories
 */
public interface Ranker<K, V, P> {

    enum Order {
        ASCENDING,
        DESCENDING
    }

    /**
     * The ranking order, either ascending or descending.
     *
     * @return the order in which records are ranked
     */
    Order order();

    /**
     * The number of records to retain in the given order.
     *
     * @return the total number of top records to be preserved
     */
    int limit();

    /**
     * Compares the ranks of two key-value pairs.
     *
     * @param key1       the key of the first record
     * @param value1     the value of the first record
     * @param key2       the key of the second record
     * @param value2     the value of the second record
     *
     * @return 0 if the records are equal, a negative value if record 1 is less than record 2,
     *         and a positive value if record 1 is greater than record 2
     */
    int compare(final K key1, final V value1, final K key2, final V value2);

    /**
     * Returns the partition key, so that each ranking category is ranked independently.
     *
     * @param key   the key of the record
     * @param value the value of the record
     * @return the partition key
     */
    P partition(final K key, final V value);
}
```
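To illustrate how the proposed interface might be implemented, the following minimal sketch ranks blog authors by post count, matching the motivating example. ActiveAuthorRanker, RankerDemo, the limit of 5, and the tie-break on author name are hypothetical choices made for this sketch. The Ranker interface is re-declared without Javadoc only so that the file compiles on its own, because it is a proposal and not part of any released Kafka Streams API.

```java
// Re-declared copy of the proposed interface (not a released Kafka Streams API),
// included only so this sketch compiles on its own.
interface Ranker<K, V, P> {
    enum Order { ASCENDING, DESCENDING }
    Order order();
    int limit();
    int compare(K key1, V value1, K key2, V value2);
    P partition(K key, V value);
}

// Hypothetical ranker for the blog example: key = author name, value = number of posts.
class ActiveAuthorRanker implements Ranker<String, Long, String> {
    @Override
    public Order order() {
        return Order.DESCENDING; // most active authors first
    }

    @Override
    public int limit() {
        return 5; // keep the top 5 authors
    }

    @Override
    public int compare(String author1, Long posts1, String author2, Long posts2) {
        int byPosts = Long.compare(posts1, posts2);
        // Break ties on the author name so two distinct authors never compare as equal.
        return byPosts != 0 ? byPosts : author1.compareTo(author2);
    }

    @Override
    public String partition(String author, Long posts) {
        return "all"; // one global ranking: every record shares the same partition
    }
}

class RankerDemo {
    public static void main(String[] args) {
        Ranker<String, Long, String> ranker = new ActiveAuthorRanker();
        System.out.println(ranker.compare("alice", 12L, "bob", 7L) > 0); // prints true
        System.out.println(ranker.order()); // prints DESCENDING
    }
}
```

Breaking ties on the author name is a deliberate choice in this sketch: if compare() returned 0 for two different authors, a state store backed by a sorted set could treat them as duplicates and silently drop one of them.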


We expect to roll out this change in three phases:

  • Core implementation

  • Optimization

  • Extension


To break it down, the TopN operator aims to globally sort all the key-value pairs emitted by the upstream operator and to retain only the top N values in its state as a KTable. We also see cases where the TopN query is partitioned, for example when a salesperson wants to know the best sellers in each category. Because streams are continuously updated, windowed TopN must be supported as well.
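The behavior described above can be modeled with a small in-memory sketch. TopNState is a hypothetical class, not part of Kafka Streams, and a real operator would keep this state in a state store. It keeps one sorted set per partition key, removes a key's previous value before inserting the updated one (so a table upsert re-ranks a record instead of duplicating it), and evicts the lowest-ranked record whenever a partition exceeds limit(). It assumes compare() returns 0 only for identical records. SalesRanker and its "category:product" key format are illustrative assumptions echoing the best-sellers-per-category example.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Re-declared copy of the proposed interface so this file compiles on its own.
interface Ranker<K, V, P> {
    enum Order { ASCENDING, DESCENDING }
    Order order();
    int limit();
    int compare(K key1, V value1, K key2, V value2);
    P partition(K key, V value);
}

// Hypothetical in-memory model of the state a top() operator might keep.
class TopNState<K, V, P> {
    private final Ranker<K, V, P> ranker;
    // Orders records best-ranked first, according to ranker.order().
    private final Comparator<Map.Entry<K, V>> rankOrder;
    // One ranked set of records per partition key.
    private final Map<P, TreeSet<Map.Entry<K, V>>> rankings = new HashMap<>();
    // The retained record and its partition for each key, used to replace stale values.
    private final Map<K, Map.Entry<K, V>> retained = new HashMap<>();
    private final Map<K, P> partitionOf = new HashMap<>();

    TopNState(Ranker<K, V, P> ranker) {
        this.ranker = ranker;
        Comparator<Map.Entry<K, V>> ascending =
            (a, b) -> ranker.compare(a.getKey(), a.getValue(), b.getKey(), b.getValue());
        this.rankOrder = ranker.order() == Ranker.Order.ASCENDING ? ascending : ascending.reversed();
    }

    // Applies one upstream update: a new stream record or a table upsert.
    void update(K key, V value) {
        Map.Entry<K, V> previous = retained.remove(key);
        if (previous != null) {
            rankings.get(partitionOf.remove(key)).remove(previous);
        }
        P partition = ranker.partition(key, value);
        TreeSet<Map.Entry<K, V>> ranking =
            rankings.computeIfAbsent(partition, p -> new TreeSet<>(rankOrder));
        Map.Entry<K, V> entry = new AbstractMap.SimpleImmutableEntry<>(key, value);
        ranking.add(entry);
        retained.put(key, entry);
        partitionOf.put(key, partition);
        // Evict the lowest-ranked records until the partition fits within limit().
        while (ranking.size() > ranker.limit()) {
            Map.Entry<K, V> evicted = ranking.pollLast();
            retained.remove(evicted.getKey());
            partitionOf.remove(evicted.getKey());
        }
    }

    // Returns the retained keys of one partition, best-ranked first.
    List<K> topKeys(P partition) {
        List<K> keys = new ArrayList<>();
        TreeSet<Map.Entry<K, V>> ranking = rankings.get(partition);
        if (ranking != null) {
            for (Map.Entry<K, V> entry : ranking) {
                keys.add(entry.getKey());
            }
        }
        return keys;
    }
}

// Hypothetical ranker: key = "category:product", value = units sold, top 2 per category.
class SalesRanker implements Ranker<String, Long, String> {
    @Override public Order order() { return Order.DESCENDING; }
    @Override public int limit() { return 2; }
    @Override public int compare(String k1, Long v1, String k2, Long v2) {
        int byUnits = Long.compare(v1, v2);
        return byUnits != 0 ? byUnits : k1.compareTo(k2); // tie-break on key
    }
    @Override public String partition(String key, Long value) {
        return key.substring(0, key.indexOf(':')); // the category prefix
    }
}

class TopNDemo {
    public static void main(String[] args) {
        TopNState<String, Long, String> state = new TopNState<>(new SalesRanker());
        state.update("books:dune", 5L);
        state.update("books:emma", 9L);
        state.update("books:ulysses", 2L); // evicted: only 2 books are kept
        state.update("games:chess", 4L);
        state.update("books:dune", 12L);   // upsert moves dune to the top
        System.out.println(state.topKeys("books")); // prints [books:dune, books:emma]
        System.out.println(state.topKeys("games")); // prints [games:chess]
    }
}
```

One limitation of this sketch is that evicted records are forgotten: if an upsert lowers the value of a retained record, a previously evicted record that should now re-enter the top N cannot be recovered. As an assumption beyond the proposal, a windowed variant might reuse the same structure by including the window in the partition key.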

Compatibility, Deprecation, and Migration Plan

...