Status

Current state: Accepted

Discussion thread: here 

JIRA: here

...

Method | Added for this KIP? | Object/method used for node name | Used for repartition topic name | Used for state store name
filter(Predicate, Named) | YES | static Named#as(String) | N/A | N/A
filterNot(Predicate, Named) | YES | static Named#as(String) | N/A | N/A
selectKey(KeyValueMapper, Named) | YES | static Named#as(String) | N/A | N/A
map(KeyValueMapper, Named) | YES | static Named#as(String) | N/A | N/A
mapValues(ValueMapper, Named) | YES | static Named#as(String) | N/A | N/A
mapValues(ValueMapperWithKey, Named) | YES | static Named#as(String) | N/A | N/A
flatMap(KeyValueMapper, Named) | YES | static Named#as(String) | N/A | N/A
flatMapValues(ValueMapper, Named) | YES | static Named#as(String) | N/A | N/A
flatMapValues(ValueMapperWithKey, Named) | YES | static Named#as(String) | N/A | N/A
print(Printed) | NO | static Printed#as(String) | N/A | N/A
foreach(ForeachAction, Named) | YES | static Named#as(String) | N/A | N/A
peek(ForeachAction, Named) | YES | static Named#as(String) | N/A | N/A
branch(Named, Predicate...) | YES | static Named#as(String) | N/A | N/A
through(String, Produced) | NO | static Produced#as(String) | N/A | N/A
to(String, Produced) | NO | static Produced#as(String) | N/A | N/A
to(TopicNameExtractor, Produced) | NO | static Produced#as(String) | N/A | N/A
transform(TransformerSupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A
transformValues(ValueTransformerSupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A
transformValues(ValueTransformerWithKeySupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A
process(ProcessorSupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A
join(KStream, ValueJoiner, JoinWindows, Joined) | NO | static Joined#as(String) | static Joined#as(String) | static Joined#as(String)
leftJoin(KStream, ValueJoiner, JoinWindows, Joined) | NO | static Joined#as(String) | static Joined#as(String) | static Joined#as(String)
outerJoin(KStream, ValueJoiner, JoinWindows, Joined) | NO | static Joined#as(String) | static Joined#as(String) | static Joined#as(String)
join(KTable, ValueJoiner, Joined) | NO | static Joined#as(String) | static Joined#as(String) | N/A
leftJoin(KTable, ValueJoiner, Joined) | NO | static Joined#as(String) | static Joined#as(String) | N/A
join(GlobalKTable, KeyValueMapper, ValueJoiner, Named) | YES | static Named#as(String) | N/A | N/A
leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner, Named) | NO | ??? | ??? | ???
flatTransform(TransformerSupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A
flatTransformValues(ValueTransformerWithKeySupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A
flatTransformValues(ValueTransformerSupplier, Named, String...) | YES | static Named#as(String) | N/A | N/A
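The "Object/method used for node name" column amounts to a precedence rule. A name passed through Named#as(String) becomes the processor name. Without it, the DSL keeps using its static processor prefix plus a counter. The following is a minimal standalone sketch of that rule in plain Java, not Kafka Streams code. The class NodeNameResolver, the example prefix KSTREAM-FILTER- and the ten-digit zero-padded counter are assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, standalone sketch of the node-naming precedence rule.
// It is NOT the Kafka Streams implementation: the class name, the prefixes
// and the zero-padded counter format are assumptions for illustration.
final class NodeNameResolver {
    // Counter shared by all generated names within one topology (assumption).
    private final AtomicInteger index = new AtomicInteger(0);

    // Returns the user-provided name if there is one; otherwise builds a
    // generated name from the operator's static prefix and the next counter value.
    String resolve(String userProvidedName, String staticPrefix) {
        if (userProvidedName != null) {
            return userProvidedName; // e.g. the value passed to Named.as(...)
        }
        return staticPrefix + String.format("%010d", index.getAndIncrement());
    }

    public static void main(String[] args) {
        NodeNameResolver resolver = new NodeNameResolver();
        // filter(Predicate) without a Named argument: generated name
        System.out.println(resolver.resolve(null, "KSTREAM-FILTER-"));
        // filter(Predicate, Named.as("drop-invalid-orders")): custom name
        System.out.println(resolver.resolve("drop-invalid-orders", "KSTREAM-FILTER-"));
    }
}
```

Running main prints KSTREAM-FILTER-0000000000 and then drop-invalid-orders. In this sketch a custom name does not consume a counter value; that is a design assumption, not documented behavior.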


KTable (16 new methods)


Method | Added for this KIP? | Object/method used for node name | Used for repartition topic name | Used for state store name
filter(Predicate, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
filter(Predicate, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String)
filterNot(Predicate, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
filterNot(Predicate, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String)
mapValues(ValueMapper, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
mapValues(ValueMapperWithKey, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
mapValues(ValueMapper, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String)
mapValues(ValueMapperWithKey, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String)
suppress(Suppressed) | NO | Suppressed#withName(String) | N/A | N/A
transformValues(ValueTransformerWithKeySupplier, Named, String...) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
transformValues(ValueTransformerWithKeySupplier, Materialized, Named, String...) | YES | static Named#as(String) | N/A | static Materialized#as(String)
groupBy(KeyValueMapper, Grouped) | NO | static Grouped#as(String) | static Grouped#as(String) | N/A
join(KTable, ValueJoiner, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
join(KTable, ValueJoiner, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String)
leftJoin(KTable, ValueJoiner, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
leftJoin(KTable, ValueJoiner, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String)
outerJoin(KTable, ValueJoiner, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
outerJoin(KTable, ValueJoiner, Named, Materialized) | YES | static Named#as(String) | N/A | static Materialized#as(String)
toStream(Named) | YES | static Named#as(String) | N/A | N/A
toStream(KeyValueMapper, Named) | YES | static Named#as(String) | N/A | N/A


KGroupedStream (6 new methods)


Method | Added for this KIP? | Object/method used for node name | Used for repartition topic name | Used for state store name
count(Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
count(Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String)
reduce(Reducer, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
reduce(Reducer, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String)
aggregate(Initializer, Aggregator, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
aggregate(Initializer, Aggregator, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String)


KGroupedTable (6 new methods)


Method | Added for this KIP? | Object/method used for node name | Used for repartition topic name | Used for state store name
count(Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
count(Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String)
reduce(Reducer, Reducer, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
reduce(Reducer, Reducer, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String)
aggregate(Initializer, Aggregator, Aggregator, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
aggregate(Initializer, Aggregator, Aggregator, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String)


TimeWindowedKStream (6 new methods)


Method | Added for this KIP? | Object/method used for node name | Used for repartition topic name | Used for state store name
count(Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
count(Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String)
aggregate(Initializer, Aggregator, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
aggregate(Initializer, Aggregator, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String)
reduce(Reducer, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
reduce(Reducer, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String)


SessionWindowedKStream (6 new methods)

Method | Added for this KIP? | Object/method used for node name | Used for repartition topic name | Used for state store name
count(Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
count(Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String)
aggregate(Initializer, Aggregator, Merger, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
aggregate(Initializer, Aggregator, Merger, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String)
reduce(Reducer, Named) | YES | static Named#as(String) | N/A | (PREFIX + COUNT)
reduce(Reducer, Named, Materialized) | YES | static Named#as(String) | N/A | Materialized#as(String)


Finally, the scope of each configuration class can be summarized as follows:



Component | Generated | Named | Joined / Grouped / Produced / Consumed | Materialized
Node Name | X | X | X | -
Repartition Topic | X | - | X | -
Queryable Store | - | - | - | X
State store | X | - | X | X
Changelog Topic | X | - | X | X



Name Validation

User-provided node names should follow the same restrictions that currently apply to state store names when a Materialized instance is created.

Currently, the Materialized class relies on the static method Topic#validate. This method ensures that a provided name contains only legal characters ([a-zA-Z0-9._-]) and has a maximum length of 249 characters.

We propose to copy the validation logic of Topic#validate into Named. This new method will be used to validate both store names and node names. The benefit is that it removes a dependency on the core module.

In addition, when a name is invalid, the Materialized class will throw a TopologyException while building the topology, instead of an InvalidTopicException.
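Below is a minimal sketch of the shared validation method described above. It assumes the method simply checks the two rules stated for Topic#validate: legal characters and maximum length. NameValidator and validate are hypothetical names. Rejecting null or empty names is an extra assumption. IllegalArgumentException stands in for the TopologyException the KIP proposes, so that the example has no Kafka dependency.

```java
import java.util.regex.Pattern;

// Hypothetical, standalone sketch of the proposed name validation.
// Rules taken from the text: only [a-zA-Z0-9._-] and at most 249 characters.
// The KIP proposes a TopologyException; IllegalArgumentException is used here
// only to keep the example free of Kafka dependencies.
final class NameValidator {
    static final int MAX_NAME_LENGTH = 249;
    // One or more legal characters; '-' is last in the class, so it is literal.
    private static final Pattern LEGAL_CHARS = Pattern.compile("[a-zA-Z0-9._-]+");

    static void validate(String name) {
        // Assumption: null or empty names are rejected as well.
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("Name must not be null or empty");
        }
        if (name.length() > MAX_NAME_LENGTH) {
            throw new IllegalArgumentException(
                "Name is illegal, it can't be longer than " + MAX_NAME_LENGTH + " characters: " + name);
        }
        if (!LEGAL_CHARS.matcher(name).matches()) {
            throw new IllegalArgumentException(
                "Name \"" + name + "\" is illegal, legal characters are [a-zA-Z0-9._-]");
        }
    }

    public static void main(String[] args) {
        validate("count-orders-per-user"); // passes silently
        try {
            validate("count orders!"); // space and '!' are not legal characters
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```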

Materialized

The main reason why we propose to overload each method accepting a Materialized argument is to avoid introducing ambiguity by conflating config objects that configure an operation (like Grouped and Joined) with config objects that configure an aspect of the operation (like Materialized).

Also note that, for all methods accepting a Materialized argument, if no state store name is provided, the node name will be used to generate one: the state store name will be the node name suffixed with "-table".
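For methods accepting a Materialized argument, the rule above reduces to a two-way choice. The following is a hypothetical standalone helper, not a Kafka API: StoreNames.resolveStoreName is an illustrative name, and a missing store name is assumed to be represented as null.

```java
// Hypothetical, standalone sketch of the store-naming rule for methods that
// accept a Materialized argument. Not Kafka Streams code.
final class StoreNames {
    // materializedName: the name given via Materialized#as(String), or null if none.
    // nodeName: the processor name (custom via Named#as(String), or generated).
    static String resolveStoreName(String materializedName, String nodeName) {
        if (materializedName != null) {
            return materializedName; // an explicit store name always wins
        }
        return nodeName + "-table"; // otherwise derive it from the node name
    }

    public static void main(String[] args) {
        System.out.println(resolveStoreName("orders-store", "filter-orders")); // orders-store
        System.out.println(resolveStoreName(null, "filter-orders"));          // filter-orders-table
    }
}
```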


Proposed Changes

  • Implement the new interface NamedOperation and the default class Named.
  • Update all parameter classes to implement NamedOperation: Produced, Consumed, Printed, Joined, Grouped, Suppressed.
  • Overload the methods of the classes KStream, KTable, KGroupedStream, KGroupedTable, TimeWindowedKStream and SessionWindowedKStream listed in the tables above.
  • Processor names specified by the developer will be used in place of the static processor prefix. Static prefixes will still be used if no custom processor name is specified.
  • Processor names should follow the same restrictions as topic names: legal characters are [a-zA-Z0-9._-] and the maximum length is 249 characters.
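The first two items can be sketched as follows. This assumes NamedOperation exposes a single withName(String) method and that parameter classes are immutable. Named, NamedOperation and SinkOptions are illustrative shapes only, not the actual Kafka Streams classes. SinkOptions in particular is a hypothetical stand-in for Produced, Printed and the other parameter classes.

```java
// Hypothetical, standalone sketch of the NamedOperation / Named design.
// Class shapes are assumptions for illustration, not the Kafka Streams classes.
final class NamedOperationDemo {
    // Works uniformly for any NamedOperation, which is the point of the shared interface.
    static <T extends NamedOperation<T>> T named(T operation, String name) {
        return operation.withName(name);
    }

    public static void main(String[] args) {
        Named filterName = Named.as("filter-valid-orders");
        SinkOptions sink = named(SinkOptions.toTopic("valid-orders"), "write-valid-orders");
        System.out.println(filterName.name() + " / " + sink.topic() + " / " + sink.name());
    }
}

interface NamedOperation<T extends NamedOperation<T>> {
    // Returns a configuration object of the same type carrying the given processor name.
    T withName(String name);
}

// Default implementation used by methods that take a Named argument.
final class Named implements NamedOperation<Named> {
    private final String name;

    private Named(String name) { this.name = name; }

    // Static factory matching the Named#as(String) entries in the tables.
    static Named as(String name) { return new Named(name); }

    @Override
    public Named withName(String name) { return new Named(name); } // immutable copy

    String name() { return name; }
}

// Hypothetical stand-in for a parameter class such as Produced or Printed:
// besides its own settings, it can now also carry a processor name.
final class SinkOptions implements NamedOperation<SinkOptions> {
    private final String topic;
    private final String name; // null means "use the generated name"

    private SinkOptions(String topic, String name) {
        this.topic = topic;
        this.name = name;
    }

    static SinkOptions toTopic(String topic) { return new SinkOptions(topic, null); }

    @Override
    public SinkOptions withName(String name) { return new SinkOptions(topic, name); }

    String topic() { return topic; }
    String name() { return name; }
}
```

The self-referential type parameter lets withName return the concrete type, so calls can be chained without casts. The generic named helper shows that any parameter class can be renamed in the same way.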


Below is an application example:

...