...

The self-join operator will maintain one state store. For every new record, the operator will first add it to the state store and then use it to perform a lookup into the same state store to do the actual join. This means that every record will at least join with itself.
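The add-then-lookup behaviour described above can be sketched in plain Java. This is only an illustration under stated assumptions, not the Kafka Streams implementation: the class `SelfJoinSketch`, the in-memory map standing in for the state store, the symmetric time window, and the string concatenation used as the joiner are all hypothetical, and the sketch does not try to reproduce the exact set or order of results that the real operator emits.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a self-join operator backed by a single store.
public class SelfJoinSketch {

    // A stored value together with its timestamp (illustrative type).
    private static final class Stamped {
        final String value;
        final long timestamp;

        Stamped(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // The one and only "state store": records grouped by key.
    private final Map<String, List<Stamped>> store = new HashMap<>();
    private final long windowMs;

    public SelfJoinSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    // Processes one record and returns the join results it produces.
    public List<String> process(String key, String value, long timestamp) {
        // Step 1: add the incoming record to the store.
        store.computeIfAbsent(key, k -> new ArrayList<>())
             .add(new Stamped(value, timestamp));

        // Step 2: look up the same store, which now contains the record itself.
        List<String> joined = new ArrayList<>();
        for (Stamped other : store.get(key)) {
            if (Math.abs(other.timestamp - timestamp) <= windowMs) {
                joined.add(other.value + "+" + value);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        SelfJoinSketch join = new SelfJoinSketch(10);
        System.out.println(join.process("k", "a", 0));   // prints [a+a]
        System.out.println(join.process("k", "b", 5));   // prints [a+b, b+b]
        System.out.println(join.process("k", "c", 100)); // prints [c+c]
    }
}
```

Because the record is written before the lookup, the result list is never empty: even a record with no neighbours inside the window, such as `c` above, joins with itself.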

  • Add a rule to the optimizer that will rewrite an inner join to a self-join. The graphs (logical plans) created from the DSL excerpts above are the same (after the optimization mergeDuplicateSourceNodes) and look as follows:

...

Compatibility, Deprecation, and Migration Plan

...

The change is backward compatible since:

  • It reuses existing topics/state stores
  • It does not change the names of existing topics/state stores
  • It removes the right-side state store from the topology, but this state store is kept around in case users want to roll back.
  • It does not change the internal naming of processors or graph nodes

Here is an example topology of an inner join and how it gets rewritten:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [topic2])
      --> KSTREAM-WINDOWED-0000000001, KSTREAM-WINDOWED-0000000002
    Processor: KSTREAM-WINDOWED-0000000001 (stores: [KSTREAM-JOINTHIS-0000000003-store])
      --> KSTREAM-JOINTHIS-0000000003
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-WINDOWED-0000000002 (stores: [KSTREAM-JOINOTHER-0000000004-store])
      --> KSTREAM-JOINOTHER-0000000004
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-JOINOTHER-0000000004 (stores: [KSTREAM-JOINTHIS-0000000003-store])
      --> KSTREAM-MERGE-0000000005
      <-- KSTREAM-WINDOWED-0000000002
    Processor: KSTREAM-JOINTHIS-0000000003 (stores: [KSTREAM-JOINOTHER-0000000004-store])
      --> KSTREAM-MERGE-0000000005
      <-- KSTREAM-WINDOWED-0000000001
    Processor: KSTREAM-MERGE-0000000005 (stores: [])
      --> KSTREAM-PROCESSOR-0000000006
      <-- KSTREAM-JOINTHIS-0000000003, KSTREAM-JOINOTHER-0000000004
    Processor: KSTREAM-PROCESSOR-0000000006 (stores: [])
      --> none
      <-- KSTREAM-MERGE-0000000005


Self-join topology:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [topic1])
      --> KSTREAM-WINDOWED-0000000001
    Processor: KSTREAM-WINDOWED-0000000001 (stores: [KSTREAM-JOINTHIS-0000000003-store])
      --> KSTREAM-MERGE-0000000005
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MERGE-0000000005 (stores: [KSTREAM-JOINTHIS-0000000003-store])
      --> KSTREAM-PROCESSOR-0000000006
      <-- KSTREAM-WINDOWED-0000000001
    Processor: KSTREAM-PROCESSOR-0000000006 (stores: [])
      --> none
      <-- KSTREAM-MERGE-0000000005



As you can see, none of the indices or names of the processors are affected.
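One way to check this claim mechanically is to extract the generated node names from both topology descriptions and confirm that every name in the rewritten topology already exists in the original one. The sketch below is a hypothetical helper (the class `TopologyNameCheck` and its regular expression are assumptions made for illustration, not part of Kafka Streams), and it is fed abbreviated copies of the two descriptions shown above.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper that compares node names across two topology dumps.
public class TopologyNameCheck {

    // Matches generated node names such as KSTREAM-MERGE-0000000005.
    // The negative lookahead skips store names, which end in "-store".
    private static final Pattern NODE_NAME =
        Pattern.compile("KSTREAM-[A-Z]+-\\d{10}(?!-store)");

    // Collects the distinct node names found in a topology description.
    public static Set<String> nodeNames(String description) {
        Set<String> names = new LinkedHashSet<>();
        Matcher matcher = NODE_NAME.matcher(description);
        while (matcher.find()) {
            names.add(matcher.group());
        }
        return names;
    }

    public static void main(String[] args) {
        // Abbreviated from the inner-join topology above.
        String innerJoin = String.join("\n",
            "Source: KSTREAM-SOURCE-0000000000",
            "Processor: KSTREAM-WINDOWED-0000000001 (stores: [KSTREAM-JOINTHIS-0000000003-store])",
            "Processor: KSTREAM-WINDOWED-0000000002 (stores: [KSTREAM-JOINOTHER-0000000004-store])",
            "Processor: KSTREAM-JOINOTHER-0000000004",
            "Processor: KSTREAM-JOINTHIS-0000000003",
            "Processor: KSTREAM-MERGE-0000000005",
            "Processor: KSTREAM-PROCESSOR-0000000006");

        // Abbreviated from the self-join topology above.
        String selfJoin = String.join("\n",
            "Source: KSTREAM-SOURCE-0000000000",
            "Processor: KSTREAM-WINDOWED-0000000001 (stores: [KSTREAM-JOINTHIS-0000000003-store])",
            "Processor: KSTREAM-MERGE-0000000005 (stores: [KSTREAM-JOINTHIS-0000000003-store])",
            "Processor: KSTREAM-PROCESSOR-0000000006");

        // Every node that survives the rewrite keeps its original name and index.
        boolean unchanged = nodeNames(innerJoin).containsAll(nodeNames(selfJoin));
        System.out.println(unchanged); // prints true
    }
}
```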

Rejected Alternatives

Add a selfJoin operator to the DSL. We did not go with this approach because we prefer to push the complexity of the optimization to Kafka Streams rather than to the user.

...