...

similarly, a professor wants to know the top 10 students on the final exam, a salesperson wants to know the best-selling products, and so on. Traditional databases address these needs. In streaming SQL, this is an equally valuable problem to solve natively on Streams, so that KSQL can support `ORDER BY` and `LIMIT` semantics natively instead of relying on customized implementations.

To break it down, the TopK operator aims to globally sort all key-value pairs emitted by the upstream operator and retain only the top N values in its state as a KTable. We also see cases where the TopK query is partitioned, for example when a salesperson wants to know the best sellers in each category. Because streams are continuously updated, windowed TopK needs to be supported as well.
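The core idea of retaining only the top N values in state can be illustrated with a bounded min-heap. The sketch below is a simplified illustration with a hypothetical class name (BoundedTopN is not part of the proposal). It assumes append-only input of long values, descending order, and no partitioning.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch: keep only the N largest values seen so far.
// Assumes append-only input (no updates or deletes of earlier records).
class BoundedTopN {
    private final int n;
    // Min-heap: the smallest retained value sits at the head and is evicted first.
    private final PriorityQueue<Long> heap = new PriorityQueue<>();

    BoundedTopN(int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        this.n = n;
    }

    // Offers a new value and evicts the smallest one if more than n are now held.
    void add(long value) {
        heap.offer(value);
        if (heap.size() > n) {
            heap.poll();
        }
    }

    // Returns the retained values in descending order (rank 1 first).
    List<Long> ranking() {
        List<Long> result = new ArrayList<>(heap);
        result.sort(Collections.reverseOrder());
        return result;
    }

    public static void main(String[] args) {
        BoundedTopN top3 = new BoundedTopN(3);
        for (long v : new long[] {5, 1, 9, 7, 3, 8}) {
            top3.add(v);
        }
        System.out.println(top3.ranking()); // [9, 8, 7]
    }
}
```

A heap like this cannot handle a KTable update that lowers or deletes a value already in the top N. The evicted candidates needed to refill the ranking are gone, so an upsert-aware ranking store has to keep more state.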

Public Interfaces

We propose a new operator, top(), on both the KStream and KTable interfaces. It creates a materialized view that globally ranks data records in either ascending or descending order, retaining only a limited number of elements:

KStream.java

```java
KTable<P, Map<Integer, VO>> top(Ranker<? super K, ? super V, P, VO> ranker);
```

KTable.java

```java
KTable<P, Map<Integer, VO>> top(Ranker<? super K, ? super V, P, VO> ranker);
```
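The Ranker type is not defined in this section. Below is a hypothetical sketch of what it might need to provide, based only on the signature above and on the output being described as a mapping from a user-defined partition to its ranked values. The method names partition, rankedValue, order, and limit are assumptions. The helper TopSemantics.top is a non-streaming reference evaluation over a snapshot of a table. It states what the operator's output should be at a single point in time.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical Ranker shape (assumption, not the proposal's definition).
// K/V: input key and value types; P: user-defined partition; VO: value kept in the ranking.
interface Ranker<K, V, P, VO> {
    P partition(K key, V value);      // which ranking a record belongs to
    VO rankedValue(K key, V value);   // value stored in that ranking
    Comparator<VO> order();           // ascending, or e.g. Comparator.reverseOrder() for descending
    int limit();                      // maximum number of ranks kept per partition
}

class TopSemantics {
    // Reference evaluation of top(ranker) over a table snapshot: for each partition,
    // the first limit() values in ranker order, keyed by 1-based rank.
    static <K, V, P, VO> Map<P, Map<Integer, VO>> top(Map<K, V> table,
                                                       Ranker<? super K, ? super V, P, VO> ranker) {
        Map<P, List<VO>> grouped = new HashMap<>();
        for (Map.Entry<K, V> e : table.entrySet()) {
            P p = ranker.partition(e.getKey(), e.getValue());
            grouped.computeIfAbsent(p, ignored -> new ArrayList<>())
                   .add(ranker.rankedValue(e.getKey(), e.getValue()));
        }
        Map<P, Map<Integer, VO>> result = new HashMap<>();
        for (Map.Entry<P, List<VO>> e : grouped.entrySet()) {
            List<VO> values = e.getValue();
            values.sort(ranker.order());
            Map<Integer, VO> ranks = new LinkedHashMap<>();
            for (int i = 0; i < Math.min(ranker.limit(), values.size()); i++) {
                ranks.put(i + 1, values.get(i));
            }
            result.put(e.getKey(), ranks);
        }
        return result;
    }

    public static void main(String[] args) {
        // Units sold per product; keys encode "category/product" (an assumption for this demo).
        Map<String, Long> unitsSold = Map.of(
                "books/a", 5L, "books/b", 9L, "books/c", 7L, "toys/x", 3L, "toys/y", 8L);
        Ranker<String, Long, String, Long> bestSellers = new Ranker<>() {
            public String partition(String key, Long value) { return key.substring(0, key.indexOf('/')); }
            public Long rankedValue(String key, Long value) { return value; }
            public Comparator<Long> order() { return Comparator.reverseOrder(); }
            public int limit() { return 2; }
        };
        System.out.println(top(unitsSold, bestSellers));
    }
}
```

For brevity, the ranked value in the demo is just the unit count. A real ranker would more likely keep the product together with its count.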

...

We expect to roll out this change in three phases. This KIP focuses on the design of phases 1 and 2.

Basic In-memory Ranking Store Support (Phase 1)

We would start by using an in-memory structure on a single node to maintain the ever-changing ranking.


The operator will serve as a special task that subscribes to all upstream partitions and computes the result as a mapping from each user-defined partition to its top-ranked values. The architecture will look like:

[Diagram: TopK operator]
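The following is a minimal sketch of the Phase 1 single-node, in-memory ranking store. It assumes String keys and partitions, long values ranked in descending order, and KTable upsert semantics in which a null value deletes the key. The class and method names are hypothetical.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical single-node ranking store with upsert/delete semantics.
class InMemoryRankingStore {
    // One entry per upstream key; ties on value are broken by key so entries stay distinct.
    private record Ranked(String key, long value) {}

    private static final Comparator<Ranked> DESCENDING =
            Comparator.comparingLong(Ranked::value).reversed().thenComparing(Ranked::key);

    private final int limit;
    private final Map<String, TreeSet<Ranked>> rankings = new HashMap<>(); // partition -> all entries, sorted
    private final Map<String, String> partitionOfKey = new HashMap<>();
    private final Map<String, Ranked> entryOfKey = new HashMap<>();

    InMemoryRankingStore(int limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        this.limit = limit;
    }

    // Upserts the key into the given partition, or deletes it when value is null
    // (the partition argument is ignored for deletes).
    void put(String key, String partition, Long value) {
        Ranked previous = entryOfKey.remove(key);
        if (previous != null) {
            String oldPartition = partitionOfKey.remove(key);
            TreeSet<Ranked> set = rankings.get(oldPartition);
            set.remove(previous);
            if (set.isEmpty()) {
                rankings.remove(oldPartition);
            }
        }
        if (value != null) {
            Ranked current = new Ranked(key, value);
            entryOfKey.put(key, current);
            partitionOfKey.put(key, partition);
            rankings.computeIfAbsent(partition, p -> new TreeSet<>(DESCENDING)).add(current);
        }
    }

    // Returns rank (1-based) -> key for the top `limit` entries of a partition.
    Map<Integer, String> ranking(String partition) {
        Map<Integer, String> result = new LinkedHashMap<>();
        TreeSet<Ranked> set = rankings.get(partition);
        if (set == null) {
            return result;
        }
        int rank = 1;
        for (Ranked r : set) {
            if (rank > limit) {
                break;
            }
            result.put(rank++, r.key());
        }
        return result;
    }

    public static void main(String[] args) {
        InMemoryRankingStore store = new InMemoryRankingStore(2);
        store.put("a", "books", 5L);
        store.put("b", "books", 9L);
        store.put("c", "books", 7L);
        System.out.println(store.ranking("books")); // {1=b, 2=c}
        store.put("b", "books", 1L);                 // b drops, so a moves into the top 2
        System.out.println(store.ranking("books")); // {1=c, 2=a}
    }
}
```

The store keeps every key rather than only the top N. When an update lowers a retained value (as with b above), the next-best key must still be available to move up. This full per-partition state grows with the number of unique keys.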


Optimizations (Phase 2)

Two-level TopK

As the number of unique keys grows, it is no longer practical for a single node to process all incoming changes on the fly. We therefore propose a two-level TopK strategy similar to Flink's. First, each upstream partition sorts its own records and computes its local top K values. These results are then sent to an intermediate repartition topic, where a separate second-level TopK operator processes them. Because the first level pre-filters records, this greatly reduces the work done on the single node.
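The two-level idea can be checked on static data with the sketch below. It is a simplified, unpartitioned illustration with hypothetical names. It relies on each record belonging to exactly one upstream partition. Any value in the global top k has at most k - 1 larger values overall, hence at most k - 1 within its own partition, so it always survives the first-level filter.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of two-level TopK over static, non-updating data.
class TwoLevelTopK {

    // Plain top k in descending order.
    static List<Long> topK(List<Long> values, int k) {
        return values.stream()
                .sorted(Comparator.reverseOrder())
                .limit(k)
                .collect(Collectors.toList());
    }

    // First level: local top k per upstream partition.
    // Second level: top k over the union of the forwarded candidates.
    static List<Long> twoLevel(List<List<Long>> partitions, int k) {
        List<Long> candidates = new ArrayList<>();
        for (List<Long> partition : partitions) {
            // At most k records per partition would cross the repartition topic.
            candidates.addAll(topK(partition, k));
        }
        return topK(candidates, k);
    }

    public static void main(String[] args) {
        List<List<Long>> partitions = List.of(
                List.of(4L, 17L, 8L, 1L),
                List.of(12L, 3L, 15L),
                List.of(9L, 20L, 2L, 6L));
        System.out.println(twoLevel(partitions, 3)); // [20, 17, 15]
    }
}
```

In the streaming case, a first-level operator would also have to forward changes whenever its local top k changes, including retractions. The sketch does not model that.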

Incremental Update

To reduce the amount of data transmitted over the wire, we could optionally push incremental updates of the ranking downstream instead of full updates. The output format stays Map<Integer, VO>, but an incremental update carries only the ranks that changed. The approach is to add an API to the Ranker that defines whether the operator sends partial or full updates.
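Here is one possible encoding of partial updates, sketched under stated assumptions. A partial update keeps the Map<Integer, VO> format but carries only the ranks whose value changed. A null value marks a rank that no longer exists. Neither this encoding nor the names diff and apply come from the proposal.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Hypothetical sketch: partial ranking updates in the same Map<Integer, VO> format.
// Assumption: ranked values are never null, so null can serve as a tombstone.
class IncrementalRankingUpdate {

    // Upstream side: computes the partial update that turns `previous` into `current`.
    static <VO> Map<Integer, VO> diff(Map<Integer, VO> previous, Map<Integer, VO> current) {
        Map<Integer, VO> delta = new HashMap<>();
        for (Map.Entry<Integer, VO> e : current.entrySet()) {
            if (!Objects.equals(previous.get(e.getKey()), e.getValue())) {
                delta.put(e.getKey(), e.getValue()); // new or changed rank
            }
        }
        for (Integer rank : previous.keySet()) {
            if (!current.containsKey(rank)) {
                delta.put(rank, null); // tombstone for a rank that dropped out
            }
        }
        return delta;
    }

    // Downstream side: applies a partial update to the last known full ranking.
    static <VO> Map<Integer, VO> apply(Map<Integer, VO> previous, Map<Integer, VO> delta) {
        Map<Integer, VO> result = new TreeMap<>(previous);
        for (Map.Entry<Integer, VO> e : delta.entrySet()) {
            if (e.getValue() == null) {
                result.remove(e.getKey());
            } else {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> before = Map.of(1, "b", 2, "c", 3, "a");
        Map<Integer, String> after = Map.of(1, "b", 2, "a");
        Map<Integer, String> delta = diff(before, after);
        System.out.println(delta);                // {2=a, 3=null}
        System.out.println(apply(before, delta)); // {1=b, 2=a}
    }
}
```

In this encoding, the savings depend on where the change lands. A new entry at rank 1 shifts every rank below it, so the partial update becomes as large as a full one.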

Extension (Phase 3)

A windowed table is a special type of KTable whose key is windowed. The current proposal does not address the complicated interaction between windowed tables and the TopK definition. From the end user's perspective, a TopK operator connected to a windowed table should produce a separate ranking for each time window, subject to retention. With a separate ranking per time window, the state volume could become very large. We have decided to postpone the design for windowed upstream tables until we collect enough user stories and industry use cases to properly define their semantics. By then, it will also be clearer how to design for window scalability.


Compatibility, Deprecation, and Migration Plan

N/A

Rejected Alternatives

N/A