...

A self-join is a join whose left and right arguments are the same entity (a stream or a table reading from the same topic). Although Streams already supports self-joins, it implements them inefficiently: they are executed like regular joins, with one state store created for the left argument and another for the right one. Since both arguments represent the same entity, the two stores would contain exactly the same data, so a single store is enough. This optimization is only suitable for inner joins whose join condition is on the primary key. We do not consider foreign-key joins, because efficient lookups require a state store for each argument; foreign-key self-joins will therefore be handled as regular inner foreign-key joins. Outer joins are not considered either: since we focus on primary-key joins, every record always joins at least with itself, so there is always at least one join result.
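
The claim that one store suffices can be checked with a small in-memory model. This sketch is not Kafka Streams code. The class `SelfJoinSimulation`, the record `Rec` and both methods are hypothetical. The model ignores grace periods, retention and changelog topics. It also assumes that, in the regular join, each record is handed to the left side first and then to the right side. Under these assumptions, a windowed inner self-join computed with a single store emits the same results as the two-store version, possibly in a different order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of a windowed inner stream-stream self-join.
class SelfJoinSimulation {

    // A keyed, timestamped record.
    record Rec(String key, long ts, String value) {}

    // Same key and timestamps at most windowMs apart (a symmetric join window).
    static boolean matches(Rec a, Rec b, long windowMs) {
        return a.key().equals(b.key()) && Math.abs(a.ts() - b.ts()) <= windowMs;
    }

    // Regular join with the same stream on both sides: one store per side.
    // Assumption: each record is processed by the left side, then by the right side.
    static List<String> twoStoreJoin(List<Rec> input, long windowMs) {
        List<Rec> leftStore = new ArrayList<>();
        List<Rec> rightStore = new ArrayList<>();
        List<String> out = new ArrayList<>();
        for (Rec r : input) {
            // Left side: store r, then probe the right store (which does not contain r yet).
            leftStore.add(r);
            for (Rec other : rightStore) {
                if (matches(r, other, windowMs)) out.add(r.value() + "+" + other.value());
            }
            // Right side: store r, then probe the left store (which already contains r).
            rightStore.add(r);
            for (Rec other : leftStore) {
                if (matches(other, r, windowMs)) out.add(other.value() + "+" + r.value());
            }
        }
        return out;
    }

    // Self-join that keeps a single store.
    static List<String> oneStoreJoin(List<Rec> input, long windowMs) {
        List<Rec> store = new ArrayList<>();
        List<String> out = new ArrayList<>();
        for (Rec r : input) {
            store.add(r);
            int self = store.size() - 1; // position of r itself in the store
            for (int i = 0; i < store.size(); i++) {
                Rec other = store.get(i);
                if (!matches(r, other, windowMs)) continue;
                if (i == self) {
                    out.add(r.value() + "+" + r.value()); // r joins with itself exactly once
                } else {
                    // A distinct match yields both orderings, as the two-store join does.
                    out.add(r.value() + "+" + other.value());
                    out.add(other.value() + "+" + r.value());
                }
            }
        }
        return out;
    }
}
```

In the two-store join, each pair of distinct matching records yields two results, one per ordering, and every record joins with itself exactly once. The single-store version produces both cases explicitly instead of relying on a second copy of the data.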

This work should be seen as a logical plan optimization rule: if the graph contains a join whose arguments are the same entity, the join is rewritten using the self-join optimization described above, which creates only one state store.
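
A rewrite rule of this kind could be sketched as follows. The plan classes are hypothetical stand-ins for the Streams graph nodes. `sameEntity` is only a placeholder for the real check; here it is structural equality. The guard encodes the restrictions stated in this proposal: only inner stream-stream joins on the primary key are rewritten. Outer, table-table and foreign-key joins are left unchanged.

```java
// Hypothetical, simplified plan model; the real Streams graph nodes look different.
interface PlanNode {}

record PlanSource(String topic) implements PlanNode {}

enum JoinType { INNER, LEFT, OUTER }

enum JoinKind { STREAM_STREAM, TABLE_TABLE, FOREIGN_KEY }

record PlanJoin(PlanNode left, PlanNode right, JoinType type, JoinKind kind) implements PlanNode {}

// A join that reads its single input once and keeps a single state store.
record PlanSelfJoin(PlanNode input) implements PlanNode {}

class SelfJoinRule {
    // Placeholder for the same-entity check; records compare structurally.
    static boolean sameEntity(PlanNode a, PlanNode b) {
        return a.equals(b);
    }

    // Rewrites the plan bottom-up, replacing eligible joins with self-joins.
    static PlanNode rewrite(PlanNode node) {
        if (!(node instanceof PlanJoin join)) {
            return node;
        }
        PlanNode left = rewrite(join.left());
        PlanNode right = rewrite(join.right());
        boolean eligible = join.kind() == JoinKind.STREAM_STREAM
                && join.type() == JoinType.INNER
                && sameEntity(left, right);
        return eligible
                ? new PlanSelfJoin(left)
                : new PlanJoin(left, right, join.type(), join.kind());
    }
}
```

Children are rewritten first. As a result, a chained join such as `join(join(s, s), s)` becomes a regular join between a self-join and `s`, because its two inputs are different entities.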

This optimization only applies to stream-stream joins. In table-table inner joins, both sides are already materialized in state stores that the user created. The optimization aims to remove a state store that Streams creates specifically to perform the join, so it is not applicable to table-table joins. Moreover, the optimization does not apply to N-way self-joins, because this notion does not exist. Joins are binary operators, and the result of a self-join contains records whose columns are the concatenation of the columns of the left and right join arguments. A subsequent join is applied to the result of the previous join, not to its inputs, so it is no longer a self-join.

Public Interfaces

No public interfaces are affected. The existing config TOPOLOGY_OPTIMIZATION_CONFIG will be used to enable or disable this optimization, together with the other optimizations it already controls. This is not backward compatible, since enabling the optimization changes the generated topology. However, the same holds for every optimization under the umbrella of this config. The decision has been that users who enable it should be prepared to build new topologies when upgrading. An alternative would be to introduce a separate config for each optimization.
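
With the Java client, enabling the optimizations controlled by this config could look like the sketch below. It uses only `java.util.Properties` so that it compiles without Kafka on the classpath. The string literals are the values behind the following constants:

  • `StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG` (`"topology.optimization"`)
  • `StreamsConfig.OPTIMIZE` (`"all"`)
  • `StreamsConfig.NO_OPTIMIZATION` (`"none"`)

The class and method names are hypothetical.

```java
import java.util.Properties;

// Hypothetical helper; real code would use the StreamsConfig constants instead of literals.
class OptimizationConfig {
    // Key behind StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG.
    static final String TOPOLOGY_OPTIMIZATION = "topology.optimization";
    // Values behind StreamsConfig.OPTIMIZE and StreamsConfig.NO_OPTIMIZATION.
    static final String OPTIMIZE = "all";
    static final String NO_OPTIMIZATION = "none";

    // Builds the properties that turn all topology optimizations on or off.
    static Properties withOptimizations(boolean enabled) {
        Properties props = new Properties();
        props.put(TOPOLOGY_OPTIMIZATION, enabled ? OPTIMIZE : NO_OPTIMIZATION);
        return props;
    }
}
```

In a real application, one would pass the same properties to `StreamsBuilder#build(Properties)`, because topology optimizations are applied when the topology is built.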

...

To identify whether two streams (or tables) represent the same entity, the algorithm first checks whether the StreamSourceNode nodes that are the join arguments read from the same topic(s). It then traverses the graph and checks that the path from each StreamSourceNode node to the root contains no nodes that can change the semantics of the stream. Initially, most (if not all) nodes will be disallowed on this path. Later, the algorithm could be refined to identify whether a node actually changes the data in the stream. If both conditions hold, the join can be implemented as a self-join.
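
The same-entity check could be sketched as follows. All types are hypothetical:

  • Each non-source node has exactly one parent.
  • Source nodes carry their topics.
  • The path is modeled as the chain of parent links between a join argument and its source node. This is one reading of the traversal described above.

Following the initial, conservative rule, the set of node kinds allowed on the path starts out empty. The order of the two checks does not change the result.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical node kinds; only SOURCE is treated specially.
enum NodeKind { SOURCE, FILTER, MAP, MAP_VALUES, REPARTITION }

// Hypothetical graph node: non-source nodes have one parent, sources carry their topics.
record GraphNode(NodeKind kind, GraphNode parent, Set<String> topics) {
    static GraphNode source(String... topics) {
        return new GraphNode(NodeKind.SOURCE, null, Set.of(topics));
    }

    static GraphNode child(NodeKind kind, GraphNode parent) {
        return new GraphNode(kind, parent, Set.of());
    }
}

class SameEntityCheck {
    // Node kinds allowed between a join argument and its source. Initially empty,
    // so any intermediate node disqualifies the join from the optimization.
    static final Set<NodeKind> ALLOWED_ON_PATH = EnumSet.noneOf(NodeKind.class);

    // Walks up the parent links; returns the source node, or null if a disallowed node is found.
    static GraphNode sourceIfUnchanged(GraphNode node) {
        GraphNode current = node;
        while (current.kind() != NodeKind.SOURCE) {
            if (!ALLOWED_ON_PATH.contains(current.kind())) {
                return null;
            }
            current = current.parent();
        }
        return current;
    }

    // Both arguments must reach a source unchanged, and both sources must read the same topics.
    static boolean sameEntity(GraphNode left, GraphNode right) {
        GraphNode l = sourceIfUnchanged(left);
        GraphNode r = sourceIfUnchanged(right);
        return l != null && r != null && l.topics().equals(r.topics());
    }
}
```

To support a node kind that provably leaves the data unchanged, one would only need to add it to `ALLOWED_ON_PATH`.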

...

  1. The self-join is a physical plan optimization, not a different operator. Exposing it as an operator would leak physical plan information to the user.
  2. A dedicated operator puts the burden of creating an optimal topology on the user, who needs to know about the self-join operator in order to use it. Users who don't will end up with inefficient topologies. As a rewrite rule, the optimization benefits users without them even realizing it.
  3. Topologies may be written by more than one user and/or involve many operators that perform all sorts of transformations. At the point in the code where two variables are joined, the user may not know that both refer to the same stream or table. They would then not realize that a self-join could replace the regular join.

...