You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 2 Next »

Status

Current state: Under Discussion

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: Unable to render Jira issues macro, execution error.

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

// TODO – main background/motivation. Can start with what's in the ticket description and build off of that 

We would like to address a gap in the API right now when it comes to stream-stream joins. For a quick refresher, these are a special case in the DSL right now, and actually cannot be configured via the default.dsl.store config

Public Interfaces

The new feature will be accessed via a new config, defined below. This will take precedence over the old default.dsl.store config, which will only be used if the new config is not set. We will also deprecate the default.dsl.store config to make it clear which of the two should be preferred. Further discussion on this point can be found under "Rejected Alternatives"

StreamsConfig.java
public static final String DEFAULT_DSL_STORE_PROVIDER_CONFIG = "default.dsl.store.provider";
public static final String DEFAULT_DSL_STORE_PROVIDER_DOC = "Default state store provider used by DSL operators. Must implement the <code>org.apache.kafka.streams.state.DSLStoreProvider</code> interface.";

@Deprecated
public static final String DEFAULT_DSL_STORE_CONFIG = "default.dsl.store";

As mentioned in the doc string, this config will expect a class that implements the new interface DSLStoreProvider, defined as follows:

DSLStoreProvider.java
public interface DSLStoreProvider {

    KeyValueBytesStoreSupplier keyValueStore(final String name);

    WindowBytesStoreSupplier windowStore(final String name, final Duration retentionPeriod, final Duration windowSize, final boolean retainDuplicates);

    SessionBytesStoreSupplier sessionStore(final String name, final Duration retentionPeriod);
    
}  

To cover the two default DSL store types currently offered, we will also introduce two OOTB implementations which can be accessed by users through the same Stores factory class that hosts all the static state-store-related APIs today:

Stores.java
public class Stores {

    public static DSLStoreProvider rocksDB() {
        return new RocksDBStoreProvider(); // new class
    }

    public static DSLStoreProvider inMemory() {
        return new InMemoryStoreProvider(); // new class
    }
}  

Next, to keep the overall public interface in alignment with the changing config, we will introduce a new Materialized.as static constructor similar to the Materialized.as(StoreType) API that was added alongside the default.dsl.store config. Similarly, there will be a new overload of the #withStoreType API that accepts a DSLStoreProvider instead  – keeping the same name to minimize disruption to existing users, though something like #withStoreProvider or #withDSLStoreProvider would be a reasonable choice here as well. The corresponding StoreType APIs will be deprecated (please do check out the "Rejected Alternatives" for some additional thoughts here):

Materialized.java
public class Materialized {

    ///// Adding these two new methods ////

    /**
     * Materialize a {@link StateStore} with the given {@link DSLStoreProvider}.
     *
     * @param storeProvider  the type of the state store
     * @param <K>       key type of the store
     * @param <V>       value type of the store
     * @param <S>       type of the {@link StateStore}
     * @return a new {@link Materialized} instance with the given store provider
     */
    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final DSLStoreProvider storeProvider);
     
    /**
     * Set the type of the materialized {@link StateStore}.
     *
     * @param storeProvider  the store provider {@link DSLStoreProvider} to use.
     * @return itself
     * @throws IllegalArgumentException if store supplier is also pre-configured
     */
    public Materialized<K, V, S> withStoreType(final DSLStoreProvider storeProvider);


   //// Deprecating these two old methods ////

    @Deprecated
    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StoreType storeType);

    @Deprecated
    public Materialized<K, V, S> withStoreType(final StoreType storeType);
}



Proposed Changes

// TODO

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

Compatibility, Deprecation, and Migration Plan

// TODO

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

// TODO

Describe in few sentences how the KIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

The main alternative to consider here is how to handle the original default.dsl.store config. As noted it's impossible to expand this config as-is without either changing how it's defined or introducing a new config. But there are a few possible approaches with their own merits and flaws:

  1. Don't deprecate the old default.dsl.store config and instead maintain it alongside the new DSLStoreProvider config. The advantage here is obviously that we minimize disruption to anyone using this config already. However I believe the existing userbase is likely to be quite small, as in its current form this feature is only useful to those who (a) have many state stores to the point overriding them separately is cumbersome, and (b) also wishes to override all (or most) of their stores to be in-memory specifically. This seems to be a relatively uncommon pattern already, not least of which being that it's a recipe for OOM disaster unless their applications are extremely small and/or low traffic. In the absence of any evidence that this config is already in use despite such a niche application, it seems best to take this opportunity to deprecate it now, before it gains any more traction, and have everyone converge on the same, single configuration going forward. Whenever there are two configs affecting the same underlying feature, it's inevitable there will be confusion over their interaction/hierarchy which can often lead to misconfiguration. Deprecating the old config is a clear sign to users which of the two should be preferred and will take precedence.
  2. Instead of positioning the new config as a full replacement for default.dsl.store, we could instead introduce it as a complementary config for custom stores specifically. For example, add a 3rd StoreType to the enum definition, such as CUSTOM. If (and only if) the CUSTOM StoreType is used, the value of the new default.dsl.store.provider config will be looked at and used to obtain the actual StoreSupplier instances of this custom type. This option has the same advantages as the above, though I would qualify it in the same way. In general, having two configs that do the same thing or are tied to the same feature will be confusing to new (or old!) users. However, between these two alternatives I would personally advocate for this one as the better option, as it at least solves the concern over potential misconfiguration due to unclear interaction between the two configs.
  3. On a different front, we should also consider the alternatives for the Materialized API. Arguably, we could deprecate the default.dsl.store config but not the two Materialized APIs related to the StoreType enum. I would be more open to leaving these two APIs as-is than to leaving the config, as I don't think (all of) the same arguments from Alternative #1 apply:. First, the userbase here is likely to be much wider, as it may include any applications which wish to override only a handful of stores to be in-memory (a considerably more practical and realistic setup). This could even include users who set the default.dsl.store to IN_MEMORY but wanted to keep some subset of stores on rocksdb. Second, even if the size of the userbase was exactly the same, the magnitude of the disruption would not be: deprecating default.dsl.store means each user/application has to change at most a single line of code to upgrade onto the new config. But deprecating Materialized.as(StoreType) would mean having to reconfigure each and every materialized operation that was using this, requiring a theoretically unbounded number of code changes per app.
    Of course, it would still be a very simple migration with a predictable pattern that one could easily achieve with a find-and-replace tool or sed command. It would be great if some users can weigh in during the KIP discussion for some basic data, even anecdotal, on how heavily this is/isn't used. Without reason to believe this would be a major disruption, it seems best to deprecate it now, before the usage becomes any more widespread, and position ourselves with a nice, clean API going forward.
  • No labels