Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

 

  • Key access to ValueTransformer:  While transforming values via KStream.transformValues and ValueTransformer, the key associated with the value may be needed, even if it is not changed. For instance, it may be used to access stores.

    As of now, the key is not available within these methods and interfaces, leading to the use of KStream.transform and Transformer, and the unnecessary creation of new KeyValue objects.

  • Key access to ValueMapperValueMapper should have read-only access to the key for the value it is mapping. Sometimes the value transformation will depend on the key.

    It is possible to do this with a full blown KeyValueMapper but that loses the promise that you won't change the key – so you might introduce a re-keying phase that is totally unnecessary.

  • Key access to ValueJoiner interface: In working with Kafka Stream joining, it's sometimes the case that a join key is not actually present in the values of the joins themselves (if, for example, a previous transform generated an ephemeral join key.) In such cases, the actual key of the join is not available in the ValueJoiner implementation to be used to construct the final joined value. This can be worked around by explicitly threading the join key into the value if needed, but it seems like extending the interface to pass the join key along as well would be helpful

 

Public Changes

The following methods will be added to KStream interface

 

 
<V1> KStream<K, V1> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends V1> mapper)
<VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super V, ? extends Iterable<? extends VR>> processor);

<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
 final ValueJoinerWithKey<? super V, ? super VO, ? extends VR> joiner,
  final JoinWindows windows,
 final Serde<K> keySerde,
final Serde<V> thisValueSerde,
final Serde<VO> otherValueSerde);
<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
final ValueJoinerWithKey<? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows,
final Serde<K> keySerde,
final Serde<V> thisValSerde,
final Serde<VO> otherValueSerde);

<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
final ValueJoinerWithKey<? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows,
final Serde<K> keySerde,
final Serde<V> thisValueSerde,
final Serde<VO> otherValueSerde);
 

 

 

 

 

Proposed Changes

 

Proposed changes

public interface ValueMapperWithKey<K, V, VR> {
 VR apply(final K key, final V value);
}

public interface ValueJoinerWithKey<K, V1, V2, VR> {
 VR apply(final K key, final V1 value1, final V2 value2);
}

public interface ValueTransformerWithKey<K, V, VR> {
 void init(final ProcessorContext context);
 VR transform(final K key, final V value);
 VR punctuate(final long timestamp);
 void close();
}

Deprecate:

 void init(final ProcessorContext context); 

 void close();

both from ValueTransformerWithKey and ValueTransformer

as we already introduced rich functions with above methods.

public interface RichFunction {
 void init(final ProcessorContext context);
void close();
}

public abstract class AbstractRichFunction<VR> implements RichFunction {
@Override
  public void init(final ProcessorContext context) {}


@Override
  public void close() {}

}

public abstract class RichValueMapper<K, V, VR> extends AbstractRichFunction<VR> implements ValueMapperWithKey<K, V, VR> {
}

the same interfaces applies to ValueJoiner, ValueTransformer, Aggregator and KeyValueMapper

 
 
Having introduced the new interface and classes, we now show the sample code snippet how these are used when building topology and inside Processor. We show only for ValueMapper but the same semantics apply for others as well.
 
KStreamImpl class, handling ValueMapperWithKey interface:

@Override
public <V1> KStream<K, V1> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends V1> mapper) {

  Objects.requireNonNull(mapper, "mapper can't be null");
  String name = topology.newName(MAPVALUES_NAME);
  RichValueMapper<? super K, ? super V, ? extends V1> richValueMapper;
  if (mapper instanceof RichValueMapper) {

richValueMapper        richValueMapper = new RichValueMapper<K, V, V1>() {
          @Override
          public void init(final ProcessorContext context) {

            super.init(context);
            ((RichValueMapper<? super K, ? super V, ? extends V1>) mapper).init(context);
          }

          @Override
          public V1 apply(K key, V value) {

            return mapper.apply(value);
          }


          @Override
          public void close() {

super             super.close();
            ((RichValueMapper<? super K, ? super V, ? extends V1>) mapper).close();
          }

       };
 } else {

    richValueMapper = new RichValueMapper<K, V, V1>() {
      @Override
      public V1 apply(K key, V value) {

return          return mapper.apply(key, value);
      }

    };
  }

  topology.addProcessor(name, new KStreamMapValues<>(richValueMapper), this.name);
  return new KStreamImpl<>(topology, name, sourceNodes, this.repartitionRequired);
}

 

 

KStreamImpl class, handling ValueMapper interface:

