Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current stateUnder DiscussionAccepted

Discussion thread: here 

JIRA: here

...

So allowing users to be able to set more meaningful names can help to resolve complexity of some developed topologies. For example, a processor name could describe the business rule performed by a map() operation.

Public Interfaces

In addition, the generated names have a few disadvantages to guarantee topology compatibilities. In fact, adding a new operator, using a third-library doing some optimization to remove some operators or upgrading to a new KafkaStreams version with internal API changes may changed suffix indexing for a large amount of the processor names. This will in turn change the internal state store names, as well as internal topic names as well.

Public Interfaces

First, we propose to add one new interface NamedOperation that can be used to customize stateless operations as well as stateful ones. The objective to create a new class is to keep consistent with the overall API design.

First, we propose to add one new interface NamedOperation that can be used to customize stateless operations as well as stateful ones. The objective to create a new class is to keep consistent with the overall API design.

Code Block
languagejava
package 
Code Block
languagejava
package org.apache.kafka.streams.kstream;

/**
 * Default interface which can be used to personalized the named of operations, internal topics or store.
 */
public interface NamedOperation<T extends NamedOperation<T>> {

    /**
     * ReturnSets the name to be used for operation.
     *
     * @param @returnname a stringthe name to use.
     */
 @return an instance of String name();

{@link NamedOperation}
     */**
    T * Sets the name to be used for operation.
     *
     * @param name  the name to use.
     * @return an instance of {@link NamedOperation}
     */
    T withName(final String name);

}


The NamedOperation interface will be implemented/extended by following classes that already exist to configure operations : 

withName(final String name);

}



The NamedOperation interface will be implemented/extended by following classes that already exist to configure operations : 

  • Produced 
  • Produced 
  • Consumed 
  • Printed 
  • Joined 
  • Grouped 
  • Suppressed 

ThenIn addition, we propose to overload all stateless methods that do not have one of the existing control classes listed above to accept a NamedOperation implementation.

For that we will added a new default class Named implementing NamedOperation :

add a new static method with the following signature to each of those class as(final String processorName).

Deprecration

In order to fix some inconsistencies in API, we also propose to deprecate the method named(String) from the class Joined in favor of new method as().

In addition, as no configuration classes expose the processor name, we will also deprecate the method name() from the class Joined . This method should be removed in a future release.

Overloaded methods


Then, we propose to overload all stateless methods that do not have one of the existing control classes listed above to accept a NamedOperation implementation.

For that we will added a new default class Named implementing NamedOperation :


Code Block
languagejava

public class Named implements 
Code Block
languagejava

public class Named implements NamedOperation<Named> {

    private static final int MAX_NAME_LENGTH = 249;

    protected String name;

    protected Named(final String name) {
        this.name = name;
        if (name != null)
            validate(name);
    }

    /**
     * Create a Named instance with provided name.
     *
     * @param name  the processor name to be used. If {@code null} a default processor name will be generated.
     * @return      A new {@link Named} instance configured with name
     */
    public static Named as(final String name) {
        Objects.requireNonNull(name, "name can't be null");
        return new Named(name);
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public Named withName(final String name) {
        Objects.requireNonNull(name, "name can't be null");
        return new Named(name);
    }
...
}

The below tables will resume all new and existing method :

KStream

 Named} instance configured with name
     */
    public static Named as(final String name) {
        Objects.requireNonNull(name, "name can't be null");
        return new Named(name);
    }

    @Override
    public Named withName(final String name) {
        Objects.requireNonNull(name, "name can't be null");
        return new Named(name);
    }
...
}


The below tables will resume all new and existing methods :

KStream (16 new methods)

