...
The self-join operator will maintain one state store. For every new record, the operator will first add it to the state store and then use it to perform a lookup into the same state store to do the actual join. This means that every record will at least join with itself.
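The add-then-lookup order described above can be illustrated with a minimal, self-contained sketch. It is not the Kafka Streams implementation: the class `SelfJoinSketch`, the in-memory map standing in for the window store, the symmetric `windowMs` bound, and the string-concatenating joiner are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not the actual Kafka Streams operator.
class SelfJoinSketch {

    // One stored record: its timestamp and its value.
    static final class Entry {
        final long timestamp;
        final String value;

        Entry(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // The single "state store" (assumed shape): key -> records seen so far.
    private final Map<String, List<Entry>> store = new HashMap<>();
    private final long windowMs;

    public SelfJoinSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    // Processes one record and returns the join results it produces.
    public List<String> process(String key, String value, long timestamp) {
        // Step 1: add the new record to the store first.
        store.computeIfAbsent(key, k -> new ArrayList<>())
             .add(new Entry(timestamp, value));

        // Step 2: look up the same store. Because the record was just added,
        // it always finds (and joins with) itself.
        List<String> results = new ArrayList<>();
        for (Entry other : store.get(key)) {
            if (Math.abs(other.timestamp - timestamp) <= windowMs) {
                results.add(value + "+" + other.value);
            }
        }
        return results;
    }
}
```

In this sketch, the first record for a key yields exactly one result (the record joined with itself), and later records additionally join with earlier records of the same key that fall within the window.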
- Add a rule to the optimizer that will rewrite an inner join to a self-join.
The graphs (logical plans) created from the DSL excerpts above are the same (after the optimization mergeDuplicateSourceNodes) and look as follows:
...
Compatibility, Deprecation, and Migration Plan
...
The change is backward compatible since:
- It reuses existing topics/state stores
- It does not change the names of existing topics/state stores
- It removes the right-side state store from the topology, but the store itself is kept around in case users want to roll back.
- It does not change the internal naming of processors or graph nodes
Here is an example topology of an inner join and how it gets rewritten:
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [topic2])
--> KSTREAM-WINDOWED-0000000001, KSTREAM-WINDOWED-0000000002
Processor: KSTREAM-WINDOWED-0000000001 (stores: [KSTREAM-JOINTHIS-0000000003-store])
--> KSTREAM-JOINTHIS-0000000003
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-WINDOWED-0000000002 (stores: [KSTREAM-JOINOTHER-0000000004-store])
--> KSTREAM-JOINOTHER-0000000004
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-JOINOTHER-0000000004 (stores: [KSTREAM-JOINTHIS-0000000003-store])
--> KSTREAM-MERGE-0000000005
<-- KSTREAM-WINDOWED-0000000002
Processor: KSTREAM-JOINTHIS-0000000003 (stores: [KSTREAM-JOINOTHER-0000000004-store])
--> KSTREAM-MERGE-0000000005
<-- KSTREAM-WINDOWED-0000000001
Processor: KSTREAM-MERGE-0000000005 (stores: [])
--> KSTREAM-PROCESSOR-0000000006
<-- KSTREAM-JOINTHIS-0000000003, KSTREAM-JOINOTHER-0000000004
Processor: KSTREAM-PROCESSOR-0000000006 (stores: [])
--> none
<-- KSTREAM-MERGE-0000000005
Self-join topology:
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [topic1])
--> KSTREAM-WINDOWED-0000000001
Processor: KSTREAM-WINDOWED-0000000001 (stores: [KSTREAM-JOINTHIS-0000000003-store])
--> KSTREAM-MERGE-0000000005
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-MERGE-0000000005 (stores: [KSTREAM-JOINTHIS-0000000003-store])
--> KSTREAM-PROCESSOR-0000000006
<-- KSTREAM-WINDOWED-0000000001
Processor: KSTREAM-PROCESSOR-0000000006 (stores: [])
--> none
<-- KSTREAM-MERGE-0000000005
As you can see, none of the indices or names of the remaining processors are affected.
Rejected Alternatives
Add the operator selfJoin to the DSL. We did not go with this approach because we prefer to push the complexity of the optimization into Streams rather than onto the user.
...