Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

methodAdded for this KIP ?Object/method used for node nameUsed for repartition topic nameUsed for state store name ?
filter(Predicate, Named)YESstatic Named#as(String)N/AN/A
filterNot(Predicate, Named)YESstatic Named#as(String)N/AN/A
selectKey(KeyValueMapper, Named)YESstatic Named#as(String)N/AN/A
map(KeyValueMapper, Named)YESstatic Named#as(String)N/AN/A
mapValues(ValueMapper, Named)YESstatic Named#as(String)N/AN/A
mapValues(ValueMapperWithKey, Named)YESstatic Named#as(String)N/AN/A
flatMap(KeyValueMapper, Named)YESstatic Named#as(String)N/AN/A
flatMapValues(ValueMapper, Named)YESstatic Named#as(String)N/AN/A
flatMapValues(ValueMapperWithKey, Named)YESstatic Named#as(String)N/AN/A
print(Printed)NOstatic Printed#as(String)N/AN/A
foreach(ForeachAction, Named)YESstatic Named#as(String)N/AN/A
peek(ForeachAction, Named)YESstatic Named#as(String)N/AN/A
branch(Named, Predicate...)YESstatic Named#as(String)N/AN/A
through(String, Produced)NOstatic Produced#as(String)N/AN/A
to(String, Produced)NOstatic Produced#as(String)N/AN/A
to(TopicNameExtractor, Produced)NOstatic Produced#as(String)N/AN/A
transform(TransformerSupplier, Named, String... )YESstatic Named#as(String)N/AN/A
transformValues(ValueTransformerSupplier, Named, String...)YESstatic Named#as(String)N/AN/A
transformValues( ValueTransformerWithKeySupplier, Named, String...)YESstatic Named#as(String)N/AN/A
process(ProcessorSupplier, Named, String...)YESstatic Named#as(String)N/AN/A
join( KStream, ValueJoiner, JoinWindows windows, Joined)NOstatic Joined#named(final String name)static Joined#named(final String name)static Joined#named(final String name)
leftJoin(KStream, ValueJoiner, JoinWindows, Joined)NOstatic Joined#named(final String name)static Joined#named(final String name)static Joined#named(final String name)
outerJoin(KStream, ValueJoiner, JoinWindows, Joined)NOstatic Joined#named(final String name)static Joined#named(final String name)static Joined#named(final String name)
join(KTable, ValueJoiner, Joined)NOstatic Joined#named(final String name)static Joined#named(final String name)N/A
leftJoin(KTable, ValueJoiner, Joined)NOstatic Joined#named(final String name)static Joined#named(final String name)N/A
join(GlobalKTbale, KeyValueMapper, ValueJoiner)NO(PREFIX + COUNT)N/AN/A
join(GlobalKTbale, KeyValueMapper, ValueJoiner, Named)YESstatic Joined#named(final String name)N/AN/A
leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)NO??????(PREFIX + COUNT)N/AN/A???
leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)NO??????YESstatic Joined#named(final String name)N/AN/A???


KTable (16 new methods)


methodAdded for this KIP ?Object/method used for node nameUsed for repartition topic nameUsed for state store name ?
filter(Predicate, Named)YESstatic Named#as(String)N/Astatic Named#as(StringPREFIX + COUNT)
filter(Predicate, Named, Materialized)YESstatic Named#as(String)N/AMaterialized#as(String)
filterNot(Predicate, Named)YESstatic Named#as(String)N/Astatic Named#as(StringPREFIX + COUNT)
filterNot(Predicate, Named, Materialized)YESstatic Named#as(String)N/AMaterialized#as(String)
mapValues(ValueMapper, Named)YESstatic Named#as(String)N/Astatic Named#as(StringPREFIX + COUNT)
mapValues(ValueMapperWithKey, Named)YESstatic Named#as(String)N/Astatic Named#as(StringPREFIX + COUNT)
mapValues(ValueMapper, Named, Materialized)YESstatic Named#as(String)N/Astatic Materialized#as(String)
mapValues(ValueMapperWithKey, Named, Materialized);YESstatic Named#as(String)N/Astatic Materialized#as(String)
suppress(Suppressed)NOSuppressed#withName(String)N/AN/A
transformValues(ValueTransformerWithKeySupplier, Named, String...)YESstatic Named#as(String)N/Astatic Named#as(StringPREFIX + COUNT)
transformValues(ValueTransformerWithKeySupplier, Materialized, Named, String...)YESstatic Named#as(String)N/Astatic Materialized#as(String)
groupBy(KeyValueMapper, KeyValue, Grouped)NOstatic Grouped#as(String)static Grouped#as(String)N/A
join(KTable, ValueJoiner, Named)YESstatic Named#as(String)N/Astatic Named#as(StringPREFIX + COUNT)
join(KTable, ValueJoiner, Named, Materialized)YESstatic Named#as(String)N/Astatic Materialized#as(String)
leftJoin(KTable, ValueJoiner, Named);YESstatic Named#as(String)N/Astatic Named#as(StringPREFIX + COUNT)
leftJoin(KTable, ValueJoiner, Named, Materialized)YESstatic Named#as(String)N/Astatic Materialized#as(String)
outerJoin(KTable, ValueJoiner, Named);YESstatic Named#as(String)N/Astatic Named#as(StringPREFIX + COUNT)
outerJoin(KTable, ValueJoiner, Named, Materialized)YESstatic Named#as(String)N/Astatic Materialized#as(String)

...

XX

GeneratedNamedJoined / Grouped / Produced / ConsumedMaterialized
Node NameXXX
Repartition TopicX
X
Queryable Store


X
State storeX
XX
Changelog TopicX
XX


Materialized


The main reason why we propose to overload each method accepting a Materialized argument is to not introduce ambitguity by conflating config objects that configure an operation (like Grouped, Joined) with config objects that configure an aspect of the operation (like Materialized).Also, note that for all methods accepting a Materialized argument, if no state store named is provided then the node named will be used to generate a one. The state store name will be the node name suffixed with "-table".


Proposed Changes

  • Implement the new interface NamedOperation and default class Named 
  • Update all parameter class to implement NamedOperation : Produced Consumed Printed Joined Grouped Suppressed
  • Overload methods stateless for classes KStreams, KTables, KGroupedStream, KGroupedTable, TimeWindowedKStream, TimeWindowedKTable
  • The processor names specified by developer will be used in place of the static processor prefix. Statics prefixes will still be used if no custom processor name are specified.
  • Processor names should follow the same restrictions as the topic names. So legal characters are [a-zA-Z0-9._-] and the maximum length us 249.

...