
...

A self-join is a join whose left and right arguments are the same entity (the same stream or table, or two streams or tables reading from the same topic). Although self-joins are currently supported in Streams, their implementation is inefficient: they are implemented like regular joins, with a state store created for each of the left and right join arguments. Since both arguments represent the same entity, the two state stores would contain exactly the same data, so only one is needed. This optimization is only suitable for inner joins whose join condition is on the primary key. We do not consider foreign-key joins, because efficient lookups require a state store for each argument; foreign-key self-joins will therefore be handled as regular inner foreign-key joins. We also do not consider outer joins: in a primary-key self-join there is always at least one join result (the current record joined with itself), so an outer join would never produce an unmatched result.
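To illustrate why one store suffices, here is a minimal sketch of a primary-key, windowed self-join over a single store. It is not the Streams processor: the SingleStoreSelfJoin class, the in-memory map standing in for the window store, the symmetric window check, and the absence of retention or expiry are all simplifying assumptions. For each incoming record it joins with the earlier records of the same key in both directions and then with itself, which yields the same set of pairs a regular join over two identical stores would emit (the emission order may differ).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical sketch: a windowed primary-key self-join backed by ONE store.
class SingleStoreSelfJoin {

    record Rec(String value, long timestamp) {}

    // A single store replaces the separate left and right stores of a regular join.
    // Simplification: records are never expired from this map.
    private final Map<String, List<Rec>> store = new HashMap<>();
    private final long windowMs;
    private final BinaryOperator<String> joiner;

    SingleStoreSelfJoin(long windowMs, BinaryOperator<String> joiner) {
        this.windowMs = windowMs;
        this.joiner = joiner;
    }

    List<String> process(String key, String value, long timestamp) {
        List<Rec> previous = store.computeIfAbsent(key, k -> new ArrayList<>());
        List<String> results = new ArrayList<>();
        for (Rec other : previous) {
            // Assumed symmetric window: |t1 - t2| <= windowMs.
            if (Math.abs(other.timestamp() - timestamp) <= windowMs) {
                results.add(joiner.apply(value, other.value())); // new record on the left side
                results.add(joiner.apply(other.value(), value)); // new record on the right side
            }
        }
        results.add(joiner.apply(value, value)); // every record joins with itself
        previous.add(new Rec(value, timestamp));
        return results;
    }

    public static void main(String[] args) {
        SingleStoreSelfJoin join = new SingleStoreSelfJoin(1_000, (l, r) -> l + "|" + r);
        System.out.println(join.process("k", "a", 0));   // [a|a]
        System.out.println(join.process("k", "b", 500)); // [b|a, a|b, b|b]
    }
}
```

Each record is written once and read once from the same store, whereas the regular join writes it to two stores holding identical data.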

...

The cases we are going to support are:


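```java
KStream<String, String> stream1 = builder.stream(INPUT_TOPIC);
// Both join arguments are the same KStream object (window size is illustrative)
stream1.join(stream1, (left, right) -> left + right,
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)));
```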

and

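```java
KStream<String, String> stream1 = builder.stream(INPUT_TOPIC);
KStream<String, String> stream2 = builder.stream(INPUT_TOPIC);
// Two distinct KStream objects reading from the same topic (window size is illustrative)
stream1.join(stream2, (left, right) -> left + right,
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)));
```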

...

The algorithm to identify whether two streams (or tables) represent the same entity will first check whether the StreamSourceNode nodes that are the join arguments read from the same topic(s). It will then traverse the graph and check that the path from each StreamSourceNode to the root contains no nodes that can change the semantics of the stream. Initially, most nodes will be disallowed on the path; only peek and print will be allowed. Later, we can look into adding some smarts to the algorithm to identify whether a mapper actually changes the data in the stream. If both conditions hold, then the join can be implemented as a self-join. Because only an explicit set of operators is allowed, this algorithm is future-proof: new DSL operators can be added without affecting the behavior of the self-join.
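The two checks can be sketched as follows. This is a hypothetical, simplified model rather than the Streams graph code: the Node record, its operator names, and the SelfJoinDetector class are assumptions, and each join argument is modeled as a single chain of nodes that is walked back to its source. The allowlist contains only peek and print, matching the initial proposal.

```java
import java.util.Set;

// Hypothetical sketch of the self-join detection; not the real GraphNode/StreamSourceNode classes.
class SelfJoinDetector {

    // Operators that cannot change the records of a stream (initial allowlist).
    static final Set<String> ALLOWED = Set.of("peek", "print");

    // Simplified graph node: a source has topics and no parent; other nodes have one parent.
    record Node(String operator, Set<String> topics, Node parent) {
        static Node source(String... topics) {
            return new Node("source", Set.of(topics), null);
        }

        Node then(String operator) {
            return new Node(operator, Set.of(), this);
        }
    }

    // Walks back to the source; returns null if a non-allowed operator is on the path.
    static Node sourceIfUnchanged(Node node) {
        Node current = node;
        while (current.parent() != null) {
            if (!ALLOWED.contains(current.operator())) {
                return null; // e.g. map or filter may change the data
            }
            current = current.parent();
        }
        return current;
    }

    static boolean isSelfJoin(Node left, Node right) {
        Node leftSource = sourceIfUnchanged(left);
        Node rightSource = sourceIfUnchanged(right);
        return leftSource != null
            && rightSource != null
            && leftSource.topics().equals(rightSource.topics());
    }

    public static void main(String[] args) {
        Node s1 = Node.source("input");
        Node s2 = Node.source("input");
        System.out.println(isSelfJoin(s1, s1));              // true: stream1.join(stream1)
        System.out.println(isSelfJoin(s1, s2));              // true: same topic read twice
        System.out.println(isSelfJoin(s1.then("peek"), s2)); // true: peek is allowed
        System.out.println(isSelfJoin(s1.then("map"), s2));  // false: map may change data
    }
}
```

Keeping an allowlist rather than a denylist means an operator the check does not know about simply disables the rewrite, which is the safe default.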

We will add a boolean flag, isSelfJoin, to StreamStreamJoinNode; it will be used when building the topology. If it is true, then instead of adding the 5 processors that are currently added for an inner join, we will add 2 processors TODO

...

Add a selfJoin operator to the DSL. We did not choose this approach because we prefer to push the complexity of the optimization into Streams rather than onto the user.

Pros:

  1. This would make backwards compatibility a non-issue: a user who upgrades from an older version does not have access to this DSL operator, so if they want a self-join, they have to change their code manually.
  2. The code for a self-join can be chained, for example builder.stream("topic").selfJoin().map(...). Without the operator, a self-join must be split into two steps: create the stream and keep a reference to it, then use that reference in the join, for example stream.join(stream).
  3. The implementation is straightforward, as we don't need to implement an optimization rule to do the rewriting.

Cons:

  1. The self-join is a physical plan optimization, not a different operator; adding it to the DSL exposes physical plan information to the user.
  2. We put the burden of creating an optimal topology on the user, as they need to know about the self-join operator to use it. If they don't, their topologies will be inefficient. With a rewrite rule, users benefit from the optimization without even realizing it.
  3. A topology may be written by more than one person, and it may involve many operators. At some point in the code, the user joins two variables without knowing that they refer to the same stream or table, and therefore does not realize that a self-join could be used instead of a regular join.


Another option, which combines the benefits of both, is to implement both the DSL operator and the optimization rule. The downside of this approach is that it could confuse users, as there would be multiple ways to achieve the same thing.