methodAdded for this KIP ?Object/method used for node nameUsed for repartition topic nameUsed for state store name ?
filter(Predicate, Named)YESstatic Named#as(String)N/AN/A
filterNot(Predicate, Named)YESstatic Named#as(String)N/AN/A
selectKey(KeyValueMapper, Named)YESstatic Named#as(String)N/AN/A
map(KeyValueMapper, Named)YESstatic Named#as(String)N/AN/A
mapValues(ValueMapper, Named)YESstatic Named#as(String)N/AN/A
mapValues(ValueMapperWithKey, Named)YESstatic Named#as(String)N/AN/A
flatMap(KeyValueMapper, Named)YESstatic Named#as(String)N/AN/A
flatMapValues(ValueMapper, Named)YESstatic Named#as(String)N/AN/A
flatMapValues(ValueMapperWithKey, Named)YESstatic Named#as(String)N/AN/A
print(Printed)NOstatic Printed#as(String)N/AN/A
foreach(ForeachAction, Named)YESstatic Named#as(String)N/AN/A
peek(ForeachAction, Named)YESstatic Named#as(String)N/AN/A
branch(Named, Predicate...)YESstatic Named#as(String)N/AN/A
through(String, Produced)NOstatic Produced#as(String)N/AN/A
to(String, Produced)NOstatic Produced#as(String)N/AN/A
to(TopicNameExtractor, Produced)NOstatic Produced#as(String)N/AN/A
transform(TransformerSupplier, Named, String... )YESstatic Named#as(String)N/AN/A
transformValues(ValueTransformerSupplier, Named, String...)YESstatic Named#as(String)N/AN/A
transformValues( ValueTransformerWithKeySupplier, Named, String...)YESstatic Named#as(String)N/AN/A
process(ProcessorSupplier, Named, String...)YESstatic Named#as(String)N/AN/A
join( KStream, ValueJoiner, JoinWindows windows, Joined)NOstatic Joined#as(final String name)static Joined#as(final String name)static Joined#as(final String name)
leftJoin(KStream, ValueJoiner, JoinWindows, Joined)NOstatic Joined#as(final String name)static Joined#as(final String name)static Joined#as(final String name)
outerJoin(KStream, ValueJoiner, JoinWindows, Joined)NOstatic Joined#as(final String name)static Joined#as(final String name)static Joined#as(final String name)
join(KTable, ValueJoiner, Joined)NOstatic Joined#as(final String name)static Joined#as(final String name)N/A
leftJoin(KTable, ValueJoiner, Joined)NOstatic Joined#as(final String name)static Joined#as(final String name)N/A
join(GlobalKTbale, KeyValueMapper, ValueJoiner
methodAdded for this KIP ?Object/method used for node nameUsed for repartition topic nameUsed for state store name ?filter(Predicate
, Named)YESstatic Named#as(String)
N/AN/A
filterNot(Predicate
leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner, Named)YESstatic Named#as(String)
N/AN/A
selectKey(KeyValueMapper, Named
flatTransform(TransformerSupplier, Named named, String... stateStoreNames)YESstatic Named#as(String)N/AN/A
map
flatTransformValues(
KeyValueMapper
ValueTransformerWithKeySupplier, Named,  String... )YESstatic Named#as(String)N/AN/A
mapValues
flatTransformValues(
ValueMapper
ValueTransformerSupplier, Named, String...)YESstatic Named#as(String)N/AN/A
mapValues(ValueMapperWithKey, Named


KTable (16 new methods)


methodAdded for this KIP ?Object/method used for node nameUsed for repartition topic nameUsed for state store name ?
filter(Predicate
YESstatic Named#as(String)N/AN/AflatMap(KeyValueMapper
, Named)YESstatic Named#as(String)N/A
N/AflatMapValues(ValueMapper, Named
(PREFIX + COUNT)
filter(Predicate, Named, Materialized)YESstatic Named#as(String)N/A
N/AflatMapValues(ValueMapperWithKey
Materialized#as(String)
filterNot(Predicate, Named)YESstatic Named#as(String)N/A
N/Aprint(Printed)NOstatic Printed#as
(PREFIX + COUNT)
filterNot(Predicate, Named, Materialized)YESstatic Named#as(String)N/A
N/Aforeach(ForeachAction
Materialized#as(String)
mapValues(ValueMapper, Named)YESstatic Named#as(String)N/A
N/Apeek(ForeachAction
(PREFIX + COUNT)
mapValues(ValueMapperWithKey, Named)YESstatic Named#as(String)N/A
N/Abranch(Named, Predicate...
(PREFIX + COUNT)
mapValues(ValueMapper, Named, Materialized)YESstatic Named#as(String)N/A
N/A
static Materialized#as(String)
mapValues(ValueMapperWithKey, Named, Materialized);YESstatic Named#as(String)N/A
N/Athrough(String, Produced
static Materialized#as(String)
suppress(Suppressed)NO
static Produced#as
Suppressed#withName(String)N/AN/A
to
transformValues(
String, Produced
ValueTransformerWithKeySupplier, Named, String...)
NO
YESstatic
Produced#as
Named#as(String)N/A
N/Ato(TopicNameExtractor, Produced)NO
(PREFIX + COUNT)
transformValues(ValueTransformerWithKeySupplier, Materialized, Named, String...)YESstatic Named#as
static Produced#as
(String)N/A
N/Atransform(TransformerSupplier, Named named, String... )YESstatic Named#as(String)
static Materialized#as(String)
groupBy(KeyValueMapper, KeyValue, Grouped)NOstatic Grouped#as(String)static Grouped#as(String)
N/A
N/A
transformValues
join(
ValueTransformerSupplier
KTable, ValueJoiner, Named
named, String...
)YESstatic Named#as(String)N/A
N/AtransformValues( ValueTransformerWithKeySupplier, Named named, String...
(PREFIX + COUNT)
join(KTable, ValueJoiner, Named, Materialized)YESstatic Named#as(String)N/A
N/Aprocess(ProcessorSupplier, Named named, String...)
static Materialized#as(String)
leftJoin(KTable, ValueJoiner, Named);YESstatic Named#as(String)N/A
N/Ajoin( KStream
(PREFIX + COUNT)
leftJoin(KTable, ValueJoiner,
JoinWindows windows
Named,
Joined
Materialized)
NO
YESstatic
Joined#named
Named#as(
final
String
name
)
static Joined#named(final String name)
N/Astatic
Joined#named
Materialized#as(
final
String
name
)
leftJoin
outerJoin(
KStream
KTable, ValueJoiner
, JoinWindows, Joined)NOstatic Joined#named(final String name)static Joined#named(final String name)
, Named);YESstatic Named#as(String)N/A(PREFIX + COUNT
static Joined#named(final String name
)
outerJoin(
KStream
KTable, ValueJoiner,
JoinWindows
Named,
Joined
Materialized)
NO
YESstatic
Joined#named
Named#as(
final
String
name
)N/Astatic
Joined#named
Materialized#as(
final
String
name)
)
toStream(Named)YESstatic
Joined#named
Named#as(
final
String
name
)
join(KTable, ValueJoiner, Joined)NOstatic Joined#named(final String name)
N/AN/A
toStream(KeyValueMapper, Named)YESstatic Named#as(String)N/A
static Joined#named(final String name)
N/A
leftJoin(KTable, ValueJoiner, Joined)NOstatic Joined#named(final String name)static Joined#named(final String name


KGroupedStream (6 new methods)


methodAdded for this KIP ?Object/method used for node nameUsed for repartition topic nameUsed for state store name ?
count(Named)YESstatic Named#as(String)N/A
join(GlobalKTable, KeyValueMapper, ValueJoiner)NO?????????leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)NO?????????

KTable

(PREFIX + COUNT)
count(Named, Materialized)YESstatic Named#as(String)N/AMaterialized#as(String)
reduce(Reducer
methodAdded for this KIP ?Object/method used for node nameUsed for repartition topic nameUsed for state store name ?filter(Predicate
, Named)YESstatic Named#as(String)N/A
static Named#as
(
String
PREFIX + COUNT)
filter
reduce(
Predicate
Reducer, Named, Materialized)YESstatic Named#as(String)N/AMaterialized#as(String)
filterNot
aggregate(
Predicate
Initializer, Aggregator, Named)YESstatic Named#as(String)N/A
static Named#as
(
String
PREFIX + COUNT)
filterNot
aggregate(
Predicate
Initializer, Aggregator, Named, Materialized)YESstatic Named#as(String)N/AMaterialized#as(String)
mapValues(ValueMapper, Named


KGroupedTable (6 new methods)


methodAdded for this KIP ?Object/method used for node nameUsed for repartition topic nameUsed for state store name ?
count(
YESstatic Named#as(String)N/Astatic Named#as(String)mapValues(ValueMapperWithKey,
Named)YESstatic Named#as(String)N/A
static Named#as
(
String
PREFIX + COUNT)
mapValues
count(
ValueMapper,
Named
named
, Materialized)YESstatic Named#as(String)N/A
static
Materialized#as(String)
mapValues
reduce(
ValueMapperWithKey
Reducer, Reducer, Named
named, materialized
)
;
YESstatic Named#as(String)N/A
static Materialized#as
(
String
PREFIX + COUNT)
suppress(Suppressed
reduce(Reducer, Reducer, Named, Materialized)
NO
YES
Suppressed#withName
static Named#as(String)N/A
N/AtransformValues(ValueTransformerWithKeySupplier, Named, String...
Materialized#as(String)
aggregate(Initializer, Aggregator, Aggregator, Named)YESstatic Named#as(String)N/A
static Named#as
(
String
PREFIX + COUNT)
transformValues
aggregate(
ValueTransformerWithKeySupplier
Initializer, Aggregator,
Materialized
Aggregator, Named
named
,
String...
Materialized)YESstatic Named#as(String)N/A
static
Materialized#as(String)
groupBy(KeyValueMapper, KeyValue, Grouped)NOstatic Grouped#as(String)static Grouped#as(String)N/A


TimeWindowedKStream (6 new methods)

methodAdded for this KIP ?Object/method used for node nameUsed for repartition topic nameUsed for state store name ?
count(
join(KTable, ValueJoiner,
Named)YESstatic Named#as(String)N/A
static Named#as
(
String
PREFIX + COUNT)
join
count(
KTable, ValueJoiner,
Named
named
, Materialized)YESstatic Named#as(String)N/A
static
Materialized#as(String)
leftJoin
aggregate(
KTable
Initializer,
ValueJoiner
Aggregator, Named)
;
YESstatic Named#as(String)N/A
static Named#as
(
String
PREFIX + COUNT)
leftJoin
aggregate(
KTable
Initializer,
ValueJoiner
Aggregator, Named
named
, Materialized)YESstatic Named#as(String)N/A
static
Materialized#as(String)
outerJoin
reduce(
KTable
Reducer,
ValueJoiner,
Named)
;
YESstatic Named#as(String)N/A
static Named#as
(
String
PREFIX + COUNT)
outerJoin
reduce(
KTable, ValueJoiner
Reducer, Named
named
, Materialized)YESstatic Named#as(String)N/A
static
Materialized#as(String)

KGroupedStream

WIP

KGroupedTable

WIP

TimeWindowedKStream


SessionWindowedKStream (6 new methods)

methodAdded for this KIP ?Object/method used for node nameUsed for repartition topic nameUsed for state store name ?
count(Named)YESstatic Named#as(String)N/A
static Named#as
(
String
PREFIX + COUNT)
count
(Named, Materialized)YESstatic Named#as(String)N/AMaterialized#as(String)aggregate(Initializer, Aggregator, Named
(Named, Materialized)YESstatic Named#as(String)N/A
static Named#as
Materialized#as(String)
aggregate(Initializer, Aggregator, Merger, Named
, Materialized
)YESstatic Named#as(String)N/A
Materialized#as
(
String
PREFIX + COUNT)
reduce(Reducer
aggregate(Initializer, Aggregator, Merger, Named, Materialized)YESstatic Named#as(String)N/A
static Named#as
Materialized#as(String)
reduce(Reducer, Named
, Materialized
)YESstatic Named#as(String)N
/AMaterialized#as(String)

SessionWindowedKStream

/A(PREFIX + COUNT)
reduce(Reducer, Named, Materialized
methodAdded for this KIP ?Object/method used for node nameUsed for repartition topic nameUsed for state store name ?count(Named
)YESstatic Named#as(String)N/A
static Named#as(String)count(Named, Materialized)YESstatic Named#as(String)N/AMaterialized#as(String)aggregate(Initializer, Aggregator, Merger, Named)YESstatic Named#as(String)N/Astatic Named#as(String)aggregate(Initializer, Aggregator, Merger, Named, Materialized)YESstatic Named#as(String)N/AMaterialized#as(String)reduce(Reducer, Named)YESstatic Named#as(String)N/Astatic Named#as(String)reduce(Reducer, Named, Materialized)YESstatic Named#as(String)N/AMaterialized#as(String)
Materialized#as(String)


At the end, we can summarize the scope of each configuration class as follow : 



GeneratedNamedJoined / Grouped / Produced / ConsumedMaterialized
Node NameXXX
Repartition TopicX
X
Queryable Store


X
State storeX
XX
Changelog TopicX
XX


Materialized


The main reason why we propose to overload each method accepting a Materialized argument is to not introduce ambitguity by conflating config objects that configure an operation (like Grouped, Joined) with config objects that configure an aspect of the operation (like Materialized).

Name Validation

User provided node name should follow the same restrictions that ones currently apply to state stores during the create of Materialized instance.

Currently, the Materialized class relies on the static method Topic#validate. This method ensure that a provided name only contains legal characters [a-zA-Z0-9._-] and have a maximum length of 249.


We propose to copy methods from Topic#validate into Named. This new method will be used validate both store names and node names. The benefit is to remove a dependency with the core module.

In addition, the Materialized class will throw a TopologyException while building the topology in case of a unvalid name instead of InvalidTopicException Note that for all methods accepting a Materialized argument, if no state store named is provided then the node named will be used to generate a one. The state store name will be the node name suffixed with "-table" .


Proposed Changes

  • Implement the new interface NamedOperation and default class Named 
  • Update all parameter class to implement NamedOperation : Produced Consumed Printed Joined Grouped Suppressed
  • Overload methods stateless for classes KStreams, KTables, KGroupedStream, KGroupedTable, TimeWindowedKStream, TimeWindowedKTable
  • The processor names specified by developer will be used in place of the static processor prefix. Statics prefixes will still be used if no custom processor name are specified.
  • Processor names should follow the same restrictions as the topic names. So legal characters are [a-zA-Z0-9._-] and the maximum length us of 249.


Below is an application example : 

Code Block
languagejava
final StreamsBuilder builder = new StreamsBuilder();


builder.stream("topic-input", Consumed.as("STREAM-FROM-TOPIC-INPUT")
        .filter((k, v) -> true ), Named.as("FILTER-NULL-VALUE")
        .map((k, v) -> KeyValue.pair(k, v.toUpperCase()), Named.as("MAP-TO-UPPERCASE")
        .to("topic-output", Produced.as("TO-OUTPUT-TOPIC"));

System.out.println(builder.build().describe());
---- (output)----
Topologies:
   Sub-topology: 0
    Source: STREAM-FROM-TOPIC-INPUT (topics: [topic-input])
      --> FILTER-NULL-VALUE
    Processor: FILTER-NULL-VALUE (stores: [])
      --> MAP-TO-UPPERCASE
      <-- STREAM-FROM-TOPIC-INPUT
    Processor: MAP-TO-UPPERCASE (stores: [])
      --> TO-OUTPUT-TOPIC
      <-- FILTER-NULL-VALUE
    Sink: TO-OUTPUT-TOPIC (topic: topic-output)
      <-- MAP-TO-UPPERCASE


A straightforward first pass is GitHub PR 6958

Compatibility, Deprecation, and Migration Plan

...

  1. The first proposition was to overload all stateless methods to accept an instance of Described class. However this solution was resulting in modiying a large percentage of the existing KStream and KTable methodsadd new methods KStreams#as(Described) and KTable#as(Described) while Described class would be used to customized the named of operation defined previously in the stream. However not only this new method was not conservative with the existing APIs but it also introduce some complexities for methods returning Void.
  2. The second proposition was to add new methods KStreams#as(Described) and KTable#as(Described) while Described class would be used to customized the named of operation defined previously in the stream. However not only this new method was not conservative with the existing APIs but it also introduce some complexities for methods returning Voidenrich all actions classes (Reducer, Predicate, etc) with a new default method "as(String)" in order to name the operation. But this leads to mix different classes with different semantics (Predicate vs Consumed/Produced) creating a couple of unfortunate side-effects:.