Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: here [Change the link from KAFKA-1 to your own ticket]10578 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

For example, for the input stream 4, 6, 3 the output 4 (4 - 0), 2 (6 - 4), -3 (3 - 6) is expected.

The way to achieve this with Kafka Streams would be with something like this:


Code Block
languagejava
public class DiffAggregate {
  private final int currentValue;
  private final int lastDifference;

  public DiffAggregate() {
    this(0, 0);
  }

  public DiffAggregate(int currentValue, int lastDifference) {
    this.currentValue = currentValue;
    this.lastDifference = lastDifference;
  }

  public int getLastDifference() {
    return this.lastDifference;
  }

  public DiffAggregate next(int newValue) {
    return new DiffAggregate(newValue, newValue - currentValue);
  }
}

...

stream.groupByKey()
        .aggregate(DiffAggregate::new, (key, value, aggregate) -> aggregate.next(value))
        .mapValues(DiffAggregate::getLastDifference);


The main problem, aside from being verboseapart from needing more code, is that if the same event is received twice at the same time and the commit time is not 0, the difference is deleted and nothing is emitted.

...

Compatibility, Deprecation, and Migration Plan

The proposal is backward-compatible because it only adds an interface and new methods and does not change any existing methods.

No methods need to be deprecated and no migration plan is required.

Rejected Alternatives


If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.

...