Status

Current stateUnder Discussion

Discussion thread: here

JIRA: KAFKA-10578 

PR: PR-9381

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Imagine that we have an entity for which we want to emit the difference between the current and the previous state. The simplest case would be that the entity was an integer number and you want to emit the subtraction between the current and previous values.

For example, for the input stream 4, 6, 3 the output 4 (4 - 0), 2 (6 - 4), -3 (3 - 6) is expected.

The way to achieve this with Kafka Streams would be with something like this:


public class DiffAggregate {
  private final int currentValue;
  private final int lastDifference;

  public DiffAggregate() {
    this(0, 0);
  }

  public DiffAggregate(int currentValue, int lastDifference) {
    this.currentValue = currentValue;
    this.lastDifference = lastDifference;
  }

  public int getLastDifference() {
    return this.lastDifference;
  }

  public DiffAggregate next(int newValue) {
    return new DiffAggregate(newValue, newValue - currentValue);
  }
}

...

stream.groupByKey()
        .aggregate(DiffAggregate::new, (key, value, aggregate) -> aggregate.next(value))
        .mapValues(DiffAggregate::getLastDifference);


The main problem, apart from needing more code, is that if the same event is received twice at the same time and the commit time is not 0, the difference is deleted and nothing is emitted.

Public Interfaces


public interface KeyValueWithPreviousMapper<K, V, VR> {

    VR apply(final K key, final V previousValue, final V value);
}

public interface KTable<K, V> {

	...

	<VR> KStream<K, VR> toStream(final KeyValueWithPreviousMapper<? super K, ? super V, ? extends VR> mapper);

	<VR> KStream<K, VR> toStream(final KeyValueWithPreviousMapper<? super K, ? super V, ? extends VR> mapper, final Named named);
}


Proposed Changes

KeyValueWithPreviousMapper is an interface for mapping a key-value pair with the previous value to a new value of arbitrary type.

A KTable can be converted to a KStream using the given KeyValueWithPreviousMapper to compute the new value of the output record.

The previous problem would be solved in one line without loose any value:

stream.toTable()
		.toStream((key, previousValue, newValue) -> newValue - (previousValue != null ? previousValue : 0));


The proposal can be seen in https://github.com/apache/kafka/pull/9381

Compatibility, Deprecation, and Migration Plan

The proposal is backward-compatible because it only adds an interface and new methods and does not change any existing methods.

No methods need to be deprecated and no migration plan is required.

Rejected Alternatives


If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.


  • No labels