...
- Key access to ValueTransformer: While transforming values via KStream.transformValues and ValueTransformer, the key associated with the value may be needed, even if it is not changed. For instance, it may be used to access stores. Currently the key is not available within these methods and interfaces. This leads to the use of KStream.transform and Transformer, and to the unnecessary creation of new KeyValue objects.
- Key access to ValueMapper: ValueMapper should have read-only access to the key for the value it is mapping, because sometimes the value transformation depends on the key. It is possible to do this with a full-blown KeyValueMapper, but that loses the promise that you won't change the key, so you might introduce a re-keying phase that is totally unnecessary.
- Key access to ValueJoiner interface: When joining in Kafka Streams, the join key is sometimes not present in the joined values themselves (for example, if a previous transform generated an ephemeral join key). In such cases, the actual key of the join is not available in the ValueJoiner implementation to construct the final joined value. This can be worked around by explicitly threading the join key into the value, but extending the interface to also pass the join key would be helpful.
- Apart from key access, Rich versions of interfaces including ValueJoiner, ValueMapper, Initializer, and Aggregator are needed in various use cases.
Public Changes
KStream interface:
```java
<GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable,
        final KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
        final ValueJoinerWithKey<? super K, ? super V, ? super GV, ? extends RV> valueJoinerWithKey);

<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable,
        final KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
        final ValueJoinerWithKey<? super K, ? super V, ? super GV, ? extends RV> valueJoinerWithKey);

<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
        final ValueJoinerWithKey<? super K, ? super V, ? super VT, ? extends VR> valueJoinerWithKey,
        final Serde<K> keySerde,
        final Serde<V> valSerde);

<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
        final ValueJoinerWithKey<? super K, ? super V, ? super VT, ? extends VR> valueJoinerWithKey);

<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
        final ValueJoinerWithKey<? super K, ? super V, ? super VT, ? extends VR> valueJoinerWithKey,
        final Serde<K> keySerde,
        final Serde<V> valSerde);

<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
        final ValueJoinerWithKey<? super K, ? super V, ? super VT, ? extends VR> valueJoinerWithKey);

<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
        final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> valueJoinerWithKey,
        final JoinWindows windows,
        final Serde<K> keySerde,
        final Serde<V> thisValueSerde,
        final Serde<VO> otherValueSerde);

<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
        final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> valueJoinerWithKey,
        final JoinWindows windows);

<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
        final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> valueJoinerWithKey,
        final JoinWindows windows,
        final Serde<K> keySerde,
        final Serde<V> thisValSerde,
        final Serde<VO> otherValueSerde);

<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
        final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> valueJoinerWithKey,
        final JoinWindows windows);

<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
        final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> valueJoinerWithKey,
        final JoinWindows windows,
        final Serde<K> keySerde,
        final Serde<V> thisValueSerde,
        final Serde<VO> otherValueSerde);

<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
        final ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> valueJoinerWithKey,
        final JoinWindows windows);

<VR> KStream<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerWithKeySupplier,
        final String... stateStoreNames);

<VR> KStream<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapperWithKey);
```
Proposed Changes
Handling lambdas
We support lambdas for Initializer, ValueMapper, Aggregator, ValueJoiner, and their "withKey" interfaces. The ValueTransformer interface does not need lambda support, given the core definition of the class.
We support rich functions for all of the above interfaces.
To support lambdas, we keep each withKey interface separate from the original one, so neither inherits from or extends the other. A withKey interface that extended the original would have two abstract methods and could no longer be implemented with a lambda.
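The effect of keeping the withKey interfaces separate can be sketched with simplified stand-ins. ValueMapper, ValueMapperWithKey, and the in-memory MiniStream here are hypothetical, not the Kafka Streams classes. Because the two interfaces are unrelated single-method interfaces, one overloaded mapValues accepts both a one-argument and a two-argument lambda, and Java picks the overload from the lambda's arity.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: simplified stand-ins, not the Kafka Streams classes.
public class LambdaOverloadSketch {

    @FunctionalInterface
    interface ValueMapper<V, VR> {
        VR apply(V value);
    }

    // Deliberately NOT a sub-interface of ValueMapper: inheriting apply(V) would leave two
    // abstract methods, and the interface could no longer be the target of a lambda.
    @FunctionalInterface
    interface ValueMapperWithKey<K, V, VR> {
        VR apply(K key, V value);
    }

    // Hypothetical in-memory stream of key/value records.
    static final class MiniStream<K, V> {
        final List<K> keys = new ArrayList<>();
        final List<V> values = new ArrayList<>();

        MiniStream<K, V> add(final K key, final V value) {
            keys.add(key);
            values.add(value);
            return this;
        }

        // withoutKey overload: adapt to the withKey form and delegate.
        <VR> MiniStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper) {
            return mapValues((K key, V value) -> mapper.apply(value));
        }

        // withKey overload: the key is readable, and each output record keeps its original key.
        <VR> MiniStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper) {
            final MiniStream<K, VR> out = new MiniStream<>();
            for (int i = 0; i < keys.size(); i++) {
                out.add(keys.get(i), mapper.apply(keys.get(i), values.get(i)));
            }
            return out;
        }
    }

    static MiniStream<String, String> sample() {
        return new MiniStream<String, String>().add("a", "x").add("bb", "yy");
    }

    // One-argument lambda: only the ValueMapper overload is applicable.
    public static List<Integer> lengths() {
        final MiniStream<String, Integer> mapped = sample().mapValues(v -> v.length());
        return mapped.values;
    }

    // Two-argument lambda: only the ValueMapperWithKey overload is applicable.
    public static List<String> tagged() {
        final MiniStream<String, String> mapped = sample().mapValues((k, v) -> k + ":" + v);
        return mapped.values;
    }

    public static void main(final String[] args) {
        System.out.println(lengths()); // [1, 2]
        System.out.println(tagged());  // [a:x, bb:yy]
    }
}
```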
ValueMapperWithKey / RichValueMapper
```java
public interface ValueMapperWithKey<K, V, VR> {
    VR apply(final K key, final V value);
}

public interface RichValueMapper<K, V, VR> extends ValueMapperWithKey<K, V, VR>, RichFunction {
}
```
RichFunction / ValueJoinerWithKey
```java
public interface RichFunction {
    void init();
    void close();
}

public interface ValueJoinerWithKey<K, V1, V2, VR> {
    VR apply(final K key, final V1 value1, final V2 value2);
}
```
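A ValueJoinerWithKey lambda can put the join key into the joined value even when neither input value carries it, which covers the ephemeral-join-key case from the motivation. The following is a minimal sketch that assumes a stand-in ValueJoinerWithKey and a hypothetical map-based innerJoin helper in place of a real stream-table join.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: a stand-in for the proposed ValueJoinerWithKey plus a hypothetical in-memory join.
public class JoinerWithKeySketch {

    @FunctionalInterface
    interface ValueJoinerWithKey<K, V1, V2, VR> {
        VR apply(K key, V1 value1, V2 value2);
    }

    // Hypothetical helper: inner-joins two maps on their keys and passes the key to the
    // joiner, as a join processor accepting ValueJoinerWithKey could do.
    static <K, V1, V2, VR> Map<K, VR> innerJoin(final Map<K, V1> left,
                                                final Map<K, V2> right,
                                                final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VR> joiner) {
        final Map<K, VR> result = new LinkedHashMap<>();
        for (final Map.Entry<K, V1> entry : left.entrySet()) {
            final K key = entry.getKey();
            if (right.containsKey(key)) {
                result.put(key, joiner.apply(key, entry.getValue(), right.get(key)));
            }
        }
        return result;
    }

    public static Map<String, String> demo() {
        final Map<String, Integer> quantities = new LinkedHashMap<>();
        quantities.put("order-1", 3);
        quantities.put("order-2", 5);
        final Map<String, String> statuses = new LinkedHashMap<>();
        statuses.put("order-1", "shipped");
        // Neither value contains the order id, but the joiner still receives it as the key.
        return innerJoin(quantities, statuses, (key, qty, status) -> key + " x" + qty + " " + status);
    }

    public static void main(final String[] args) {
        System.out.println(demo()); // {order-1=order-1 x3 shipped}
    }
}
```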
RichValueJoiner
```java
public interface RichValueJoiner<K, V1, V2, VR> extends ValueJoinerWithKey<K, V1, V2, VR>, RichFunction {
}
```
RichInitializer
```java
public interface RichInitializer<VA> extends Initializer<VA>, RichFunction {
}
```
RichAggregator
```java
public interface RichAggregator<K, V, VA> extends Aggregator<K, V, VA>, RichFunction {
}
```
ValueTransformerWithKeySupplier
```java
public interface ValueTransformerWithKeySupplier<K, V, VR> {
    ValueTransformerWithKey<K, V, VR> get();
}
```
Handling withKey interfaces while building the topology
In general, we change the constructors of all related backend Processors to accept withKey types, since regular (withoutKey) interfaces can easily be converted to withKey interfaces.
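A thin adapter can convert a regular (withoutKey) interface into its withKey counterpart, which is what lets backend Processors accept only withKey types. The following is a minimal sketch that assumes simplified stand-in interfaces, a hypothetical toWithKey adapter, and a hypothetical MapValuesProcessor that only accepts ValueMapperWithKey.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: simplified stand-ins, not the Kafka Streams classes.
public class WithKeyAdapterSketch {

    interface ValueMapper<V, VR> {
        VR apply(V value);
    }

    interface ValueMapperWithKey<K, V, VR> {
        VR apply(K key, V value);
    }

    // Adapter: a withoutKey mapper becomes a withKey mapper that ignores the key.
    static <K, V, VR> ValueMapperWithKey<K, V, VR> toWithKey(final ValueMapper<? super V, ? extends VR> mapper) {
        return (key, value) -> mapper.apply(value);
    }

    // Hypothetical backend processor: its constructor accepts only the withKey type, so there
    // is a single processing path whichever API overload the user called.
    static final class MapValuesProcessor<K, V, VR> {
        private final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper;
        private final List<String> forwarded = new ArrayList<>();

        MapValuesProcessor(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper) {
            this.mapper = mapper;
        }

        void process(final K key, final V value) {
            // The key is forwarded unchanged; only the value is mapped.
            forwarded.add(key + "=" + mapper.apply(key, value));
        }
    }

    public static List<String> demo() {
        final ValueMapper<String, Integer> length = String::length;
        final ValueMapperWithKey<String, String, Integer> adapted = toWithKey(length);
        final MapValuesProcessor<String, String, Integer> plain = new MapValuesProcessor<>(adapted);
        plain.process("k1", "abc");

        final MapValuesProcessor<String, String, String> keyed =
                new MapValuesProcessor<>((key, value) -> key + "/" + value);
        keyed.process("k2", "xy");

        final List<String> out = new ArrayList<>(plain.forwarded);
        out.addAll(keyed.forwarded);
        return out;
    }

    public static void main(final String[] args) {
        System.out.println(demo()); // [k1=3, k2=k2/xy]
    }
}
```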
Handling rich functions while building the topology
In general, we change the constructors of all related backend Processors to accept Rich types, since lambdas and withKey lambdas can easily be converted to Rich functions.
RichValueMapper
```java
@Override
public <V1> KStream<K, V1> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends V1> mapperWithKey) {
    Objects.requireNonNull(mapperWithKey, "mapperWithKey can't be null");
    String name = topology.newName(MAPVALUES_NAME);
    final RichValueMapper<K, V, V1> richValueMapper;
    if (mapperWithKey instanceof RichValueMapper) {
        richValueMapper = (RichValueMapper<K, V, V1>) mapperWithKey;
    } else {
        richValueMapper = new RichValueMapper<K, V, V1>() {
            @Override
            public void init() {}
            @Override
            public void close() {}
            @Override
            public V1 apply(K key, V value) {
                return mapperWithKey.apply(key, value);
            }
        };
    }
    topology.addProcessor(name, new KStreamMapValues<>(richValueMapper), this.name);
    return new KStreamImpl<>(topology, name, sourceNodes, this.repartitionRequired);
}
```
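Converting everything to RichValueMapper means a processor only ever holds the Rich type. The following is a minimal sketch of the benefit. It assumes simplified stand-in interfaces and a hypothetical MapValuesProcessor (not the real KStreamMapValues). The processor calls init() and close() unconditionally, so user-supplied rich functions get their hooks invoked and plain lambdas receive no-op versions.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: simplified stand-ins for the proposed RichFunction/RichValueMapper types.
public class RichLifecycleSketch {

    interface RichFunction {
        void init();
        void close();
    }

    interface ValueMapperWithKey<K, V, VR> {
        VR apply(K key, V value);
    }

    interface RichValueMapper<K, V, VR> extends ValueMapperWithKey<K, V, VR>, RichFunction {
    }

    // Same idea as the mapValues conversion: rich instances pass through unchanged,
    // plain ones (e.g. lambdas) are wrapped with no-op init/close.
    @SuppressWarnings("unchecked")
    static <K, V, VR> RichValueMapper<K, V, VR> toRich(final ValueMapperWithKey<K, V, VR> mapper) {
        if (mapper instanceof RichValueMapper) {
            return (RichValueMapper<K, V, VR>) mapper;
        }
        return new RichValueMapper<K, V, VR>() {
            @Override public void init() {}
            @Override public void close() {}
            @Override public VR apply(final K key, final V value) {
                return mapper.apply(key, value);
            }
        };
    }

    // Hypothetical processor: holding only the Rich type lets it drive the lifecycle
    // unconditionally, without instanceof checks per record.
    static final class MapValuesProcessor<K, V, VR> {
        private final RichValueMapper<K, V, VR> mapper;
        MapValuesProcessor(final RichValueMapper<K, V, VR> mapper) { this.mapper = mapper; }
        void init() { mapper.init(); }
        VR process(final K key, final V value) { return mapper.apply(key, value); }
        void close() { mapper.close(); }
    }

    public static List<String> demo() {
        final List<String> events = new ArrayList<>();
        final RichValueMapper<String, Integer, String> rich = new RichValueMapper<String, Integer, String>() {
            @Override public void init() { events.add("init"); }
            @Override public void close() { events.add("close"); }
            @Override public String apply(final String key, final Integer value) { return key + value; }
        };
        final MapValuesProcessor<String, Integer, String> p = new MapValuesProcessor<>(toRich(rich));
        p.init();
        events.add(p.process("a", 1));
        p.close();

        // A plain lambda still works; it just gets the no-op lifecycle hooks.
        final ValueMapperWithKey<String, Integer, String> plain = (k, v) -> k + "#" + v;
        final MapValuesProcessor<String, Integer, String> q = new MapValuesProcessor<>(toRich(plain));
        q.init();
        events.add(q.process("b", 2));
        q.close();
        return events;
    }

    public static void main(final String[] args) {
        System.out.println(demo()); // [init, a1, close, b#2]
    }
}
```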
RichValueJoiner
```java
static <K, T1, T2, R> ValueJoinerWithKey<K, T1, T2, R> convertToValueJoinerWithKey(final ValueJoiner<T1, T2, R> valueJoiner) {
    Objects.requireNonNull(valueJoiner, "valueJoiner can't be null");
    return new ValueJoinerWithKey<K, T1, T2, R>() {
        @Override
        public R apply(K key, T1 value1, T2 value2) {
            return valueJoiner.apply(value1, value2);
        }
    };
}

public <V1, R> KStream<K, R> leftJoin(
        final KStream<K, V1> other,
        final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
        final JoinWindows windows,
        final Serde<K> keySerde,
        final Serde<V> thisValSerde,
        final Serde<V1> otherValueSerde) {
    return doJoin(other,
            // doJoin and join methods accept ValueJoinerWithKey type and corresponding Processors accept only Rich functions in their constructor.
            convertToValueJoinerWithKey(joiner),
            windows,
            keySerde,
            thisValSerde,
            otherValueSerde,
            new KStreamImplJoin(true, false));
}
```
Converters between Rich/WithoutKey/WithKey types
```java
static <K, T1, T2, R> RichValueJoiner<K, T1, T2, R> convertToRichValueJoiner(final ValueJoinerWithKey<K, T1, T2, R> valueJoinerWithKey) {
    Objects.requireNonNull(valueJoinerWithKey, "valueJoiner can't be null");
    if (valueJoinerWithKey instanceof RichValueJoiner) {
        return (RichValueJoiner<K, T1, T2, R>) valueJoinerWithKey;
    } else {
        return new RichValueJoiner<K, T1, T2, R>() {
            @Override
            public void init() {}
            @Override
            public void close() {}
            @Override
            public R apply(K key, T1 value1, T2 value2) {
                return valueJoinerWithKey.apply(key, value1, value2);
            }
        };
    }
}

static <K, T1, T2, R> ValueJoinerWithKey<K, T1, T2, R> convertToValueJoinerWithKey(final ValueJoiner<T1, T2, R> valueJoiner) {
    Objects.requireNonNull(valueJoiner, "valueJoiner can't be null");
    return new ValueJoinerWithKey<K, T1, T2, R>() {
        @Override
        public R apply(K key, T1 value1, T2 value2) {
            return valueJoiner.apply(value1, value2);
        }
    };
}

static <VA> RichInitializer<VA> checkAndMaybeConvertToRichInitializer(final Initializer<VA> initializer) {
    Objects.requireNonNull(initializer, "initializer can't be null");
    if (initializer instanceof RichInitializer) {
        return (RichInitializer<VA>) initializer;
    } else {
        return new RichInitializer<VA>() {
            @Override
            public VA apply() {
                return initializer.apply();
            }
            @Override
            public void init() {}
            @Override
            public void close() {}
        };
    }
}

static <K, V, VA> RichAggregator<K, V, VA> checkAndMaybeConvertToRichAggregator(final Aggregator<K, V, VA> aggregator) {
    Objects.requireNonNull(aggregator, "aggregator can't be null");
    if (aggregator instanceof RichAggregator) {
        return (RichAggregator<K, V, VA>) aggregator;
    } else {
        return new RichAggregator<K, V, VA>() {
            @Override
            public VA apply(K key, V value, VA aggregate) {
                return aggregator.apply(key, value, aggregate);
            }
            @Override
            public void init() {}
            @Override
            public void close() {}
        };
    }
}
```
ValueMapperWithKey
```java
@Override
public <V1> KStream<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper) {
    Objects.requireNonNull(mapper, "mapper can't be null");
    String name = topology.newName(MAPVALUES_NAME);
    // Wrap the withoutKey mapper so that the processor only deals with the withKey type.
    final ValueMapperWithKey<K, V, V1> valueMapperWithKey = new ValueMapperWithKey<K, V, V1>() {
        @Override
        public V1 apply(K key, V value) {
            return mapper.apply(value);
        }
    };
    topology.addProcessor(name, new KStreamMapValues<>(valueMapperWithKey), this.name);
    return new KStreamImpl<>(topology, name, sourceNodes, this.repartitionRequired);
}
```
ValueJoinerWithKey
```java
public <V1, R> KStream<K, R> leftJoin(
        final KStream<K, V1> other,
        final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
        final JoinWindows windows,
        final Serde<K> keySerde,
        final Serde<V> thisValSerde,
        final Serde<V1> otherValueSerde) {
    return doJoin(other,
            // doJoin, join methods, and corresponding Processors accept ValueJoinerWithKey type.
            convertToValueJoinerWithKey(joiner),
            windows,
            keySerde,
            thisValSerde,
            otherValueSerde,
            new KStreamImplJoin(true, false));
}
```
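The Initializer and Aggregator converters can be exercised end to end with a small, self-contained sketch. Everything here is a simplified stand-in. The interfaces mirror the proposed shapes, and the two converter methods follow the checkAndMaybeConvertTo... pattern. The aggregate method is a hypothetical in-memory processor, not a real Kafka Streams processor. Plain lambdas are converted once, up front, and the processor then drives init/apply/close on the Rich types.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Sketch only: simplified stand-ins for Initializer/Aggregator and their proposed Rich variants.
public class RichAggregateSketch {

    interface RichFunction { void init(); void close(); }
    interface Initializer<VA> { VA apply(); }
    interface Aggregator<K, V, VA> { VA apply(K key, V value, VA aggregate); }
    interface RichInitializer<VA> extends Initializer<VA>, RichFunction { }
    interface RichAggregator<K, V, VA> extends Aggregator<K, V, VA>, RichFunction { }

    @SuppressWarnings("unchecked")
    static <VA> RichInitializer<VA> checkAndMaybeConvertToRichInitializer(final Initializer<VA> initializer) {
        Objects.requireNonNull(initializer, "initializer can't be null");
        if (initializer instanceof RichInitializer) {
            return (RichInitializer<VA>) initializer;
        }
        return new RichInitializer<VA>() {
            @Override public VA apply() { return initializer.apply(); }
            @Override public void init() {}
            @Override public void close() {}
        };
    }

    @SuppressWarnings("unchecked")
    static <K, V, VA> RichAggregator<K, V, VA> checkAndMaybeConvertToRichAggregator(final Aggregator<K, V, VA> aggregator) {
        Objects.requireNonNull(aggregator, "aggregator can't be null");
        if (aggregator instanceof RichAggregator) {
            return (RichAggregator<K, V, VA>) aggregator;
        }
        return new RichAggregator<K, V, VA>() {
            @Override public VA apply(final K key, final V value, final VA aggregate) {
                return aggregator.apply(key, value, aggregate);
            }
            @Override public void init() {}
            @Override public void close() {}
        };
    }

    // Hypothetical aggregate "processor" over an in-memory list of records.
    static <K, V, VA> Map<K, VA> aggregate(final List<Map.Entry<K, V>> records,
                                           final Initializer<VA> initializer,
                                           final Aggregator<K, V, VA> aggregator) {
        final RichInitializer<VA> richInitializer = checkAndMaybeConvertToRichInitializer(initializer);
        final RichAggregator<K, V, VA> richAggregator = checkAndMaybeConvertToRichAggregator(aggregator);
        richInitializer.init();
        richAggregator.init();
        final Map<K, VA> store = new LinkedHashMap<>();
        try {
            for (final Map.Entry<K, V> record : records) {
                final K key = record.getKey();
                final VA current = store.containsKey(key) ? store.get(key) : richInitializer.apply();
                store.put(key, richAggregator.apply(key, record.getValue(), current));
            }
        } finally {
            richAggregator.close();
            richInitializer.close();
        }
        return store;
    }

    public static Map<String, Long> sumPerKey() {
        final List<Map.Entry<String, Integer>> records = new ArrayList<>();
        records.add(new AbstractMap.SimpleEntry<>("a", 1));
        records.add(new AbstractMap.SimpleEntry<>("b", 10));
        records.add(new AbstractMap.SimpleEntry<>("a", 2));
        // Plain lambdas are accepted and converted to the Rich types internally.
        return aggregate(records, () -> 0L, (key, value, sum) -> sum + value);
    }

    public static void main(final String[] args) {
        System.out.println(sumPerKey()); // {a=3, b=10}
    }
}
```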
Test Plan
The unit tests are updated accordingly to cover the changes in the core classes.
Rejected Alternatives
Lambdas are not supported
This alternative is described with ValueMapper as the example, but it can be applied to other interfaces as well. Rich functions are proposed as follows:
```java
public interface RichFunction {
    void init(final ProcessorContext context);
    void close();
}

public abstract class AbstractRichFunction implements RichFunction {
    @Override
    public void init(final ProcessorContext context) {}
    @Override
    public void close() {}
}

public abstract class RichValueJoiner<K, V1, V2, VR> extends AbstractRichFunction implements ValueJoiner<V1, V2, VR> {
    @Override
    public final VR apply(final V1 value1, final V2 value2) {
        return apply(null, value1, value2);
    }

    public abstract VR apply(final K key, final V1 value1, final V2 value2);
}
```
Because RichValueJoiner is an abstract class rather than an interface, it cannot be implemented with a lambda, which is why this alternative does not support lambdas.
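The consequences of the abstract-class design can be seen in a small sketch. It assumes simplified copies of the classes above and a placeholder ProcessorContext. Callers must write an anonymous subclass instead of a lambda, and when the joiner is invoked through the plain ValueJoiner interface it receives a null key.

```java
// Sketch only: simplified versions of the rejected abstract-class design.
public class AbstractRichJoinerSketch {

    interface ProcessorContext { }  // placeholder for the real processor context

    interface ValueJoiner<V1, V2, VR> {
        VR apply(V1 value1, V2 value2);
    }

    interface RichFunction {
        void init(ProcessorContext context);
        void close();
    }

    abstract static class AbstractRichFunction implements RichFunction {
        @Override public void init(final ProcessorContext context) {}
        @Override public void close() {}
    }

    abstract static class RichValueJoiner<K, V1, V2, VR> extends AbstractRichFunction
            implements ValueJoiner<V1, V2, VR> {
        // Called through the plain ValueJoiner interface: no key is available, so null is passed.
        @Override
        public final VR apply(final V1 value1, final V2 value2) {
            return apply(null, value1, value2);
        }

        public abstract VR apply(final K key, final V1 value1, final V2 value2);
    }

    public static String demo() {
        // RichValueJoiner is an abstract class, so a lambda such as (k, a, b) -> ... is not
        // allowed here; an anonymous subclass has to be written instead.
        final RichValueJoiner<String, Integer, Integer, String> joiner =
                new RichValueJoiner<String, Integer, Integer, String>() {
                    @Override
                    public String apply(final String key, final Integer a, final Integer b) {
                        return key + "=" + (a + b);
                    }
                };
        final String withKey = joiner.apply("k", 1, 2);
        final ValueJoiner<Integer, Integer, String> asPlain = joiner;
        final String withoutKey = asPlain.apply(1, 2);
        return withKey + "," + withoutKey;
    }

    public static void main(final String[] args) {
        System.out.println(demo()); // k=3,null=3
    }
}
```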
Inside the processor, we check whether the instance (for example, a ValueMapper instance) is rich (for example, a RichValueMapper):
```java
KStreamFlatMapValues(ValueMapper<? super V, ? extends Iterable<? extends V1>> mapper) {
    this.mapper = mapper;
    isRichFunction = mapper instanceof RichValueMapper;
}

@Override
public void process(K key, V value) {
    Iterable<? extends V1> newValues;
    if (isRichFunction) {
        // Rich variant: the key is passed to the mapper as well.
        newValues = ((RichValueMapper<? super K, ? super V, ? extends Iterable<? extends V1>>) mapper).apply(key, value);
    } else {
        newValues = mapper.apply(value);
    }
    for (V1 v : newValues) {
        context().forward(key, v);
    }
}
```
Not backward-compatible
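We propose adding key information to the ValueJoiner, ValueTransformer, and ValueMapper classes and their apply(...) methods. Because this changes the signatures of existing public interfaces, existing user implementations and lambdas would no longer compile.
As a result, we would make the following public changes (and the corresponding changes to their overloaded versions):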
Class | Old | New |
---|---|---|
KStream | `<VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);` | `<VR> KStream<K, VR> mapValues(ValueMapper<? super K, ? super V, ? extends VR> mapper);` |
KStream | `<VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier, final String... stateStoreNames);` | `<VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier, final String... stateStoreNames);` |
KStream | `<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows);` | `<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream, final ValueJoiner<? super K, ? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows);` |
KTable | `<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper);` | `<VR> KTable<K, VR> mapValues(final ValueMapper<? super K, ? super V, ? extends VR> mapper);` |
KTable | `<VO, VR> KTable<K, VR> join(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);` | `<VO, VR> KTable<K, VR> join(final KTable<K, VO> other, final ValueJoiner<? super K, ? super V, ? super VO, ? extends VR> joiner);` |
Lacking performance because of deep copying and the need for RichFunctions
- We extend the target interfaces ValueJoiner, ValueTransformer, and ValueMapper as ValueJoinerWithKey, ValueTransformerWithKey, and ValueMapperWithKey. The extended abstract classes have access to keys.
- In the Processor, we check the actual type of the instance:
```java
this.valueTransformer = valueTransformer;
isTransformerWithKey = valueTransformer instanceof ValueTransformerWithKey;
// ...

@Override
public void process(K key, V value) {
    if (isTransformerWithKey) {
        // Hand the transformer a copy so it cannot modify the key that is forwarded downstream.
        K keyCopy = (K) Utils.deepCopy(key);
        context.forward(key, ((ValueTransformerWithKey<K, V, R>) valueTransformer).transform(keyCopy, value));
    } else {
        context.forward(key, valueTransformer.transform(value));
    }
}
```
- As the code snippet shows, we can guard against key changes in Processors by deep-copying the key before calling the transform() method. However, the copy has to be made for every record, which costs performance.
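The deep-copy guard can be sketched as follows. The sketch uses a hypothetical serialization-based deepCopy in place of the Utils.deepCopy call, whose implementation is not specified here, and a stand-in ValueTransformerWithKey. It shows that a transformer that mutates its key argument leaves the forwarded key intact, at the price of a serialize/deserialize round trip per record.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Locale;

// Sketch only: deepCopy is a hypothetical helper built on Java serialization.
public class DeepCopyGuardSketch {

    interface ValueTransformerWithKey<K, V, VR> {
        VR transform(K key, V value);
    }

    // Serializes and deserializes the object to get an independent copy. Doing this for every
    // record is where the per-record performance cost comes from.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T deepCopy(final T obj) {
        try {
            final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("deep copy failed", e);
        }
    }

    public static String demo() {
        final ArrayList<String> key = new ArrayList<>();
        key.add("user-1");

        // A misbehaving transformer that mutates the key it is given.
        final ValueTransformerWithKey<ArrayList<String>, String, String> transformer = (k, v) -> {
            k.add("mutated");
            return v.toUpperCase(Locale.ROOT);
        };

        // The processor hands over a copy, so the key it forwards downstream is untouched.
        final String newValue = transformer.transform(deepCopy(key), "hello");
        return key + " -> " + newValue;
    }

    public static void main(final String[] args) {
        System.out.println(demo()); // [user-1] -> HELLO
    }
}
```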
...