Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

As for now, KStream#print leads to a predefined output where key and value are printed with comma separation:

Code Block
languagejava
"[" + this.streamName + "]: " + keyToPrint + " , " + valueToPrint

We therefore suggest to
KAFKA-4830 suggests to extend print in a way that it takes KeyValueMapper as a parameter.
This will allow a user to change outputs according to the users demand., to let the user modify the output according to its needs. If a mapper is present, the hard-coded key-value output can be replaced with:

Code Block
languagejava
"[" + this.streamName + "]: " + mapper.apply(keyToPrint, valueToPrint)

Public Interfaces

The affected interface is KStream, which needs to be extended with another overloaded version of print:

Code Block
languagejava
void print(final 

...

Serde<K> keySerde,

...


           final 

...

Serde<V> valSerde,

...


           final String streamName,

...


           final 

...

KeyValueMapper<K, V, String> mapper);

 

 

Proposed Changes

See pull request GH-2669.
This PR contains a discussion regarding KAFKA-4830 as well as KAFKA-4772.

 

Apart from the extension of the interface, the previous implementation of print will be call a new overloaded version thereof. In case no KeyValuemapper is set, null serves as the indicator to use the default output (as described above):
Code Block
languagejava
@Override
public void print(Serde<K> keySerde, Serde<V> valSerde, String streamName) {
    print(keySerde, valSerde, streamName, null);
}

@Override
public void print(final Serde<K> keySerde, final Serde<V> valSerde, String streamName, final KeyValueMapper<K, V, String> mapper){
...

 

Compatibility, Deprecation, and Migration Plan

The extension of print will not introduce compatibility issues . Further more, we can maintain the current output by keeping the current output format as a default (if mapper was not set):

Code Block
languagejava
if(mapper == null) {

...


    printStream.println("[" + streamName + "]: " + keyToPrint + " , " + valueToPrint);

...


} else {

...


    printStream.println("[" + streamName + "]: " + mapper.apply(keyToPrint, valueToPrint));
}

...