...

We would like to address a gap in the current API for stream-stream joins (really three separate issues). As a quick refresher, stream-stream joins are a special case in the DSL: they are unique among DSL operators in that their state stores cannot be configured via Materialized. Instead, their configuration class is the lone instance of an operator configuration that accepts a StoreSupplier directly (rather than indirectly, e.g. via a StoreBuilder or Materialized). For this reason, stream-stream joins are completely missed by the default.dsl.store config, which currently operates exclusively through the Materialized class. The only way to configure the underlying state store implementation is with the StreamJoined configuration class.

  1. So, problem #1 is that there is no way to set the default state store type for stream-stream joins, either via the default.dsl.store config or via the related Materialized APIs that accept a StoreType enum.
  2. The 2nd problem follows from the 1st: because you are forced to configure these stores through the StreamJoined class, which in turn requires a WindowBytesStoreSupplier (or two), it's up to the user to figure out the correct values for all three parameters (windowSize, retentionPeriod, and retainDuplicates). Not only is there a single "correct" answer for all three (unlike other operators, which let users freely choose some or all of the store parameters), but it can also be quite difficult to determine what those expected values are. For example, retainDuplicates must always be true for stream-stream joins, while the retentionPeriod is fixed at windowSize + gracePeriod. Most frustrating of all, however, the windowSize itself is not what you might assume: if you used the old JoinWindows#of (or the new JoinWindows#ofTimeDifferenceAndNoGrace), the WindowBytesStoreSupplier will actually require twice the "time difference" passed into your JoinWindows. The only way to find this out is trial and error, or reading the code after you get a runtime exception for passing in the wrong value(s).
  3. And finally, there's the 3rd problem: as described in KAFKA-14976, some stream-stream join types now use an additional store which is not windowed but rather a KeyValueStore. This store is currently completely un-customizable and not exposed to the user in any way: it simply hardcodes a RocksDBStore or InMemoryStore depending on whether the store(s) returned by the join's WindowBytesStoreSupplier(s) report themselves as persistent via StateStore#persistent. This means that, no matter what one does, an application that includes these stream-stream joins has no way to use custom state store implementations exclusively!
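To make problem #2 concrete, here is a minimal sketch of the arithmetic a user currently has to reproduce by hand when building the WindowBytesStoreSupplier(s) for StreamJoined. JoinStoreParams and its methods are hypothetical helpers, not Kafka Streams APIs. They encode the rules stated above, generalized to asymmetric join windows on the assumption that the store's window size is the join window's "before + after" span.

```java
import java.time.Duration;

// Hypothetical helper, not part of Kafka Streams: derives the window store
// parameters that a stream-stream join's WindowBytesStoreSupplier must use.
final class JoinStoreParams {
    final Duration windowSize;
    final Duration retentionPeriod;
    final boolean retainDuplicates;

    private JoinStoreParams(final Duration windowSize, final Duration retentionPeriod, final boolean retainDuplicates) {
        this.windowSize = windowSize;
        this.retentionPeriod = retentionPeriod;
        this.retainDuplicates = retainDuplicates;
    }

    // General case: a join window with separate "before" and "after" spans plus a grace period.
    static JoinStoreParams forJoinWindow(final Duration before, final Duration after, final Duration grace) {
        // The store window covers both sides of the join window (assumed: before + after).
        final Duration windowSize = before.plus(after);
        // Records are retained for the whole window plus the grace period.
        final Duration retentionPeriod = windowSize.plus(grace);
        // Always true: records with the same key must not overwrite each other.
        return new JoinStoreParams(windowSize, retentionPeriod, true);
    }

    // Symmetric case, as produced by JoinWindows#ofTimeDifferenceAndNoGrace(timeDifference):
    // the window size ends up being twice the time difference.
    static JoinStoreParams forTimeDifferenceAndNoGrace(final Duration timeDifference) {
        return forJoinWindow(timeDifference, timeDifference, Duration.ZERO);
    }
}
```

For example, a 5-minute time difference with no grace yields a 10-minute window size and a 10-minute retention period, which is exactly the doubling that is easy to miss.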

Public Interfaces

The new feature will be accessed via a new config, defined below. This will take precedence over the old default.dsl.store config, which will only be used if the new config is not set. We will also deprecate the default.dsl.store config to make it clear which of the two should be preferred. Further discussion on this point can be found under "Rejected Alternatives".
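A minimal sketch of the precedence rule described above, using a plain Map of properties. The new config's actual name is not given in this excerpt, so NEW_PROVIDER_CONFIG is a placeholder, and DslStoreConfigResolver is a hypothetical class. Only the default.dsl.store key and its default value of rocksDB correspond to the existing config.

```java
import java.util.Map;

// Hypothetical sketch of the proposed precedence between the new config and the
// (to be deprecated) default.dsl.store config; not actual Kafka Streams code.
final class DslStoreConfigResolver {
    // Existing config key and its default value.
    static final String DEFAULT_DSL_STORE = "default.dsl.store";
    static final String DEFAULT_DSL_STORE_DEFAULT = "rocksDB";

    // Placeholder only: the new config's real name is defined elsewhere in the KIP.
    static final String NEW_PROVIDER_CONFIG = "<new-dsl-store-provider-config>";

    // Returns a description of which setting decides the default DSL store.
    static String resolve(final Map<String, ?> props) {
        final Object provider = props.get(NEW_PROVIDER_CONFIG);
        if (provider != null) {
            // The new config takes precedence whenever it is set.
            return "provider:" + provider;
        }
        // Otherwise fall back to the old config (or its default).
        final Object storeType = props.get(DEFAULT_DSL_STORE);
        return "storeType:" + (storeType != null ? storeType : DEFAULT_DSL_STORE_DEFAULT);
    }
}
```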

...

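DSLStoreProvider.java

import java.time.Duration;

import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;

public interface DSLStoreProvider {

    // Supplier for key-value stores (e.g. KTable materialization, non-windowed aggregations)
    KeyValueBytesStoreSupplier keyValueStore(final String name);

    // Supplier for window stores (e.g. windowed aggregations, stream-stream joins)
    WindowBytesStoreSupplier windowStore(final String name, final Duration retentionPeriod, final Duration windowSize, final boolean retainDuplicates);

    // Supplier for session stores (session-windowed aggregations)
    SessionBytesStoreSupplier sessionStore(final String name, final Duration retentionPeriod);
}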

To cover the two default DSL store types currently offered, we will also introduce two out-of-the-box (OOTB) implementations, which can be accessed by users through the same Stores factory class that hosts all the static state-store-related APIs today:

...

Materialized.java

import org.apache.kafka.streams.processor.StateStore;

public class Materialized<K, V, S extends StateStore> {

    ///// Adding these two new methods ////

    /**
     * Materialize a {@link StateStore} with the given {@link DSLStoreProvider}.
     *
     * @param storeProvider  the {@link DSLStoreProvider} used to create the state store
     * @param <K>       key type of the store
     * @param <V>       value type of the store
     * @param <S>       type of the {@link StateStore}
     * @return a new {@link Materialized} instance with the given store provider
     */
    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final DSLStoreProvider storeProvider);

    /**
     * Set the {@link DSLStoreProvider} used to create the materialized {@link StateStore}.
     *
     * @param storeProvider  the {@link DSLStoreProvider} to use
     * @return itself
     * @throws IllegalArgumentException if a store supplier is also pre-configured
     */
    public Materialized<K, V, S> withStoreType(final DSLStoreProvider storeProvider);


    //// Deprecating these two old methods ////

    /** @deprecated use {@link #as(DSLStoreProvider)} instead */
    @Deprecated
    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StoreType storeType);

    /** @deprecated use {@link #withStoreType(DSLStoreProvider)} instead */
    @Deprecated
    public Materialized<K, V, S> withStoreType(final StoreType storeType);
}
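The following dependency-free model sketches the contract documented above: a store provider can only be set when no explicit StoreSupplier is pre-configured. MaterializedModel, asSupplier, asProvider, and effectiveSource are hypothetical names, and strings stand in for real suppliers and providers. The fallback to a configured default provider is an assumption based on the config precedence described under Public Interfaces.

```java
// Hypothetical, dependency-free model of the proposed Materialized#withStoreType(DSLStoreProvider)
// contract; strings stand in for a real StoreSupplier and DSLStoreProvider.
final class MaterializedModel {
    private String storeSupplier; // explicitly configured StoreSupplier, or null
    private String storeProvider; // per-operator DSLStoreProvider, or null

    // Models Materialized.as(StoreSupplier).
    static MaterializedModel asSupplier(final String storeSupplier) {
        final MaterializedModel m = new MaterializedModel();
        m.storeSupplier = storeSupplier;
        return m;
    }

    // Models the proposed Materialized.as(DSLStoreProvider).
    static MaterializedModel asProvider(final String storeProvider) {
        return new MaterializedModel().withStoreType(storeProvider);
    }

    // Models the proposed withStoreType(DSLStoreProvider): rejected if a supplier is already set.
    MaterializedModel withStoreType(final String storeProvider) {
        if (storeSupplier != null) {
            throw new IllegalArgumentException("Cannot set a store provider when a store supplier is pre-configured.");
        }
        this.storeProvider = storeProvider;
        return this;
    }

    // Assumed precedence: explicit supplier, then per-operator provider, then the configured default.
    String effectiveSource(final String configuredDefaultProvider) {
        if (storeSupplier != null) {
            return storeSupplier;
        }
        return storeProvider != null ? storeProvider : configuredDefaultProvider;
    }
}
```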

Finally, to address the three problems with configuring state stores for stream-stream joins today, we propose two last new methods that operate similarly to the new Materialized APIs, but for StreamJoined:

StreamJoined.java

public class StreamJoined<K, V1, V2> {

    /**
     * Creates a StreamJoined instance with the given {@link DSLStoreProvider}. The store provider
     * will be used to get all the state stores in this operation that do not otherwise have an
     * explicitly configured {@link org.apache.kafka.streams.state.StoreSupplier}.
     *
     * @param storeProvider       the store provider that will be used for all unconfigured state stores
     * @param <K>                 the key type
     * @param <V1>                this value type
     * @param <V2>                other value type
     * @return                    {@link StreamJoined} instance
     */
    public static <K, V1, V2> StreamJoined<K, V1, V2> with(final DSLStoreProvider storeProvider);

    /**
     * Configure with the provided {@link DSLStoreProvider} for all state stores that are not
     * configured with a {@link org.apache.kafka.streams.state.StoreSupplier} already.
     *
     * @param storeProvider  the store provider to use for all unconfigured state stores
     * @return               a new {@link StreamJoined} configured with this store provider
     */
    public StreamJoined<K, V1, V2> withStoreProvider(final DSLStoreProvider storeProvider);
}
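To illustrate how StreamJoined#withStoreProvider could address all three problems at once, here is a dependency-free sketch of the store resolution for a single join. StoreProviderModel, DescribingProvider, and StreamJoinedModel are hypothetical stand-ins (suppliers are modeled as descriptive strings), and the store names are made up. The point is that the DSL, not the user, computes the window parameters, and that the extra key-value store from KAFKA-14976 is obtained through the provider as well, assuming it is created via keyValueStore.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the proposed DSLStoreProvider; "suppliers" are modeled as strings.
interface StoreProviderModel {
    String keyValueStore(String name);

    String windowStore(String name, Duration retentionPeriod, Duration windowSize, boolean retainDuplicates);
}

// Hypothetical provider that simply describes the store it was asked for.
final class DescribingProvider implements StoreProviderModel {
    @Override
    public String keyValueStore(final String name) {
        return "kv:" + name;
    }

    @Override
    public String windowStore(final String name, final Duration retentionPeriod,
                              final Duration windowSize, final boolean retainDuplicates) {
        return "window:" + name + ":" + retentionPeriod + ":" + windowSize + ":" + retainDuplicates;
    }
}

// Hypothetical model of a StreamJoined configured with a store provider.
final class StreamJoinedModel {
    private final String thisStoreSupplier;  // explicitly configured supplier, or null
    private final String otherStoreSupplier; // explicitly configured supplier, or null
    private final StoreProviderModel provider;

    StreamJoinedModel(final String thisStoreSupplier, final String otherStoreSupplier,
                      final StoreProviderModel provider) {
        this.thisStoreSupplier = thisStoreSupplier;
        this.otherStoreSupplier = otherStoreSupplier;
        this.provider = provider;
    }

    // Resolves every store one join needs: explicit suppliers win, the provider fills the gaps.
    List<String> resolveStores(final String baseName, final Duration before, final Duration after,
                               final Duration grace, final boolean needsOuterStore) {
        // The DSL computes the window parameters, so the user no longer has to (problem #2).
        final Duration windowSize = before.plus(after);
        final Duration retentionPeriod = windowSize.plus(grace);

        final List<String> stores = new ArrayList<>();
        stores.add(thisStoreSupplier != null ? thisStoreSupplier
                : provider.windowStore(baseName + "-this-store", retentionPeriod, windowSize, true));
        stores.add(otherStoreSupplier != null ? otherStoreSupplier
                : provider.windowStore(baseName + "-other-store", retentionPeriod, windowSize, true));
        if (needsOuterStore) {
            // The extra non-windowed store (KAFKA-14976) also comes from the provider (problem #3).
            stores.add(provider.keyValueStore(baseName + "-outer-store"));
        }
        return stores;
    }
}
```

In this model, a user who sets only a provider gets correctly parameterized stores for both sides of the join plus the extra key-value store, while any explicitly configured supplier is still respected.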

Proposed Changes

// TODO


...