
Status

Current state: Under Discussion

Discussion thread: here

JIRA:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

// TODO – main background/motivation. Can start with what's in the ticket description and build off of that 

We would like to address a gap in the current API for stream-stream joins (really three separate issues). As a quick refresher, these are a special case among DSL operators: they cannot be configured via Materialized, and they are the lone instance of an operator configuration that accepts a StoreSupplier directly (rather than indirectly, e.g. via a StoreBuilder or Materialized). For this reason, they are completely missed by the default.dsl.store config, which currently operates exclusively through the Materialized class. The only way to configure their underlying state store implementation is with the StreamJoined configuration class.

  1. So, problem #1 is that there is no way to set the default state store type for stream-stream joins, either via the default.dsl.store config or via the related Materialized APIs that accept a StoreType enum.
  2. The 2nd problem follows from the 1st: because you are forced to configure these stores through the StreamJoined class, which in turn requires a WindowBytesStoreSupplier (or two), it's up to the user to figure out the correct values for all three parameters (windowSize, retentionPeriod, and retainDuplicates). Not only is there a single "correct" answer for all three (unlike other operators, which let users freely choose some or all of the store parameters), but it can also be quite difficult to determine what those expected values are. For example, retainDuplicates must always be true for stream-stream joins, while the retentionPeriod is fixed at windowSize + gracePeriod. Most frustrating of all, the windowSize itself is not what you might expect: if you used the old JoinWindows#of (or the new JoinWindows#ofTimeDifferenceAndNoGrace), the WindowBytesStoreSupplier will actually require twice the "time difference" passed into your JoinWindows. The only way to find this out is by trial and error, or by reading the code after you hit a runtime exception for passing in the wrong value(s).
  3. And finally, there's the 3rd problem: as described in KAFKA-14976, some stream-stream join types now use an additional store which is not windowed but is instead a KeyValueStore. At the moment this store is completely un-customizable and not exposed to the user in any way: it simply hardcodes a RocksDB or in-memory store, depending on whether the stores from the join's WindowBytesStoreSupplier(s) report themselves as persistent via StateStore#persistent. This means that, no matter what one does, an application containing these stream-stream joins cannot use custom state store implementations exclusively!
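To make problem #2 concrete, the following is a minimal, dependency-free sketch of the arithmetic a user currently has to reproduce before building the WindowBytesStoreSupplier(s) for StreamJoined (for example via Stores.persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates)). The JoinStoreParams class and its methods are hypothetical, not Kafka APIs; the rules they encode (window size equal to the window's "before" plus "after", which is what JoinWindows#size() reports; retention equal to window size plus grace; retainDuplicates always true) are the ones described above.

```java
import java.time.Duration;

// Hypothetical helper, NOT part of Kafka Streams: derives the window store
// parameters that a stream-stream join expects from its join window settings.
final class JoinStoreParams {
    final Duration windowSize;
    final Duration retentionPeriod;
    final boolean retainDuplicates;

    private JoinStoreParams(Duration windowSize, Duration retentionPeriod, boolean retainDuplicates) {
        this.windowSize = windowSize;
        this.retentionPeriod = retentionPeriod;
        this.retainDuplicates = retainDuplicates;
    }

    // General case: the join window spans 'before' plus 'after' around each record.
    static JoinStoreParams forWindow(Duration before, Duration after, Duration grace) {
        Duration windowSize = before.plus(after);
        // Records must be retained for the whole window plus the grace period.
        Duration retentionPeriod = windowSize.plus(grace);
        // Stream-stream joins must keep every record, even duplicates of the same key.
        return new JoinStoreParams(windowSize, retentionPeriod, true);
    }

    // Symmetric case (e.g. ofTimeDifferenceAndGrace): before == after == timeDifference,
    // so the required window size is twice the time difference.
    static JoinStoreParams forTimeDifference(Duration timeDifference, Duration grace) {
        return forWindow(timeDifference, timeDifference, grace);
    }
}
```

For example, forTimeDifference(Duration.ofMinutes(5), Duration.ZERO) yields a ten-minute window size and a ten-minute retention period, not the five minutes a user would naturally pass in.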

Public Interfaces

The new feature will be accessed via a new config, defined below. This config will take precedence over the old default.dsl.store config, which will only be used if the new config is not set. We will also deprecate the default.dsl.store config to make it clear which of the two should be preferred. Further discussion on this point can be found under "Rejected Alternatives".

StreamsConfig.java
public static final String DEFAULT_DSL_STORE_PROVIDER_CONFIG = "default.dsl.store.provider";
public static final String DEFAULT_DSL_STORE_PROVIDER_DOC = "Default state store provider used by DSL operators. Must implement the <code>org.apache.kafka.streams.state.DSLStoreProvider</code> interface.";

@Deprecated
public static final String DEFAULT_DSL_STORE_CONFIG = "default.dsl.store";
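The precedence between the two configs can be illustrated with a small sketch. It reads the literal config names from a java.util.Properties object; the DslStoreConfigResolver class is hypothetical, and the fallback to "rocksDB" assumes the existing default of default.dsl.store stays unchanged.

```java
import java.util.Properties;

// Hypothetical sketch of the proposed precedence between the two configs.
final class DslStoreConfigResolver {
    static final String PROVIDER_CONFIG = "default.dsl.store.provider"; // new
    static final String STORE_TYPE_CONFIG = "default.dsl.store";        // deprecated

    // Returns the setting that would govern DSL store selection.
    static String effectiveSetting(Properties props) {
        String provider = props.getProperty(PROVIDER_CONFIG);
        if (provider != null) {
            // The new config takes precedence whenever it is set.
            return provider;
        }
        // Otherwise the deprecated config applies, defaulting to "rocksDB" (assumed).
        return props.getProperty(STORE_TYPE_CONFIG, "rocksDB");
    }
}
```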

As mentioned in the doc string, this config will expect a class that implements the new interface DSLStoreProvider, defined as follows:

DSLStoreProvider.java
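package org.apache.kafka.streams.state;

import java.time.Duration;

public interface DSLStoreProvider {

    /** Supplier for key-value stores, e.g. for KTable materializations and aggregations. */
    KeyValueBytesStoreSupplier keyValueStore(final String name);

    /** Supplier for window stores, including the ones backing stream-stream joins. */
    WindowBytesStoreSupplier windowStore(final String name, final Duration retentionPeriod, final Duration windowSize, final boolean retainDuplicates);

    /** Supplier for session stores, used by session-windowed aggregations. */
    SessionBytesStoreSupplier sessionStore(final String name, final Duration retentionPeriod);
}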

To cover the two default DSL store types currently offered, we will also introduce two out-of-the-box implementations, which users can access through the same Stores factory class that hosts all the static state-store-related APIs today:

Stores.java
public class Stores {

    public static DSLStoreProvider rocksDB() {
        return new RocksDBStoreProvider(); // new class
    }

    public static DSLStoreProvider inMemory() {
        return new InMemoryStoreProvider(); // new class
    }
}  

Next, to keep the overall public interface aligned with the changing config, we will introduce a new Materialized.as static factory method, similar to the Materialized.as(StoreType) API that was added alongside the default.dsl.store config. Likewise, there will be a new overload of the #withStoreType API that accepts a DSLStoreProvider instead, keeping the same name to minimize disruption to existing users (though something like #withStoreProvider or #withDSLStoreProvider would be a reasonable choice here as well). The corresponding StoreType APIs will be deprecated (see "Rejected Alternatives" for some additional thoughts here):

Materialized.java
public class Materialized<K, V, S extends StateStore> {

    ///// Adding these two new methods ////

    /**
     * Materialize a {@link StateStore} with the given {@link DSLStoreProvider}.
     *
     * @param storeProvider  the {@link DSLStoreProvider} used to obtain the state store
     * @param <K>       key type of the store
     * @param <V>       value type of the store
     * @param <S>       type of the {@link StateStore}
     * @return a new {@link Materialized} instance with the given store provider
     */
    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final DSLStoreProvider storeProvider);
     
    /**
     * Set the {@link DSLStoreProvider} used to obtain the materialized {@link StateStore}.
     *
     * @param storeProvider  the store provider {@link DSLStoreProvider} to use.
     * @return itself
     * @throws IllegalArgumentException if store supplier is also pre-configured
     */
    public Materialized<K, V, S> withStoreType(final DSLStoreProvider storeProvider);


    //// Deprecating these two old methods ////

    @Deprecated
    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StoreType storeType);

    @Deprecated
    public Materialized<K, V, S> withStoreType(final StoreType storeType);
}

Finally, to address the three problems with configuring state stores for stream-stream joins today, we propose two last new methods that operate similarly to the new Materialized APIs, but for StreamJoined:

StreamJoined.java
public class StreamJoined<K, V1, V2> {

    /**
     * Creates a StreamJoined instance with the given {@link DSLStoreProvider}. The store provider
     * will be used to get all the state stores in this operation that do not otherwise have an
     * explicitly configured {@link org.apache.kafka.streams.state.StoreSupplier}.
     *
     * @param storeProvider       the store provider that will be used for all unconfigured state stores
     * @param <K>                 the key type
     * @param <V1>                this value type
     * @param <V2>                other value type
     * @return                    {@link StreamJoined} instance
     */
    public static <K, V1, V2> StreamJoined<K, V1, V2> with(final DSLStoreProvider storeProvider);
    
    /**
     * Configure with the provided {@link DSLStoreProvider} for all state stores that are not
     * configured with a {@link org.apache.kafka.streams.state.StoreSupplier} already.
     * 
     * @param storeProvider  the store provider to use for all unconfigured state stores
     * @return            a new {@link StreamJoined} configured with this store provider
     */
    public StreamJoined<K, V1, V2> withStoreProvider(final DSLStoreProvider storeProvider);
}  
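Taken together with the new config, the StreamJoined methods above imply a lookup order for each store a stream-stream join needs (one window store per side, plus the extra KeyValueStore from problem #3). The sketch below shows that order with strings standing in for StoreSupplier and DSLStoreProvider instances, so it runs without Kafka. The JoinStoreResolution class is hypothetical, and step 3 (falling back to default.dsl.store.provider) is an assumption drawn from that config's role as the default rather than something specified above.

```java
import java.util.Optional;

// Hypothetical sketch (no Kafka dependency): strings stand in for StoreSupplier
// and DSLStoreProvider instances.
final class JoinStoreResolution {

    static String resolve(Optional<String> explicitSupplier,
                          Optional<String> streamJoinedProvider,
                          String configDefaultProvider) {
        // 1. A StoreSupplier passed explicitly through StreamJoined always wins.
        if (explicitSupplier.isPresent()) {
            return explicitSupplier.get();
        }
        // 2. Otherwise use the provider given via StreamJoined.with(DSLStoreProvider)
        //    or StreamJoined#withStoreProvider, if any.
        // 3. Otherwise (assumed) fall back to the default.dsl.store.provider config.
        return streamJoinedProvider.orElse(configDefaultProvider);
    }
}
```

Under this order, a user who passes explicit window store suppliers for both sides would still get the extra KeyValueStore from the StreamJoined provider, since no explicit supplier exists for it.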

Proposed Changes

// TODO

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

Compatibility, Deprecation, and Migration Plan

// TODO

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

// TODO

Describe in a few sentences how the KIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

The main alternative to consider here is how to handle the original default.dsl.store config. It's impossible to expand this config as-is without either changing how it's defined or introducing a new config. There are a few possible approaches, each with its own merits and flaws:

  1. Don't deprecate the old default.dsl.store config and instead maintain it alongside the new default.dsl.store.provider config. The obvious advantage here is that we minimize disruption to anyone already using this config. However, I believe the existing userbase is likely to be quite small, as in its current form this feature is only useful to those who (a) have so many state stores that overriding them individually is cumbersome, and (b) also wish to override all (or most) of their stores to be in-memory specifically. This seems to be a relatively uncommon pattern, not least because it's a recipe for OOM disaster unless the application is extremely small and/or low-traffic. In the absence of any evidence that this config is in use despite such a niche application, it seems best to take this opportunity to deprecate it now, before it gains any more traction, and have everyone converge on a single configuration going forward. Whenever two configs affect the same underlying feature, there will inevitably be confusion over their interaction/hierarchy, which can often lead to misconfiguration. Deprecating the old config is a clear signal to users about which of the two should be preferred and will take precedence.
  2. Instead of positioning the new config as a full replacement for default.dsl.store, we could introduce it as a complementary config specifically for custom stores. For example, we could add a 3rd StoreType to the enum definition, such as CUSTOM. If (and only if) the CUSTOM StoreType is used, the new default.dsl.store.provider config would be read and used to obtain the actual StoreSupplier instances of this custom type. This option has the same advantage as the one above, and I would qualify it in the same way: in general, having two configs that do the same thing or are tied to the same feature will be confusing to new (or old!) users. However, between these two alternatives I would personally advocate for this one, as it at least resolves the concern over potential misconfiguration due to an unclear interaction between the two configs.
  3. On a different front, we should also consider the alternatives for the Materialized API. Arguably, we could deprecate the default.dsl.store config but not the two Materialized APIs related to the StoreType enum. I would be more open to leaving these two APIs as-is than to leaving the config, as I don't think (all of) the same arguments from Alternative #1 apply. First, the userbase here is likely to be much wider, as it may include any application that wishes to override only a handful of stores to be in-memory (a considerably more practical and realistic setup). This could even include users who set default.dsl.store to IN_MEMORY but want to keep some subset of stores on RocksDB. Second, even if the userbase were exactly the same size, the magnitude of the disruption would not be: deprecating default.dsl.store means each user/application has to change at most a single line of code to upgrade to the new config, whereas deprecating Materialized.as(StoreType) would mean reconfiguring each and every materialized operation that uses it, requiring a theoretically unbounded number of code changes per app.
    Of course, this would still be a very simple migration with a predictable pattern that one could easily carry out with a find-and-replace tool or sed command. It would be great if some users could weigh in during the KIP discussion with some basic data, even anecdotal, on how heavily this is or isn't used. Without reason to believe this would be a major disruption, it seems best to deprecate it now, before usage becomes any more widespread, and position ourselves with a clean API going forward.
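Alternative #2 can be sketched in the same dependency-free way. Here the existing Materialized.StoreType enum is mirrored with a hypothetical third CUSTOM constant, and only that constant causes default.dsl.store.provider to be read. The CustomStoreTypeAlternative class, its return strings, and the choice to fail fast when CUSTOM is used without the provider config are all assumptions for illustration.

```java
import java.util.Properties;

// Hypothetical sketch of Alternative #2; not an existing or proposed Kafka API.
final class CustomStoreTypeAlternative {
    // Mirrors Materialized.StoreType, plus the hypothetical CUSTOM constant.
    enum StoreType { ROCKS_DB, IN_MEMORY, CUSTOM }

    static String storeSource(StoreType type, Properties props) {
        switch (type) {
            case ROCKS_DB:
                return "built-in RocksDB stores";
            case IN_MEMORY:
                return "built-in in-memory stores";
            default:
                // Only CUSTOM reaches this branch, so only here is the provider config read.
                String provider = props.getProperty("default.dsl.store.provider");
                if (provider == null) {
                    // Assumed behavior: fail fast rather than silently falling back.
                    throw new IllegalStateException(
                        "StoreType CUSTOM requires default.dsl.store.provider to be set");
                }
                return provider;
        }
    }
}
```

Because the built-in types never look at the provider config, the two configs cannot silently contradict each other, which is the misconfiguration concern this alternative addresses.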