
...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, when building a new Topology through the Kafka Streams DSL, processors are named automatically.

The generated names are prefixed according to the operation (e.g. KSTREAM-SOURCE, KSTREAM-FILTER, KSTREAM-MAP). In addition, a suffix such as "-0000000000" is appended to guarantee their uniqueness.
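The naming scheme can be modeled in a few lines. The sketch below is a simplified stand-in, assuming (as described above) a static prefix per operation followed by a counter shared across the whole topology and zero-padded to ten digits. The class `GeneratedNames` and its method `newProcessorName` are hypothetical names for illustration, not the Kafka Streams internals.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of DSL-generated processor names (not Kafka Streams code). */
class GeneratedNames {
    // A single counter is shared by every operator added to the same topology,
    // which is what makes each generated name unique.
    private final AtomicInteger index = new AtomicInteger(0);

    /** Returns the prefix followed by the next index, zero-padded to ten digits. */
    String newProcessorName(final String prefix) {
        return prefix + String.format("%010d", index.getAndIncrement());
    }

    public static void main(final String[] args) {
        final GeneratedNames names = new GeneratedNames();
        System.out.println(names.newProcessorName("KSTREAM-SOURCE-")); // KSTREAM-SOURCE-0000000000
        System.out.println(names.newProcessorName("KSTREAM-FILTER-")); // KSTREAM-FILTER-0000000001
        System.out.println(names.newProcessorName("KSTREAM-MAP-"));    // KSTREAM-MAP-0000000002
    }
}
```

Because the index depends only on the order in which operators are added, a name like KSTREAM-MAP-0000000002 says nothing about what the map actually does.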

To debug or understand a topology, it is possible to display the processor lineage with the method Topology#describe(). However, a complex topology with dozens of operations can be hard to understand when the processor names are not meaningful.

Allowing users to set more meaningful names would therefore help to manage the complexity of some topologies. For example, a processor name could describe the business rule performed by a map() operation.

In addition, generated names make it harder to guarantee topology compatibility. Adding a new operator, using a third-party library that optimizes the topology by removing operators, or upgrading to a new Kafka Streams version with internal API changes may change the suffix index of a large number of processor names. This in turn changes the names of internal state stores, as well as the names of internal topics.
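The compatibility problem can be illustrated with a simplified model. This sketch assumes that names are a static prefix plus a topology-wide counter zero-padded to ten digits; `NameShift` and `generateNames` are hypothetical names and not part of Kafka Streams. Inserting a single filter renames every operator that follows it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model showing why generated names are fragile (not Kafka Streams code). */
class NameShift {
    /** Names each operator, in pipeline order, with its prefix and a shared zero-padded index. */
    static List<String> generateNames(final List<String> prefixes) {
        final List<String> names = new ArrayList<>();
        int index = 0;
        for (final String prefix : prefixes) {
            names.add(prefix + String.format("%010d", index++));
        }
        return names;
    }

    public static void main(final String[] args) {
        // Version 1 of the application: source -> map -> sink.
        System.out.println(generateNames(
            Arrays.asList("KSTREAM-SOURCE-", "KSTREAM-MAP-", "KSTREAM-SINK-")));
        // [KSTREAM-SOURCE-0000000000, KSTREAM-MAP-0000000001, KSTREAM-SINK-0000000002]

        // Version 2 adds a filter after the source: every downstream index shifts by one.
        System.out.println(generateNames(
            Arrays.asList("KSTREAM-SOURCE-", "KSTREAM-FILTER-", "KSTREAM-MAP-", "KSTREAM-SINK-")));
        // [KSTREAM-SOURCE-0000000000, KSTREAM-FILTER-0000000001, KSTREAM-MAP-0000000002, KSTREAM-SINK-0000000003]
    }
}
```

If a shifted operator owned a state store or an internal topic derived from its name, those names would change as well between the two versions.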



Public Interfaces

First, we propose to add a new class, Described, that can be used to provide optional parameters when processing each record (i.e. to set the processor name). This class will be used by stateless operations (e.g. map(), filter()). The goal of creating a new class is to stay consistent with the overall API design.

Code Block
languagejava
package org.apache.kafka.streams.kstream;

import java.util.Objects;

/**
 * This class is used to provide the optional parameters when processing each record
 * (i.e. the name of the processor).
 */
public class Described {

    protected final String processorName;

    protected Described(final String processorName) {
        this.processorName = processorName;
    }

    protected Described(final Described described) {
        this(described.getProcessorName());
    }

    /**
     * Create a Described instance with the provided processor name.
     * @param processorName name used to describe the {@link org.apache.kafka.streams.processor.Processor}.
     * @return a new {@link Described} instance configured with the given processor name
     */
    public static Described withName(final String processorName) {
        return new Described(processorName);
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        final Described that = (Described) o;
        return Objects.equals(processorName, that.processorName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(processorName);
    }

    public String getProcessorName() {
        return processorName;
    }
}

This class will then be used by a new method, as(Described), added to the KStream and KTable interfaces:


Code Block
languagejava
/**
 * Describe the record-by-record operation previously applied to obtain this stream.
 * This is not an operation: it only customizes the name of the processor that produced this stream.
 *
 * <p>
 * For example, you can use this method to customize the name of the last processor applied to this stream.
 * The given processor name will be displayed in the description of the built {@code Topology}.
 * <pre>{@code
 * KStream<String, String> inputStream = builder.stream("topic");
 * KStream<String, String> outputStream = inputStream.map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
 *     public KeyValue<String, String> apply(final String key, final String value) {
 *         return new KeyValue<>(key, value.toUpperCase());
 *     }
 * });
 * outputStream.as(Described.withName("MAP-VALUE-TO-UPPERCASE"));
 * }</pre>
 * </p>
 * @param described the options used to describe the processor operation
 * @return a {@code KStream}
 */
KStream<K, V> as(final Described described);
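The intended effect of as() can be sketched with a toy pipeline. This is a hypothetical, self-contained model (`ToyPipeline`, `operation` and a String standing in for Described are illustrative assumptions, not the Kafka Streams API): each operation adds one processor with a generated name, and as() replaces the static prefix of the processor added by the previous operation while keeping its unique index suffix.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical toy model of the proposed as(Described) semantics (not Kafka Streams code). */
class ToyPipeline {
    private static final int SUFFIX_LENGTH = 10;
    private final List<String> processorNames = new ArrayList<>();

    /** Models a DSL operation such as map(): adds one processor with a generated name. */
    ToyPipeline operation(final String defaultPrefix) {
        processorNames.add(defaultPrefix + String.format("%010d", processorNames.size()));
        return this;
    }

    /** Models as(Described.withName(name)): renames the last processor, keeping its index suffix. */
    ToyPipeline as(final String name) {
        if (processorNames.isEmpty()) {
            throw new IllegalStateException("as() must follow an operation");
        }
        final int last = processorNames.size() - 1;
        final String generated = processorNames.get(last);
        final String suffix = generated.substring(generated.length() - SUFFIX_LENGTH);
        processorNames.set(last, name + "-" + suffix);
        return this;
    }

    List<String> describe() {
        return new ArrayList<>(processorNames);
    }

    public static void main(final String[] args) {
        final List<String> names = new ToyPipeline()
            .operation("KSTREAM-SOURCE-")
            .operation("KSTREAM-MAP-")
            .as("MAP-VALUE-TO-UPPERCASE")
            .describe();
        System.out.println(names); // [KSTREAM-SOURCE-0000000000, MAP-VALUE-TO-UPPERCASE-0000000001]
    }
}
```

Keeping the index suffix in this model mirrors the example topology description later in this page, where custom names such as FILTER-NULL-VALUE-0000000001 still carry an index.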


Stateless operations

Stateless operations such as stream(), map() and filter() generate a single underlying processor, so the as() method described above directly changes that processor's name.

Stateful operations

Stateful operations such as table() and join() are translated into multiple processors (also called nodes). The given name should be used to describe the first processor and as a prefix for all subsequent processors.
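The naming rule for multi-processor operations can be sketched as follows. This is a hypothetical helper (`StatefulNames.derive` is not part of Kafka Streams), and the role suffixes ("THIS", "OTHER", "MERGE") are illustrative assumptions: this page does not specify how subsequent processors are distinguished, nor whether an index suffix is also appended, so the sketch omits it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical naming rule for multi-processor (stateful) operations. */
class StatefulNames {
    /**
     * The user-given name describes the first processor; each subsequent processor
     * uses it as a prefix followed by a role suffix (role suffixes are illustrative only).
     */
    static List<String> derive(final String name, final List<String> subsequentRoles) {
        final List<String> names = new ArrayList<>();
        names.add(name);
        for (final String role : subsequentRoles) {
            names.add(name + "-" + role);
        }
        return names;
    }

    public static void main(final String[] args) {
        System.out.println(derive("JOIN-ORDERS-CUSTOMERS", Arrays.asList("THIS", "OTHER", "MERGE")));
        // [JOIN-ORDERS-CUSTOMERS, JOIN-ORDERS-CUSTOMERS-THIS, JOIN-ORDERS-CUSTOMERS-OTHER, JOIN-ORDERS-CUSTOMERS-MERGE]
    }
}
```

Using one user-given name as a shared prefix keeps all processors of a single logical operation grouped together in the output of Topology#describe().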

The branch() method results in multiple streams, each of which can be described with the newly added as() method.

However, to be able to customize the name of the first translated processor, we propose to overload branch() with a variant that accepts a Described instance as an argument.
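Because the existing branch() takes its predicates as a varargs parameter, and a Java varargs parameter must come last, an overload accepting a name would most naturally take it as the first argument. The sketch below is a hypothetical in-memory stand-in (`ToyBranch`, with a plain String in place of Described and `java.util.function.Predicate` in place of the Kafka predicate type); it is not the Kafka Streams API. The existing-style overload delegates to the new one with no name, so the default prefix is used only as a fallback.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical in-memory stand-in for KStream#branch() (not Kafka Streams code). */
class ToyBranch {
    static final String DEFAULT_PREFIX = "KSTREAM-BRANCH-";
    final List<String> processorNames = new ArrayList<>();

    /** Existing style: only predicates, so the branch processor gets a generated name. */
    @SafeVarargs
    final List<List<String>> branch(final List<String> records, final Predicate<String>... predicates) {
        return branch(null, records, predicates);
    }

    /** Proposed style: the name comes first because a varargs parameter must be last. */
    @SafeVarargs
    final List<List<String>> branch(final String name, final List<String> records,
                                    final Predicate<String>... predicates) {
        final String prefix = name != null ? name + "-" : DEFAULT_PREFIX;
        processorNames.add(prefix + String.format("%010d", processorNames.size()));

        final List<List<String>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            branches.add(new ArrayList<>());
        }
        // Each record goes to the first branch whose predicate matches; unmatched records are dropped.
        for (final String record : records) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(record)) {
                    branches.get(i).add(record);
                    break;
                }
            }
        }
        return branches;
    }

    public static void main(final String[] args) {
        final ToyBranch b = new ToyBranch();
        final List<List<String>> out = b.branch("SPLIT-BY-LENGTH",
            Arrays.asList("a", "bbb", "cc"), v -> v.length() == 1, v -> v.length() == 3);
        System.out.println(out);              // [[a], [bbb]]  ("cc" matches no predicate)
        System.out.println(b.processorNames); // [SPLIT-BY-LENGTH-0000000000]
    }
}
```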

Sink operations

Sink operations return void, so they cannot use the as() method described above.

Currently, the KStream<K, V> interface defines the following methods:

  • void foreach(final ForeachAction<? super K, ? super V> action); 
  • void process(final ProcessorSupplier<? super K, ? super V> processorSupplier, final String... stateStoreNames);

We propose to overload each of these methods to accept a Described argument.

For the following three methods, we instead propose to add a new method withProcessorName() to the classes Produced and Printed, in order to minimize the number of overloaded methods:

  • void print(final Printed<K, V> printed)
  • void to(final String topic, final Produced<K, V> produced)
  • void to(final TopicNameExtractor<K, V> topicExtractor, final Produced<K, V> produced)

Example for the Produced class:

Code Block
languagejava
/**
 * Create an instance of {@link Produced} with the provided processor name.
 *
 * @param processorName the processor name to be used. If {@code null}, a default processor name will be generated
 * @param <K>           key type
 * @param <V>           value type
 * @return a new instance of {@link Produced}
 */
public static <K, V> Produced<K, V> with(final String processorName) {
    // assumes a constructor taking (keySerde, valueSerde, partitioner, processorName);
    // serdes and partitioner are left unset so that defaults apply
    return new Produced<>(null, null, null, processorName);
}

/**
 * Configure the instance of {@link Produced} with a processor name.
 *
 * @param processorName the processor name to be used. If {@code null}, a default processor name will be generated
 * @return this
 */
public Produced<K, V> withProcessorName(final String processorName) {
    this.processorName = processorName;
    return this;
}

More generally, we propose to add new methods to the existing classes Consumed, Produced, Joined and Printed so that custom processor names can be defined.

Proposed Changes

  • Implement the new class Described and update the KStream/KStreamImpl and KTable/KTableImpl methods that add a stateless processor (e.g. map(), mapValues(), filter()...).
  • Update the Consumed, Produced and Joined classes to be able to set a processor name.
  • The processor names specified by the developer will be used in place of the static processor prefixes. Static prefixes will still be used if no custom processor name is specified.
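The name-resolution rule in the last bullet can be sketched as a single function. This is a hypothetical helper (`ProcessorNaming.resolve` is not part of Kafka Streams); it assumes, as the example topology description below suggests, that a custom name replaces only the static prefix and that the ten-digit index suffix is still appended.

```java
/** Hypothetical sketch of the proposed processor name resolution (not Kafka Streams code). */
class ProcessorNaming {
    /**
     * A developer-supplied name replaces the static prefix; without one, the static
     * prefix is kept. In both cases the zero-padded index suffix is appended.
     */
    static String resolve(final String customName, final String staticPrefix, final int index) {
        final String prefix = customName != null ? customName + "-" : staticPrefix;
        return prefix + String.format("%010d", index);
    }

    public static void main(final String[] args) {
        System.out.println(resolve("FILTER-NULL-VALUE", "KSTREAM-FILTER-", 1)); // FILTER-NULL-VALUE-0000000001
        System.out.println(resolve(null, "KSTREAM-MAP-", 2));                  // KSTREAM-MAP-0000000002
    }
}
```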

...

Code Block
languagejava
final StreamsBuilder builder = new StreamsBuilder();

final KStream<String, String> stream = builder.stream("topic-input");
stream.as(Described.withName("STREAM-FROM-TOPIC-INPUT"))
        .filter((k, v) -> v != null).as(Described.withName("FILTER-NULL-VALUE"))
        .map((k, v) -> KeyValue.pair(k, v.toUpperCase())).as(Described.withName("MAP-TO-UPPERCASE"))
        .to("topic-output", Produced.with("PRODUCE-TO-OUTPUT-TOPIC"));

System.out.println(builder.build().describe().toString());
---- (output) ----
Topologies:
   Sub-topology: 0
    Source: STREAM-FROM-TOPIC-INPUT-0000000000 (topics: [topic-input])
      --> FILTER-NULL-VALUE-0000000001
    Processor: FILTER-NULL-VALUE-0000000001 (stores: [])
      --> MAP-TO-UPPERCASE-0000000002
      <-- STREAM-FROM-TOPIC-INPUT-0000000000
    Processor: MAP-TO-UPPERCASE-0000000002 (stores: [])
      --> PRODUCE-TO-OUTPUT-TOPIC-0000000003
      <-- FILTER-NULL-VALUE-0000000001
    Sink: PRODUCE-TO-OUTPUT-TOPIC-0000000003 (topic: topic-output)
      <-- MAP-TO-UPPERCASE-0000000002

Compatibility, Deprecation, and Migration Plan

No compatibility issues foreseen.

Rejected Alternatives

...

  1. The first proposal was to overload all stateless methods to accept an instance of the Described class. However, this solution would have required modifying a large percentage of the existing KStream and KTable methods.