Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-10578
PR: PR-9381
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Imagine an entity for which we want to emit the difference between its current and previous state. In the simplest case the entity is an integer and we want to emit the current value minus the previous value.
For example, for the input stream 4, 6, 3 the output 4 (4 - 0), 2 (6 - 4), -3 (3 - 6) is expected.
The way to achieve this with Kafka Streams would be with something like this:
public class DiffAggregate {
    private final int currentValue;
    private final int lastDifference;

    public DiffAggregate() {
        this(0, 0);
    }

    public DiffAggregate(int currentValue, int lastDifference) {
        this.currentValue = currentValue;
        this.lastDifference = lastDifference;
    }

    public int getLastDifference() {
        return this.lastDifference;
    }

    public DiffAggregate next(int newValue) {
        return new DiffAggregate(newValue, newValue - currentValue);
    }
}

...

stream.groupByKey()
    .aggregate(DiffAggregate::new, (key, value, aggregate) -> aggregate.next(value))
    .mapValues(DiffAggregate::getLastDifference);
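The main problem, apart from the extra code, is that if the same event is received twice in quick succession and the commit interval (commit.interval.ms) is not 0, the difference is lost: the second update overwrites the first before it is forwarded downstream, so the real difference is never emitted.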
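The loss described above can be illustrated with a small, self-contained model. This is only a sketch and not Kafka Streams code: it assumes that, between commits, only the latest aggregate per key is kept and then forwarded. The class CachedDiffDemo and the parameter commitEvery are hypothetical names introduced for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model only: this is NOT Kafka Streams code. It assumes that the
// latest aggregate for a key overwrites the previous one until a "commit".
class CachedDiffDemo {

    // Same idea as DiffAggregate: remembers the current value and last difference.
    static final class Agg {
        final int currentValue;
        final int lastDifference;

        Agg(int currentValue, int lastDifference) {
            this.currentValue = currentValue;
            this.lastDifference = lastDifference;
        }

        Agg next(int newValue) {
            return new Agg(newValue, newValue - currentValue);
        }
    }

    // Processes all values of a single key and emits only at commit points.
    // commitEvery = 1 models forwarding every update; a larger value models
    // several updates arriving inside one commit interval (hypothetical knob).
    static List<Integer> emittedDifferences(List<Integer> values, int commitEvery) {
        List<Integer> emitted = new ArrayList<>();
        Agg agg = new Agg(0, 0);
        int pending = 0;
        for (int value : values) {
            agg = agg.next(value); // the newer aggregate replaces the cached one
            pending++;
            if (pending == commitEvery) {
                emitted.add(agg.lastDifference);
                pending = 0;
            }
        }
        if (pending > 0) {
            emitted.add(agg.lastDifference); // final flush of the cached aggregate
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(4, 6, 6); // the event 6 arrives twice
        System.out.println(emittedDifferences(input, 1)); // [4, 2, 0]
        System.out.println(emittedDifferences(input, 3)); // [0]
    }
}
```

In this model, when all three updates fall inside one commit interval, the differences 4 and 2 are overwritten by the 0 computed for the duplicate event and never reach downstream.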
Public Interfaces
public interface KeyValueWithPreviousMapper<K, V, VR> {
    VR apply(final K key, final V previousValue, final V value);
}

public interface KTable<K, V> {
    ...
    <VR> KStream<K, VR> toStream(final KeyValueWithPreviousMapper<? super K, ? super V, ? extends VR> mapper);

    <VR> KStream<K, VR> toStream(final KeyValueWithPreviousMapper<? super K, ? super V, ? extends VR> mapper,
                                 final Named named);
}
Proposed Changes
KeyValueWithPreviousMapper is an interface for mapping a key-value pair with the previous value to a new value of arbitrary type.
A KTable can be converted to a KStream using the given KeyValueWithPreviousMapper to compute the new value of the output record.
With this change, the previous problem can be solved in a single statement without losing any value:
stream.toTable()
    .toStream((key, previousValue, newValue) -> newValue - (previousValue != null ? previousValue : 0));
The proposal can be seen in https://github.com/apache/kafka/pull/9381
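To make the intended semantics of the mapper concrete, here is a minimal sketch that runs without Kafka. It copies the proposed KeyValueWithPreviousMapper interface and applies it to a list of updates, with a HashMap standing in for the table state. The class PreviousValueDemo and its methods mapUpdates and differences are hypothetical helpers for illustration only; they are not part of the proposal or of Kafka Streams.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Mirrors the interface proposed in Public Interfaces.
interface KeyValueWithPreviousMapper<K, V, VR> {
    VR apply(K key, V previousValue, V value);
}

// Hypothetical stand-in for a table: NOT part of Kafka Streams or of this KIP.
class PreviousValueDemo {

    // Applies the mapper to each update, passing the value previously stored
    // for the same key (null if the key has not been seen before).
    static <K, V, VR> List<VR> mapUpdates(
            List<Map.Entry<K, V>> updates,
            KeyValueWithPreviousMapper<? super K, ? super V, ? extends VR> mapper) {
        Map<K, V> table = new HashMap<>();
        List<VR> output = new ArrayList<>();
        for (Map.Entry<K, V> update : updates) {
            K key = update.getKey();
            V value = update.getValue();
            V previous = table.put(key, value); // put returns the old value or null
            output.add(mapper.apply(key, previous, value));
        }
        return output;
    }

    // Runs the difference mapper from the proposal over the values of one key.
    static List<Integer> differences(List<Integer> values) {
        List<Map.Entry<String, Integer>> updates = new ArrayList<>();
        for (Integer value : values) {
            updates.add(Map.entry("entity", value)); // single illustrative key
        }
        return PreviousValueDemo.<String, Integer, Integer>mapUpdates(
                updates,
                (key, previousValue, newValue) ->
                        newValue - (previousValue != null ? previousValue : 0));
    }

    public static void main(String[] args) {
        System.out.println(differences(List.of(4, 6, 3))); // [4, 2, -3]
    }
}
```

For the input 4, 6, 3 from the Motivation section, the sketch yields 4, 2, -3, and the null check covers the first update for a key, where no previous value exists.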
Compatibility, Deprecation, and Migration Plan
The proposal is backward-compatible because it only adds an interface and new methods and does not change any existing methods.
No methods need to be deprecated and no migration plan is required.
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.