...

This optimization applies only to Stream-Stream inner joins. A Table-Table inner self-join would not yield interesting results, since a row would always join with itself. Moreover, this optimization does not apply to N-way self-joins, because in the current implementation an inner join is a binary operator. This means that the first self-join is applied and produces records whose columns are the concatenation of the columns of the left and right join arguments. A subsequent join would then be applied to the result of the first join (the concatenated rows) rather than to the inputs of that join, so it would no longer be a self-join. Optimizing N-way self-joins would require a new n-ary operator, which is out of the scope of this KIP.
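To make the N-way argument concrete, here is a small Java sketch under simplifying assumptions: a record is a list of column values with the join key in column 0, and `join` is a hypothetical nested-loop stand-in for a binary inner join (no windows, no Kafka Streams APIs). The second join's left input is the concatenated output of the first join, so its two inputs are no longer the same stream.

```java
import java.util.ArrayList;
import java.util.List;

// HYPOTHETICAL model: a record is a list of column values (key in column 0),
// and an inner join is a binary operator that concatenates matching rows.
final class ChainedJoinSketch {

    // Binary inner join on column 0, emitting the concatenation of both rows.
    static List<List<String>> join(List<List<String>> left, List<List<String>> right) {
        List<List<String>> out = new ArrayList<>();
        for (List<String> l : left) {
            for (List<String> r : right) {
                if (l.get(0).equals(r.get(0))) {
                    List<String> row = new ArrayList<>(l);
                    row.addAll(r);
                    out.add(row);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> stream = List.of(List.of("k1", "a"), List.of("k1", "b"));
        // First join: both inputs are the same stream, so this is a self-join.
        List<List<String>> first = join(stream, stream);
        // Second join: the left input is the concatenated output of the first join,
        // not the original stream, so this join is no longer a self-join.
        List<List<String>> second = join(first, stream);
        System.out.println(first.get(0).size());  // 4 columns
        System.out.println(second.get(0).size()); // 6 columns
    }
}
```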

Public Interfaces

No public interfaces will be impacted, except that we will introduce the config SELF_JOIN_OPTIMIZATION_CONFIG, which enables or disables this optimization. It will only take effect if the config TOPOLOGY_OPTIMIZATION_CONFIG is enabled.
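The gating between the two configs can be sketched with plain `java.util.Properties`. The key `topology.optimization` and its values `all` and `none` are the existing Kafka Streams settings (`StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG`, `StreamsConfig.OPTIMIZE`, `StreamsConfig.NO_OPTIMIZATION`). The string key used here for SELF_JOIN_OPTIMIZATION_CONFIG and its `false` default are hypothetical, since the proposal does not specify them.

```java
import java.util.Properties;

final class SelfJoinConfigSketch {
    // Existing Kafka Streams key (StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG).
    static final String TOPOLOGY_OPTIMIZATION = "topology.optimization";
    // HYPOTHETICAL key standing in for SELF_JOIN_OPTIMIZATION_CONFIG.
    static final String SELF_JOIN_KEY = "self.join.optimization.enable";

    static boolean selfJoinOptimizationEnabled(Properties props) {
        // Global switch; this sketch treats only "all" as enabled (default "none").
        boolean topologyOptimizationOn =
            "all".equals(props.getProperty(TOPOLOGY_OPTIMIZATION, "none"));
        // Per-optimization switch; the "false" default is an assumption.
        boolean selfJoinOn =
            Boolean.parseBoolean(props.getProperty(SELF_JOIN_KEY, "false"));
        // The self-join flag only takes effect when the global switch is on.
        return topologyOptimizationOn && selfJoinOn;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(SELF_JOIN_KEY, "true");
        System.out.println(selfJoinOptimizationEnabled(props)); // false: global switch off
        props.setProperty(TOPOLOGY_OPTIMIZATION, "all");
        System.out.println(selfJoinOptimizationEnabled(props)); // true
    }
}
```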

Proposed Changes

The changes required to implement this proposal are:

...

The algorithm to identify whether two streams represent the same entity will traverse the graph and check that the join arguments read from the same topic(s) and that the path from the StreamSourceNode node to the root contains no nodes that can change the semantics of the stream. Initially, most (or all) nodes will be disallowed on this path. Later, we can look into making the algorithm smarter, so that it identifies whether a node actually changes the data in the stream. Concretely, the algorithm checks that:

  1. There is a single StreamSourceNode node in the graph
  2. The StreamSourceNode node has a single parent
  3. There are no other nodes besides the KStreamJoinWindow node that are siblings of the StreamStreamJoinNode and have smaller build priority

If all these conditions hold, the join can be optimized as a self-join.
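The three conditions can be sketched over a simplified graph model. `Node`, `Kind`, the build priorities, and the `example` helper below are hypothetical stand-ins, not Kafka Streams' internal graph classes; the check encodes only the conditions listed above.

```java
import java.util.ArrayList;
import java.util.List;

// HYPOTHETICAL, simplified graph model; not Kafka Streams' internal classes.
final class SelfJoinDetectionSketch {
    enum Kind { ROOT, STREAM_SOURCE, JOIN_WINDOW, STREAM_STREAM_JOIN, OTHER }

    static final class Node {
        final Kind kind;
        final int buildPriority;
        final List<Node> parents = new ArrayList<>();
        final List<Node> children = new ArrayList<>();

        Node(Kind kind, int buildPriority) {
            this.kind = kind;
            this.buildPriority = buildPriority;
        }

        // Links this node to a child and returns the child for chaining.
        Node addChild(Node child) {
            children.add(child);
            child.parents.add(this);
            return child;
        }
    }

    // Depth-first collection of every node reachable from `node` (identity-based).
    static void collect(Node node, List<Node> out) {
        if (out.contains(node)) return;
        out.add(node);
        for (Node child : node.children) collect(child, out);
    }

    static boolean canOptimizeAsSelfJoin(Node root, Node join) {
        List<Node> all = new ArrayList<>();
        collect(root, all);
        Node source = null;
        int sourceCount = 0;
        for (Node n : all) {
            if (n.kind == Kind.STREAM_SOURCE) { source = n; sourceCount++; }
        }
        // 1. There is a single StreamSourceNode node in the graph.
        if (sourceCount != 1) return false;
        // 2. The StreamSourceNode node has a single parent.
        if (source.parents.size() != 1) return false;
        // 3. No sibling of the join node other than a join-window node
        //    has a smaller build priority.
        for (Node parent : join.parents) {
            for (Node sibling : parent.children) {
                boolean isOther = sibling != join && sibling.kind != Kind.JOIN_WINDOW;
                if (isOther && sibling.buildPriority < join.buildPriority) return false;
            }
        }
        return true;
    }

    // Builds root -> source -> {window, window, join}; optionally adds another
    // child of the source that is built (lower priority) before the join.
    static boolean example(boolean extraSibling) {
        Node root = new Node(Kind.ROOT, 0);
        Node source = root.addChild(new Node(Kind.STREAM_SOURCE, 1));
        if (extraSibling) source.addChild(new Node(Kind.OTHER, 2));
        source.addChild(new Node(Kind.JOIN_WINDOW, 3));
        source.addChild(new Node(Kind.JOIN_WINDOW, 4));
        Node join = source.addChild(new Node(Kind.STREAM_STREAM_JOIN, 5));
        return canOptimizeAsSelfJoin(root, join);
    }

    public static void main(String[] args) {
        System.out.println(example(false)); // true
        System.out.println(example(true));  // false
    }
}
```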

  • Implement the self-join operator 

The self-join operator will maintain a single state store. For every new record, the operator first adds the record to the state store and then performs a lookup into the same store to do the actual join. This means that every record joins at least with itself.
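A minimal in-memory sketch of the single-store behavior follows. Assumptions not taken from the proposal: the store is a plain list with no retention, records carry a key, value and timestamp, and two records match when they share a key and their timestamps differ by at most `windowMs`. It illustrates the add-then-lookup order and the resulting self-match; it does not try to reproduce the exact output multiplicity or ordering of a regular two-store Stream-Stream join.

```java
import java.util.ArrayList;
import java.util.List;

// HYPOTHETICAL sketch of a windowed self-join backed by ONE in-memory store.
final class SelfJoinOperatorSketch {
    static final class Rec {
        final String key;
        final String value;
        final long ts;

        Rec(String key, String value, long ts) {
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    // The single state store; unlike a real window store, nothing is ever expired.
    private final List<Rec> store = new ArrayList<>();
    private final long windowMs;

    SelfJoinOperatorSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    // Adds the record to the store first, then looks up matches in that same store,
    // so the record always matches itself.
    List<String> process(Rec rec) {
        store.add(rec);
        List<String> out = new ArrayList<>();
        for (Rec other : store) {
            if (other.key.equals(rec.key) && Math.abs(other.ts - rec.ts) <= windowMs) {
                out.add(rec.value + "+" + other.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        SelfJoinOperatorSketch op = new SelfJoinOperatorSketch(10);
        System.out.println(op.process(new Rec("k", "a", 0)));  // [a+a]
        System.out.println(op.process(new Rec("k", "b", 5)));  // [b+a, b+b]
        System.out.println(op.process(new Rec("k", "c", 50))); // [c+c]
    }
}
```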

...