Status

Current state: Discarded (covered by KIP-182)

Discussion thread: here

JIRA: TBD

PR: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

 

Note: This KIP may become obsolete as the discussion on "Streams DSL/StateStore Refactoring" supersedes it.

Motivation

The recommended way to create a StateStoreSupplier is shown in the following example:

StateStoreSupplier countStore = Stores.create("Counts")
    .withKeys(Serdes.String())
    .withValues(Serdes.Long())
    .persistent()
    .build();

However, StateStoreSupplier is a generic interface that takes the StateStore type as a parameter:

public interface StateStoreSupplier<T extends StateStore>

In the above example, that type parameter is lost because the build() method returns a raw type.

As StateStoreSupplier is passed to count/reduce/aggregate etc. methods on KGroupedStream or KGroupedTable, the compiler cannot detect if a supplier for the wrong kind of store is provided.

The other parameters to those methods, such as Serdes, Reducers, etc., are type-parameterised by the key and value types, which allows compile-time type checks.

The StateStoreSupplier argument stands out as a raw type. Making it type-parameterised will help detect errors at compile time, such as when someone refactors their app to use a different type of aggregation (e.g. TimeWindowed vs SessionWindowed) and forgets to change the StateStoreSupplier passed in.
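The difference between a raw and a type-parameterised supplier argument can be illustrated with a minimal, self-contained sketch. It does not use the Kafka Streams classes: the nested StateStore, KeyValueStore, SessionStore and StateStoreSupplier types below are simplified stand-ins, and countRaw and countTyped are hypothetical methods that only play the role of count() on KGroupedStream.

```java
import java.util.HashMap;
import java.util.Map;

public class TypedSupplierSketch {
    // Simplified stand-ins for the store types (assumption: not the real Kafka interfaces).
    interface StateStore { String name(); }

    static class KeyValueStore<K, V> implements StateStore {
        private final String name;
        private final Map<K, V> data = new HashMap<>();
        KeyValueStore(String name) { this.name = name; }
        public String name() { return name; }
        void put(K key, V value) { data.put(key, value); }
        V get(K key) { return data.get(key); }
    }

    static class SessionStore<K, V> implements StateStore {
        private final String name;
        SessionStore(String name) { this.name = name; }
        public String name() { return name; }
    }

    interface StateStoreSupplier<T extends StateStore> { T get(); }

    // Raw parameter: any supplier is accepted, so a mismatch only shows up at runtime.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static boolean countRaw(StateStoreSupplier supplier) {
        try {
            KeyValueStore<String, Long> store = (KeyValueStore<String, Long>) supplier.get();
            store.put("a", 1L);
            return true;
        } catch (ClassCastException e) {
            return false; // wrong kind of store was supplied
        }
    }

    // Typed parameter: only a supplier of KeyValueStore<String, Long> compiles.
    static long countTyped(StateStoreSupplier<KeyValueStore<String, Long>> supplier) {
        KeyValueStore<String, Long> store = supplier.get();
        store.put("a", 1L);
        return store.get("a");
    }

    public static void main(String[] args) {
        StateStoreSupplier<KeyValueStore<String, Long>> kv = () -> new KeyValueStore<>("Counts");
        StateStoreSupplier<SessionStore<String, Long>> session = () -> new SessionStore<>("SessionCounts");

        System.out.println(countRaw(kv));      // supplier matches the expected store kind
        System.out.println(countRaw(session)); // compiles, but the cast fails at runtime
        System.out.println(countTyped(kv));
        // countTyped(session); // rejected by the compiler: incompatible type argument
    }
}
```

With the raw parameter the session-store supplier is accepted and the mistake surfaces only when the store is used. With the typed parameter the same call does not compile.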

Public Interfaces

 

Proposed Changes

Pull Request to demonstrate the changes: https://github.com/apache/kafka/pull/2992/files

 

The new usage would be e.g.:

TypedStateStoreSupplier<KeyValueStore<String, Long>> countStore = TypedStores.create("Counts")
    .withKeys(Serdes.String())
    .withValues(Serdes.Long())
    .persistent()
    .build();
TypedStateStoreSupplier<WindowStore<String, Long>> windowedStore = TypedStores.create("WindowedCounts")
    .withKeys(Serdes.String())
    .withValues(Serdes.Long())
    .persistent()
    .windowed(1000, 10000, 10, false)
    .build();
TypedStateStoreSupplier<SessionStore<String, Long>> sessionStore = TypedStores.create("SessionWindowedCounts")
    .withKeys(Serdes.String())
    .withValues(Serdes.Long())
    .persistent()
    .sessionWindowed(60000)
    .build();
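One way a builder chain could yield a differently parameterised supplier depending on the steps taken is to give each stage its own factory class with its own build() return type. The following is only a sketch under that assumption and is not taken from the pull request: TypedSupplier, KeyValueFactory, WindowedFactory and the create(name, keyType, valueType) signature are hypothetical, and Class tokens stand in for Serdes to keep the example self-contained.

```java
public class TypedBuilderSketch {
    // Marker interfaces standing in for the real store types (assumption).
    interface StateStore {}
    interface KeyValueStore<K, V> extends StateStore {}
    interface WindowStore<K, V> extends StateStore {}

    // Hypothetical typed supplier; it only records the store name and kind.
    static final class TypedSupplier<T extends StateStore> {
        final String name;
        final String kind;
        TypedSupplier(String name, String kind) {
            this.name = name;
            this.kind = kind;
        }
    }

    // First stage: building here yields a key-value store supplier.
    static final class KeyValueFactory<K, V> {
        private final String name;
        KeyValueFactory(String name) { this.name = name; }

        // Moving to the windowed stage changes the type that build() returns.
        WindowedFactory<K, V> windowed(long windowSizeMs) {
            return new WindowedFactory<>(name, windowSizeMs);
        }

        TypedSupplier<KeyValueStore<K, V>> build() {
            return new TypedSupplier<>(name, "key-value");
        }
    }

    // Second stage: building here yields a window store supplier.
    static final class WindowedFactory<K, V> {
        private final String name;
        final long windowSizeMs;
        WindowedFactory(String name, long windowSizeMs) {
            this.name = name;
            this.windowSizeMs = windowSizeMs;
        }

        TypedSupplier<WindowStore<K, V>> build() {
            return new TypedSupplier<>(name, "windowed");
        }
    }

    // Class tokens fix K and V at the start of the chain (the KIP examples use Serdes instead).
    static <K, V> KeyValueFactory<K, V> create(String name, Class<K> keyType, Class<V> valueType) {
        return new KeyValueFactory<>(name);
    }

    public static void main(String[] args) {
        TypedSupplier<KeyValueStore<String, Long>> counts =
            create("Counts", String.class, Long.class).build();
        TypedSupplier<WindowStore<String, Long>> windowed =
            create("WindowedCounts", String.class, Long.class).windowed(1000L).build();
        System.out.println(counts.name + " is " + counts.kind);
        System.out.println(windowed.name + " is " + windowed.kind);
    }
}
```

Because each stage returns its own factory type, assigning the result of the windowed chain to a TypedSupplier<KeyValueStore<String, Long>> variable would fail to compile.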

 

Compatibility, Deprecation, and Migration Plan

Changes are intended to be backwards-compatible.

Test Plan

Only a re-run of the existing tests is envisaged at this time.

Rejected Alternatives

Add type parameters to the current method parameters. This was rejected because it would be a backwards-incompatible change.

<K, V>