Preliminary Remarks

Stream

Each element in a stream is a key-value pair. A stream is partitioned by the key.

Join

Our JOIN operators are equi-joins on the keys of the streams. The streams being joined must have the same key type and must be co-partitioned.
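The co-partitioning requirement can be illustrated with a small sketch. It assumes both streams are split with the same partitioner and the same number of partitions; the CRC32-based `partition_for` function and the sample records are stand-ins for illustration, not the partitioner the framework uses. Under that assumption, records with equal keys land in the same partition, so joining partition by partition gives the same result as a global equi-join.

```python
import zlib
from collections import defaultdict


def partition_for(key, num_partitions):
    # Hypothetical partitioner: any deterministic hash of the key works.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def partition_stream(records, num_partitions):
    # Split a list of (key, value) records into partitions by key.
    partitions = defaultdict(list)
    for key, value in records:
        partitions[partition_for(key, num_partitions)].append((key, value))
    return partitions


def local_equi_join(left_part, right_part):
    # Equi-join on the key, restricted to one pair of partitions.
    return [(k1, (v1, v2))
            for k1, v1 in left_part
            for k2, v2 in right_part
            if k1 == k2]


NUM_PARTITIONS = 4
clicks = [("alice", "click-1"), ("bob", "click-2"), ("alice", "click-3")]
profiles = [("alice", "US"), ("bob", "DE")]

clicks_by_partition = partition_stream(clicks, NUM_PARTITIONS)
profiles_by_partition = partition_stream(profiles, NUM_PARTITIONS)

# Join each partition only with the partition of the same number.
joined = []
for p in range(NUM_PARTITIONS):
    joined.extend(local_equi_join(clicks_by_partition.get(p, []),
                                  profiles_by_partition.get(p, [])))

# Reference result: join without any partitioning.
global_join = local_equi_join(clicks, profiles)
```

If the two streams used different partition counts or different partitioners, the per-partition join could miss matches that the global join finds.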

KTable

KTable is an interpretation of a stream as the change log of a conceptual (or underlying) table. A KTable's key is the primary key of the table. The corresponding topic is likely to be compaction-enabled.
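A minimal sketch of the change-log interpretation follows. The function names `materialize` and `compact` are hypothetical, and treating a `None` value as a delete marker is an assumption of this sketch. It shows that replaying the log yields the table, and that keeping only the latest record per key (what compaction amounts to) yields the same table.

```python
def materialize(changelog):
    # Replay (key, value) records in order; the last value per key wins.
    table = {}
    for key, value in changelog:
        if value is None:
            # Assumption: a None value marks the key as deleted.
            table.pop(key, None)
        else:
            table[key] = value
    return table


def compact(changelog):
    # Keep only the latest record for each key, preserving log order.
    latest = {}
    for offset, (key, value) in enumerate(changelog):
        latest[key] = (offset, value)
    ordered = sorted(latest.items(), key=lambda item: item[1][0])
    return [(key, value) for key, (offset, value) in ordered]


changelog = [("u1", "Berlin"), ("u2", "Paris"), ("u1", "Tokyo"), ("u2", None)]
table = materialize(changelog)
compacted = compact(changelog)
```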

KTable Aggregate

A KTable can be aggregated by a non-key field. The resulting stream is another KTable keyed by the aggregation key.
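The following sketch shows one way such an aggregate could be maintained, using a count per group as the aggregate. The function name, the `region` field, and the sample rows are assumptions for illustration. Because the input is a change log, an update to an existing row first retracts the old row from its old group and then adds the new row to its new group. Deletes are not handled here.

```python
def aggregate_by_field(changelog, group_of):
    source = {}            # materialized source table: primary key -> row
    counts = {}            # result table: aggregation key -> count
    result_changelog = []  # change log of the result table
    for key, row in changelog:
        old_row = source.get(key)
        if old_row is not None:
            # Retract the previous version of this row from its group.
            old_group = group_of(old_row)
            counts[old_group] -= 1
            result_changelog.append((old_group, counts[old_group]))
        source[key] = row
        new_group = group_of(row)
        counts[new_group] = counts.get(new_group, 0) + 1
        result_changelog.append((new_group, counts[new_group]))
    return counts, result_changelog


users = [
    ("u1", {"region": "EU"}),
    ("u2", {"region": "EU"}),
    ("u1", {"region": "US"}),  # u1 moves from EU to US
]
counts, result_changelog = aggregate_by_field(users, lambda row: row["region"])
```

The result change log is keyed by the aggregation key (the region), so it can itself be read as a KTable.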

Windowed Aggregate

A stream can be aggregated by a window. Data within a window can be aggregated by key (ex. a page view count for each page id by day from a page view stream).
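The page view example can be sketched as follows, assuming non-overlapping one-day windows identified by an integer window id. The function names and the sample timestamps (in milliseconds) are hypothetical.

```python
from collections import Counter

DAY_MS = 24 * 60 * 60 * 1000


def window_id(timestamp_ms, size_ms=DAY_MS):
    # Assumption: fixed, non-overlapping windows numbered from zero.
    return timestamp_ms // size_ms


def count_by_window(page_views):
    # page_views: iterable of (page_id, timestamp_ms) records.
    counts = Counter()
    for page_id, timestamp_ms in page_views:
        counts[(page_id, window_id(timestamp_ms))] += 1
    return dict(counts)


page_views = [
    ("home", 1000),
    ("home", 2000),
    ("about", 3000),
    ("home", DAY_MS + 500),  # falls into the next day's window
]
daily_counts = count_by_window(page_views)
```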

The aggregation key is the key of the stream. If a user wants to aggregate the stream by a non-key field, they have to repartition the stream by using map().through() or map().to() before aggregation.

The key of the result stream is the primary key of the produced stats, so it should be the combined key (aggregation key + window id).

Note that the partitioning key is still the aggregation key. Although this breaks the basic assumption that the key is the partitioning key, it is advantageous for joining over the aggregation key, which may be more useful than joining over the combined key (aggregation key + window id). For example, a user may want to compare today's page views with last week's. In this case the join is performed on the aggregation key, even though the full keys differ because the window ids do not match.
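A sketch of the comparison described above, under the assumption that windowed results are held as a mapping from (aggregation key, window id) to a count and that window ids are consecutive day numbers. The function name and the sample numbers are hypothetical. The join matches rows on the aggregation key while shifting the window id by a fixed offset.

```python
def compare_with_offset(windowed_counts, window_offset):
    # windowed_counts: {(aggregation_key, window_id): count}
    # Pair each row with the row of the same aggregation key that lies
    # window_offset windows earlier, if such a row exists.
    result = {}
    for (key, wid), count in windowed_counts.items():
        earlier = windowed_counts.get((key, wid - window_offset))
        if earlier is not None:
            result[(key, wid)] = (count, earlier)
    return result


daily_counts = {
    ("home", 10): 120,  # today
    ("home", 3): 100,   # same page, seven days earlier
    ("about", 10): 7,   # no row for the earlier window
}
week_over_week = compare_with_offset(daily_counts, 7)
```

Because all windows of one aggregation key live in the same partition, this lookup stays local to the partition.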

As long as we contain this exception within our local operations, it should not break the system. For that reason a new, distinct type of stream, WTable, is introduced below.

WTable<K, V, W>

WTable cannot be directly written to a topic.

A user has to convert WTable to KStream with some transformation before persisting to a topic.

It is up to a user, but some reasonable conversions may be:

Instances of WTable are created only by the framework as the result of a windowed aggregate.

JOIN operators

Join Types

A join combines two streams. The first stream is called the primary stream, and the second stream is called the secondary stream.

primary-stream × secondary-stream

We have three types of joins: inner join, outer join, and left join.

The available join types depend on the types of the streams being joined.

primary \ secondary   KStream   KTable   WTable
KStream               LOI †     L        L ?
KTable                          LOI      L ?
WTable                          L        LOI

I: inner join, O: outer join, L: left join

†: windowed joins

?: any use case?

The result types of join are as follows.

primary \ secondary   KStream   KTable    WTable
KStream               KStream   KStream   KStream
KTable                          KTable    KTable
WTable                          WTable    WTable

Join Processing

KTable-KTable Join

A join is performed when a record arrives at the join operator. The new record in one stream is matched with records in a materialized table of the other stream. All types of joins are driven by both streams.

When tables (KTable, WTable) are joined, the result is also a table. Let T be a change log, t be the materialization of T, and f: T → t be the materialization function.

If

t1 = f( T1 )
t2 = f( T2 )

then

innerJoin( t1, t2 ) = f( innerJoin( T1, T2 ) ) 
outerJoin( t1, t2 ) = f( outerJoin( T1, T2 ) )
leftJoin( t1, t2 ) = f( leftJoin( T1, T2 ) ).

Thus, in this sense, table-table joins are eventually consistent. This gives a kind of resilience to late arrival of records. A late arrival in either stream can "update" the join result.
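The inner-join identity above can be checked with a small sketch. It assumes change logs are lists of (key, value) updates without deletes, and that the join operator sees an interleaving of both change logs tagged with the side (1 or 2) each record came from. All function names are hypothetical. Whatever the interleaving, materializing the joined change log gives the join of the materialized tables.

```python
def materialize(changelog):
    # f: replay the change log; the last value per key wins.
    table = {}
    for key, value in changelog:
        table[key] = value
    return table


def inner_join_tables(t1, t2):
    # Join of two materialized tables on their primary keys.
    return {k: (t1[k], t2[k]) for k in t1.keys() & t2.keys()}


def inner_join_changelogs(events):
    # events: (side, key, value) records in arrival order.
    # Each arrival updates its own table and is matched against the
    # materialized table of the other side.
    state = {1: {}, 2: {}}
    joined_changelog = []
    for side, key, value in events:
        state[side][key] = value
        if key in state[3 - side]:
            joined_changelog.append((key, (state[1][key], state[2][key])))
    return joined_changelog


T1 = [("a", 1), ("b", 2), ("a", 3)]
T2 = [("a", "x"), ("c", "z")]
expected = inner_join_tables(materialize(T1), materialize(T2))

# Two arrival orders: T2 arrives late, or T2 arrives early.
t2_late = [(1, k, v) for k, v in T1] + [(2, k, v) for k, v in T2]
t2_early = [(2, k, v) for k, v in T2] + [(1, k, v) for k, v in T1]

result_late = materialize(inner_join_changelogs(t2_late))
result_early = materialize(inner_join_changelogs(t2_early))
```

In the second order the operator first emits an intermediate result for key "a" and later overwrites it, which is the "update" behavior described above.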

KStream-KStream Join 

A join is performed when a record arrives at the join operator. The new record in one stream is matched with buffered records of the other stream. The inner join and the outer join are driven by both streams, thus both streams must have a buffer. On the other hand, unlike table-table left join, the stream-stream left join is driven only by the primary stream, so only the secondary stream is required to have a buffer. This style of processing affects the consistency of join results.

We will consider a single window of a windowed join. Let S be a stream, s be a set of records, and g: S → s be the function such that s contains exactly the records in S.

If

s1 = g(S1)
s2 = g(S2)

then

innerJoin( s1, s2 ) = g( innerJoin( S1, S2) )

Thus, in this sense, inner join is eventually consistent. However, outer join and left join do not possess this property. There is no general way for a late arrival to "update" an earlier result because a stream, unlike a table, does not have a primary key.
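The difference can be seen in a sketch of a single join window. It assumes records arrive as (side, key, value) tuples, with side 1 the primary stream and side 2 the secondary stream; the function names are hypothetical. The inner join buffers both sides and produces the same set of pairs for any arrival order, while the left join, driven only by the primary stream, emits an unmatched result that a late secondary record cannot correct.

```python
def stream_inner_join(events):
    # Both streams are buffered; every arrival probes the other buffer.
    buffers = {1: [], 2: []}
    out = []
    for side, key, value in events:
        buffers[side].append((key, value))
        for other_key, other_value in buffers[3 - side]:
            if other_key == key:
                pair = (value, other_value) if side == 1 else (other_value, value)
                out.append((key, pair))
    return out


def stream_left_join(events):
    # Only the secondary stream is buffered; only primary arrivals emit.
    secondary_buffer = []
    out = []
    for side, key, value in events:
        if side == 2:
            secondary_buffer.append((key, value))
            continue
        matches = [v2 for k2, v2 in secondary_buffer if k2 == key]
        if matches:
            out.extend((key, (value, v2)) for v2 in matches)
        else:
            out.append((key, (value, None)))
    return out


secondary_first = [(2, "k", "b"), (1, "k", "a")]
secondary_late = [(1, "k", "a"), (2, "k", "b")]

inner_first = stream_inner_join(secondary_first)
inner_late = stream_inner_join(secondary_late)
left_first = stream_left_join(secondary_first)
left_late = stream_left_join(secondary_late)
```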

Output of Join

Let us consider inner/outer join processing with hopping windows, where a sequence of windows comes and goes. One record may belong to multiple windows. Suppose two records r1 and r2, whose keys are the same, are associated with the window sets (w1, w2, w3) and (w2, w3, w4), respectively. What should the join outputs look like?

The matching windows, that is, the windows that contain both r1 and r2, are (w2, w3). Do we want to emit an output record for each matching window? That would be confusing, because the number of duplicate records depends on the number of matching windows of the two records.

A window join is a join by a time difference. It makes sense to emit a single output record no matter how many matching windows the inputs have. (If we follow this direction, which type of windowing the join uses, such as hopping windows or sliding windows, should not be a user's concern. It is an implementation detail.)
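The example above can be reproduced with a sketch. It assumes hopping windows of a fixed size that start at multiples of a fixed advance, and it expresses the time-difference join with a hypothetical `max_difference` parameter. The function names, timestamps, and window parameters are illustrative only, and the two joins are not claimed to accept exactly the same record pairs in general.

```python
def hopping_windows(timestamp, size, advance):
    # Start times of all windows [start, start + size) containing timestamp.
    starts = []
    start = (timestamp // advance) * advance
    while start > timestamp - size and start >= 0:
        starts.append(start)
        start -= advance
    return sorted(starts)


def join_per_window(r1, r2, size, advance):
    # Emits one output per window shared by the two records.
    (k1, v1, t1), (k2, v2, t2) = r1, r2
    if k1 != k2:
        return []
    shared = set(hopping_windows(t1, size, advance)) & set(hopping_windows(t2, size, advance))
    return [(k1, (v1, v2), w) for w in sorted(shared)]


def join_by_time_difference(r1, r2, max_difference):
    # Emits at most one output per pair of records.
    (k1, v1, t1), (k2, v2, t2) = r1, r2
    if k1 == k2 and abs(t1 - t2) <= max_difference:
        return [(k1, (v1, v2))]
    return []


SIZE, ADVANCE = 30, 10
r1 = ("k", "a", 25)  # in windows starting at 0, 10, 20  (w1, w2, w3)
r2 = ("k", "b", 35)  # in windows starting at 10, 20, 30 (w2, w3, w4)

per_window = join_per_window(r1, r2, SIZE, ADVANCE)
single = join_by_time_difference(r1, r2, SIZE)
```

The per-window join produces two outputs for the same pair of records, one for each shared window, while the time-difference join produces one.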

Possible Implementations of Join window

Single RocksDB Instance for Each Stream
Multiple RocksDB Instances for Each Stream

KStream-KTable Joins

A join is performed when a record from the primary stream arrives at the join operator. The new record in the primary stream is matched with records in a materialized table of the secondary stream. Only the left join is defined. Unlike table-table left join, the stream-table left join is driven only by the primary stream. This style of processing does not guarantee the consistency of join results.
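A sketch of this behavior, assuming events arrive as (source, key, value) tuples where source is either "stream" (primary) or "table" (secondary). The function name and sample data are hypothetical. Table updates only change the materialized table; output is produced only when a stream record arrives, so the result depends on the arrival order.

```python
def stream_table_left_join(events):
    table = {}  # materialized table of the secondary stream
    out = []
    for source, key, value in events:
        if source == "table":
            # Table updates change the lookup state and emit nothing.
            table[key] = value
        else:
            # Stream records are joined with the current table state.
            out.append((key, (value, table.get(key))))
    return out


table_first = [("table", "u1", "gold"), ("stream", "u1", "order-1")]
table_late = [("stream", "u1", "order-1"), ("table", "u1", "gold")]

result_first = stream_table_left_join(table_first)
result_late = stream_table_left_join(table_late)
```

The late table update in the second order never revises the unmatched result that was already emitted.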

Join API

Symbols

S, T, and WT denote a KStream, a KTable, and a WTable, respectively. The type parameters that follow each symbol are the key type K, the value type V (V1, V2, V3), and, for a WTable, the window type W. (A, B) → C denotes a function from A and B to C.

KTable-KTable Join

T<K,V1>.join(T<K,V2>, (V1, V2) → V3)

T<K,V1>.outerJoin(T<K,V2>, (V1, V2) → V3)

T<K,V1>.leftJoin(T<K,V2>, (V1, V2) → V3)

KStream-KTable Join

S<K,V1>.leftJoin(T<K,V2>, (V1, V2) → V3)

KStream-KStream Join with Windowing

S<K,V1>.leftJoin(S<K,V2>, (V1, V2) → V3, WindowSpec)   --- any use case?

S<K,V1>.join(S<K,V2>, (V1, V2) → V3, WindowSpec)

S<K,V1>.outerJoin(S<K,V2>, (V1, V2) → V3, WindowSpec)

KStream-WTable Join

S<K,V1>.leftJoin(WT<K,V2,W>, (K, V1) → W, (V1, V2) → V3)   --- any use case?

KTable-WTable Join

T<K,V1>.leftJoin(WT<K,V2,W>, (K, V1) → W, (V1, V2) → V3)   --- any use case?

WTable-KTable Join

WT<K,V1,W>.leftJoin(T<K,V2>, (V1, V2) → V3)

WTable-WTable Join

WT<K,V1,W>.leftJoin(WT<K,V2,W>, (V1, V2) → V3)

WT<K,V1,W>.leftJoin(WT<K,V2,W>, W → W, (V1, V2) → V3)

WT<K,V1,W>.join(WT<K,V2,W>, (V1, V2) → V3)

WT<K,V1,W>.outerJoin(WT<K,V2,W>, (V1, V2) → V3)