@Override
public <V1> KStream<K, V1> mapValues(ValueMapper<? super V, ? extends V1> mapper) {

Objects.requireNonNull(mapper, "mapper can't be null");
  String name = topology.newName(MAPVALUES_NAME);
  RichValueMapper<? super K, ? super V, ? extends V1> richValueMapper = new RichValueMapper<K, V, V1>() {

@Override
  public V1 apply(K key, V value) {

return mapper.apply(value);
  }

};
  topology.addProcessor(name, new KStreamMapValues<>(richValueMapper), this.name);
return new KStreamImpl<>(topology, name, sourceNodes, this.repartitionRequired);
}

 
 
 

class KStreamMapValues<K, V, V1> implements ProcessorSupplier<K, V> {

private final RichValueMapper<K, V, V1> mapper;

public KStreamMapValues(RichValueMapper<K, V, V1> mapper) {

this.mapper = mapper;
 }


@Override
 public Processor<K, V> get() {

return new KStreamMapProcessor();
 }


private class KStreamMapProcessor extends AbstractProcessor<K, V> {

@Override
 public void init(ProcessorContext context) {

super.init(context);
 mapper.init(context);
 }



@Override
 public void process(final K key, final V value) {

V1 newValue = mapper.apply(key,value);
 context().forward(key, newValue);
 }


@Override
 public void close() {

super.close();
 mapper.close();
 }


}
}

 

 

 

 

 

 

 

 

 

 

 

The PR can be found here.

Test Plan

The unit tests are changed accordingly to support the changes in core classes.

 

Rejected Alternatives

 

  • Lambdas are not supported


This document is proposed with ValueMapper example but it can be applied to other interfaces as well. Rich functions are proposed:
public interface RichFunction {
void init(final ProcessorContext context);

void close();
}
public abstract class AbstractRichFunction implements RichFunction {
@Override
  public void init(final ProcessorContext context) {}

@Override
  public void close() {}
}

 

public abstract class RichValueJoiner<K, V1, V2, VR> extends AbstractRichFunction implements ValueJoiner<V1, V2, VR>  {
@Override
  public final VR apply(final V1 value1, final V2 value2) {
return apply(null, value1, value2);
  }

public abstract VR apply(final K key, final V1 value1, final V2 value2);
}

 

Inside processor, we check if the instance (for example ValueMapper instance) is rich (for example RichValueMapper):

 
KStreamFlatMapValues(ValueMapper<? super V, ? extends Iterable<? extends V1>> mapper) {
this.mapper = mapper;
  isRichFunction = mapper instanceof RichValueMapper ? true : false;
}
@Override
public void process(K key, V value) {
Iterable<? extends V1> newValues;
if (isRichFunction) {
newValues = ((RichValueMapper<? super K, ? super V, ? extends Iterable<? extends V1>>) mapper).apply(key, value);
  } else {
newValues = mapper.apply(value);
 }
for (V1 v : newValues) {
context().forward(key, v);
  }
}

 


  • Not backward-compatible

 

We propose adding key information for ValueJoiner,  ValueTransformer, and ValueMapper classes and their apply(...) methods.

As a result, we perform the following public changes (and their overloaded versions)

ClassOldNew
KStream<VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper); <VR> KStream<K, VR> mapValues(ValueMapper<? super K, ? super V, ? extends VR> mapper);
KStream<VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier, final String... stateStoreNames);<VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,final String... stateStoreNames);
KStream

<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,

 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,

final JoinWindows windows);

<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,

final ValueJoiner<? super K, ? super V, ? super VO, ? extends VR> joiner,

final JoinWindows windows);

KTable

<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper);

<VR> KTable<K, VR> mapValues(final ValueMapper<? super K, ? super V, ? extends VR> mapper);
KTable

<VO, VR> KTable<K, VR> join(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);

<VO, VR> KTable<K, VR> join(final KTable<K, VO> other, final ValueJoiner<? super K, ? super V, ? super VO, ? extends VR> joiner);

 

 

  • Lacking performance because deep-copy and need for RichFunctions

 

  1. We extend the target interfaces ValueJoiner,  ValueTransformer, and ValueMapper as ValueJoinerWithKey,  ValueTransformerWithKey, and ValueMapperWithKey. In extended abstract classes we have an access to keys.
  2. In Processor we check the actual instance of object:
     
    this.valueTransformer = valueTransformer;
    if (valueTransformer instanceof ValueTransformerWithKey) {
    isTransformerWithKey = true;
    } else {
    isTransformerWithKey = false;
    }

    ..............

    ..............

    @Override
    public void process(K key, V value) {
    if (isTransformerWithKey) {
    K keyCopy = (K) Utils.deepCopy(key);
      context.forward(key, ((ValueTransformerWithKey<K, V, R>) valueTransformer).transform(keyCopy, value));
      } else {
    context.forward(key, valueTransformer.transform(value));
      }
    }

  3. As we can see from the above code snippet, we can guard the key change in Processors by deeply copying the object before calling the apply() method.

...