...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

There are two motivations for this KIP:

  1. We want to complete the work outlined in KIP-591, specifically extending the functionality to cover custom state stores. This KIP expands on and makes feasible the "Rejected Alternative" in KIP-591, which was rejected in favor of reducing scope. Without this KIP, DSL users must carefully examine the output of their topology description and wire in custom state stores (using the Materialized API) at every location that uses state. This is both tedious and error-prone: if Materialized is used in an incorrect location, an unnecessary state store may be created. In the inverse scenario, missing a Materialized call would silently create a RocksDB store.
  2. We would like to address a gap in the API for stream-stream joins (really 3 separate issues). For a quick refresher, these are a special case among DSL operators: they cannot be configured via Materialized and are instead the lone instance of an operator configuration accepting a StoreSupplier directly (rather than indirectly, e.g. via a StoreBuilder or Materialized). For this reason, they are completely missed by the default.dsl.store config, as it currently operates exclusively through the Materialized class. The only way to configure the underlying state store implementation is with the StreamJoined configuration class.
     Problem #1 is that there is no way to set the default state store type for these joins, either via the default.dsl.store config or via the related Materialized APIs that accept a StoreType enum.
     The 2nd problem follows from the 1st: because you are forced to configure these stores through the StreamJoined class, which in turn requires a WindowStoreSupplier (or two), it's up to the user to figure out the correct values for all three parameters (windowSize, retentionPeriod, and retainDuplicates). Not only is there a single "correct" answer for all three (unlike other operators, which let users freely choose some or all of the store parameters), it can be quite difficult to determine what those expected values are. For example, retainDuplicates must always be true for stream-stream joins, while the retentionPeriod is fixed at the windowSize + gracePeriod. Most frustrating, however, is that the windowSize itself is not what you might assume or expect: if you used the old JoinWindows#of (or the new JoinWindows#ofTimeDifferenceAndNoGrace), the WindowStoreSupplier will actually require twice the "time difference" passed into your JoinWindows. The only way to find out is trial and error, or checking the code after you get a runtime exception for passing in the wrong value(s).
     And finally, there's the 3rd problem: as described in KAFKA-14976, some stream-stream join types now use an additional store which is not windowed but rather a KeyValueStore. At the moment, this store is not customizable and not exposed to the user in any way: it simply hardcodes a RocksDBStore or InMemoryStore depending on what the join's WindowStoreSupplier(s) return for the StateStore#persistent API. This means that no matter what one does, there is no way to use all (and only) custom state store implementations for any application including these stream-stream joins!
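
To make the parameter rules described above concrete, here is a minimal sketch of the arithmetic a user currently has to work out by hand. JoinStoreParams and forSymmetricJoin are hypothetical names invented for this illustration and are not part of Kafka Streams; the sketch also assumes a symmetric join window, i.e. the same time difference before and after.

```java
import java.time.Duration;

/** Hypothetical helper, not a Kafka Streams class: derives the store parameters a stream-stream join expects. */
final class JoinStoreParams {
    final Duration windowSize;
    final Duration retentionPeriod;
    final boolean retainDuplicates;

    private JoinStoreParams(Duration windowSize, Duration retentionPeriod) {
        this.windowSize = windowSize;
        this.retentionPeriod = retentionPeriod;
        // Stream-stream joins always require duplicates to be retained.
        this.retainDuplicates = true;
    }

    /** Assumes a symmetric window, as produced by JoinWindows#ofTimeDifferenceAndNoGrace or #ofTimeDifferenceWithGrace. */
    static JoinStoreParams forSymmetricJoin(Duration timeDifference, Duration gracePeriod) {
        // The store's window size is twice the "time difference" given to JoinWindows.
        Duration windowSize = timeDifference.multipliedBy(2);
        // The retention period is fixed at windowSize + gracePeriod.
        Duration retentionPeriod = windowSize.plus(gracePeriod);
        return new JoinStoreParams(windowSize, retentionPeriod);
    }
}
```

For a join declared with JoinWindows#ofTimeDifferenceAndNoGrace and a 5 minute time difference, this yields a 10 minute windowSize, a 10 minute retentionPeriod, and retainDuplicates set to true. These are the values that would then be passed to a supplier factory such as Stores#persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates) before handing the supplier to StreamJoined.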

Public Interfaces

The new feature will be accessed via a new config, defined below. The new config will take precedence over the old default.dsl.store config, which will only be used if the new config is not set. We will also deprecate the default.dsl.store config to make it clear which of the two should be preferred. Further discussion on this point can be found under "Rejected Alternatives".

...