Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Interfaces / API Changes (DSL only)

...

3 overloads for all APIs that create KTables: with and without store name. The alternative to store name is a StateStoreSupplier.

All API that create KTables will have 2 3 overloaded methods, one with the store name or with a StateStoreSupplier, and one without. Note that providing a null store name is the same as using the API with no store name.

These APIs include the ones below and any of their existing overloads. We do not list the overloads here to keep the list uncluttered. Each API will have one version with no store name, and one version with a store name and one version with a StateStoreSupplier (that contains the state store name).

In KTable.java overload each of the following APIs by adding store name and StateStoreSupplier (when they don't exist already):

  • KTable<K, V> filter(final Predicate<? super K, ? super V> predicate);
  • KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate);
  • <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper);
  • KTable<K, V> through(final String topic) 
  • <VO, VR> KTable<K, VR> join(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);
  • <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,  final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);
  • <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);

In KStreamBuilder.java overload each of the following APIs by adding store namename and StateStoreSupplier (when they don't exist already):

  •  public <K, V> KTable<K, V> table(final String topic);
  • public <K, V> GlobalKTable<K, V> globalTable(final String topic)

  • public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, final Serde<V> valSerde, final String topic)

In KGroupedTable.java overload each of the following APIs by adding store name and StateStoreSupplier  (when they don't exist already):

  • KTable<K, Long> count(final String storeName);
  • KTable<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor);
  • <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> adder, final Aggregator<? super K, ? super V, VR> subtractor)

In KGroupedStream.java overload each of the following APIs by adding store name and StateStoreSupplier  (when they don't exist already):

  • KTable<K, Long> count(final String storeName);
  • KTable<K, V> reduce(final Reducer<V> reducer);
  • <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde);

...

  • Depreciate the following KTable methods: print(), writeAsText(), foreach() and any of their overloads.

Interfaces / API Changes (PAPI only)

During implementation it became clear that a minor adjustment was needed and org.apache.kafka.streams.processor.addGlobalStore should take a 

  • "final StateStoreSupplier<KeyValueStore>" type as the first argument, not a StateStore. Note that one can obtain a StateStore by calling the .get() method on a StateStoreSupplier.

Misc API cleanup

During implementation it became apparent that some APIs needed minor renaming or cleanup. That is listed here:

  • KTable.getStoreName() -> Ktable.queryableStoreName(). Reason is that in Kafka we don't usually use the "get" prefix.

Implementation plan

One implementation detail that is important is how the Kafka Streams internals decides whether to materialize a KTable. Note that the above APIs provide a way for Interactive Queries to query a state store. They do not dictate whether the state store itself if a real, materialized one, or a view on top of a real, materialized store. Going back to the first example in the motivation:

...

  • Will depreciate the KTable methods print(), writeAsText(), foreach() and any of their overloads. 
  • Code will be backwards compatible.

  • PAPI code that uses org.apache.kafka.streams.processor.addGlobalStore will need to use new API signature. 

Rejected Alternatives

Change the name of KTable. We discussed potentially changing the name of a KTable to something like KChangelogStream, but this doesn’t solve the main problem this KIP addresses, which is API inconsistency.

...