You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 2 Next »

Status

Current stateUnder Discussion

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, while building a new Topology through the KStreams DSL the processors are automatically named.

The genarated names are prefixed depending of the operation (i.e KSTREAM-SOURCE, KSTREAM-FILTER, KSTREAM-MAP, etc).

To debug/understand a topology it is possible to display the processor lineage with the method Topology#describe(). However, a complex topology with dozens of operations can be hard to understand if the processor names are not relevant.

It would be useful to be able to set more meaningful names. For example, a processor name could describe the business rule performed by a map() operation.

Public Interfaces

First, we propose to add one new class Processed that can be used to provide the optional parameters when processing each record (i.e to set the processor name). This class will be use by stateless operations (ie. map(), filter(), etc).

package org.apache.kafka.streams.kstream;

import java.util.Objects;

/**
 * This class is used to provide the optional parameters when processing each record.
 */
public class Processed {

    protected final String processorName;

    protected Processed(final String processorName) {
        this.processorName = processorName;
    }

    protected Processed(final Processed processed) {
        this(processed.getProcessorName());
    }

    /**
     * Create a Processed instance with provided processor name.
     * @param processorName      name to use to describe the {@link org.apache.kafka.streams.processor.Processor}.
     * @return  A new {@link Processed} instance configured with processorName
     */
    public static Processed with(final String processorName) {
        return new Processed(processorName);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Processed processed = (Processed) o;
        return Objects.equals(processorName, processed.processorName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(processorName);
    }

    public String getProcessorName() {
        return processorName;
    }
}


Then we propose to add new methods to the existing classes : Consumed, Produced, Joined and Printed in order to be able to define custom processor names.

Example for the Consumed class : 

/**
 * Create an instance of {@link Consumed} with provided processor name.
 *
 * @param processorName the processor name to be used. If {@code null} a default processor name will be generated
 * @param <K>         key type
 * @param <V>         value type
 * @return a new instance of {@link Consumed}
 */
public static <K, V> Consumed<K, V> with(final String processorName) {
    return new Consumed<>(null, null, null, null, processorName);
}

/**
 * Configure the instance of {@link Consumed} with a key {@link Serde}.
 *
 * @param processorName the processor name to be used. If {@code null} a default processor name will be generated
 * @return this
 */
public Consumed<K, V> withProcessorName(final String processorName) {
    this.processorName = processorName;
    return this;
}


Proposed Changes

  • Implement the new class Processed and update the KStream/KStreamImpl methods that add stateless processor (i.e map(), mapValue(), filter()...).
  • Update the Consumed, Produced and Joined classes to be able to set a processor name.
  • The processor names specified by developer will be used in place of the static processor prefix. Statics prefixes will still be used if no custom processor name are specified.


Below is an application example : 

final StreamsBuilder builder = new StreamsBuilder();

builder.<String, String>stream("topic-input", Consumed.with("CONSUME-FROM-INPUT-TOPIC"))
        .filter( (k, v) -> true,
                Processed.with("FILTER-NULL-VALUE"))
        .map( (k, v) -> KeyValue.pair(k, v.toUpperCase()),
                Processed.with("MAP-TO-UPPERCASE"))
        .to("topic-output",
                Produced.with("PRODUCE-TO-OUTPUT-TOPIC"));

System.out.println(builder.build().describe().toString());
---- (output)----
Topologies:
   Sub-topology: 0
    Source: CONSUME-FROM-INPUT-TOPIC-0000000000 (topics: [topic-input])
      --> FILTER-NULL-VALUE-0000000001
    Processor: FILTER-NULL-VALUE-0000000001 (stores: [])
      --> MAP-TO-UPPERCASE-0000000002
      <-- CONSUME-FROM-INPUT-TOPIC-0000000000
    Processor: MAP-TO-UPPERCASE-0000000002 (stores: [])
      --> PRODUCE-TO-OUTPUT-TOPIC-0000000003
      <-- FILTER-NULL-VALUE-0000000001
    Sink: PRODUCE-TO-OUTPUT-TOPIC-0000000003 (topic: topic-output)
      <-- MAP-TO-UPPERCASE-0000000002


Compatibility, Deprecation, and Migration Plan

No compatibility issues foreseen.

Rejected Alternatives

No rejected alternatives

  • No labels