...

The first one has been present for most of Kafka Streams' history: results are forwarded regardless of whether they changed. The second option was added by implementing the so-called "suppression" operator, which collects results throughout the window and into the grace period. However, one mode of processing should have been implemented long ago: emit-on-change. As mentioned in the JIRA, in many cases the user processes a prior result to yield a new one, yet this new result is forwarded to downstream processors even though the operation was effectively a no-op. This is problematic when such operations are frequent, because it leads to an enormous number of useless results being forwarded down the topology.
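The difference between emit-on-update and emit-on-change can be sketched in plain Java. This is a simplified model under stated assumptions, not Kafka Streams code: the classes EmitOnChangeTable and EmitOnChangeDemo are hypothetical, a HashMap stands in for the KTable's state store, and a no-op is detected with Objects.equals (this section does not specify how the real implementation would compare values).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified KTable node: keeps the latest value per key and
// decides whether an update is forwarded downstream.
final class EmitOnChangeTable<K, V> {
    private final Map<K, V> store = new HashMap<>();
    private final List<String> forwarded = new ArrayList<>();
    private final boolean emitOnChange;

    EmitOnChangeTable(boolean emitOnChange) {
        this.emitOnChange = emitOnChange;
    }

    // Applies an update; returns true if a record was forwarded downstream.
    boolean put(K key, V newValue) {
        boolean hadKey = store.containsKey(key);
        V oldValue = store.put(key, newValue);
        // A no-op: the key already existed and the value did not change.
        boolean noOp = hadKey && Objects.equals(oldValue, newValue);
        if (emitOnChange && noOp) {
            return false; // emit-on-change drops the no-op instead of forwarding it
        }
        forwarded.add(key + "=" + newValue);
        return true;
    }

    List<String> forwarded() {
        return forwarded;
    }
}

class EmitOnChangeDemo {
    public static void main(String[] args) {
        String[][] updates = {{"a", "1"}, {"a", "1"}, {"b", "2"}, {"a", "1"}, {"a", "3"}};
        EmitOnChangeTable<String, String> onUpdate = new EmitOnChangeTable<>(false);
        EmitOnChangeTable<String, String> onChange = new EmitOnChangeTable<>(true);
        for (String[] u : updates) {
            onUpdate.put(u[0], u[1]);
            onChange.put(u[0], u[1]);
        }
        System.out.println(onUpdate.forwarded()); // [a=1, a=1, b=2, a=1, a=3]
        System.out.println(onChange.forwarded()); // [a=1, b=2, a=3]
    }
}
```

With the same five input updates, emit-on-update forwards all of them, while emit-on-change forwards only three, because the repeated a=1 updates do not change the table.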

...

  1. There have been some thoughts about coupling emit-on-change only with the suppress operator. However, that approach has been abandoned in favor of a more extensive change that would impact almost all KTable operations supported by Kafka Streams (we might restrict emit-on-change to materialized KTables). Our justification is that no-op changes should be suppressed before they are forwarded downstream at all, since once a no-op has been forwarded, copies of it can multiply across many downstream nodes. The expected behavior is the following:
    1. Any operation that results in a no-op will be discarded. Exceptions might be made for non-materialized KTables. Some might have concerns about KTables constructed through aggregations: if an aggregation produces a no-op result, which timestamp should be used? In this case, we would need a configuration, e.g. timestamp.aggregation.selection.policy, that determines whether the minimum or maximum timestamp of the aggregation is selected (this configuration is still up for debate).
    2. Subsequent results that yield a no-op will not be forwarded. This also means that timestamps will not be updated for the corresponding keys. This applies to all non-aggregation operations.
  2. About configurations. As noted, we intend to keep Kafka Streams as simple as possible when it comes to configuration. Therefore, this KIP proposes that emit-on-change simply become the new behavior of Kafka Streams. Since emit-on-change is more correct (no-ops should not be sent in the first place), there is a strong case against adding a config that lets the user "opt out" of this change (and keep emit-on-update).
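The timestamp rules described in the list above can be sketched in the same way. Again, this is a hypothetical, simplified model rather than Kafka Streams code: TimestampSelectionPolicy merely stands in for the proposed and still-debated timestamp.aggregation.selection.policy config, and NoOpTimestampHandler, TimestampedValue and their methods are illustrative names. In this sketch a no-op is never forwarded. For non-aggregation operations the stored timestamp is left unchanged, while for aggregations the policy picks either the minimum or the maximum of the stored and incoming timestamps.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for the proposed (still debated) config
// timestamp.aggregation.selection.policy.
enum TimestampSelectionPolicy {
    MIN, MAX;

    long select(long storedTs, long incomingTs) {
        return this == MIN ? Math.min(storedTs, incomingTs) : Math.max(storedTs, incomingTs);
    }
}

// Hypothetical value-and-timestamp pair stored per key.
final class TimestampedValue<V> {
    final V value;
    final long timestamp;

    TimestampedValue(V value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical KTable node applying the no-op timestamp rules.
final class NoOpTimestampHandler<K, V> {
    private final Map<K, TimestampedValue<V>> store = new HashMap<>();
    private final TimestampSelectionPolicy policy; // null for non-aggregation operations

    private NoOpTimestampHandler(TimestampSelectionPolicy policy) {
        this.policy = policy;
    }

    static <K, V> NoOpTimestampHandler<K, V> forNonAggregation() {
        return new NoOpTimestampHandler<>(null);
    }

    static <K, V> NoOpTimestampHandler<K, V> forAggregation(TimestampSelectionPolicy policy) {
        return new NoOpTimestampHandler<>(Objects.requireNonNull(policy));
    }

    // Returns true if the update is forwarded downstream.
    boolean update(K key, V newValue, long ts) {
        TimestampedValue<V> old = store.get(key);
        if (old == null || !Objects.equals(old.value, newValue)) {
            // Real change: store it and forward it with its own timestamp.
            store.put(key, new TimestampedValue<>(newValue, ts));
            return true;
        }
        if (policy == null) {
            // Non-aggregation no-op: not forwarded, stored timestamp left unchanged.
            return false;
        }
        // Aggregation no-op: not forwarded, but the policy picks which timestamp to keep.
        store.put(key, new TimestampedValue<>(old.value, policy.select(old.timestamp, ts)));
        return false;
    }

    long timestampOf(K key) {
        return store.get(key).timestamp;
    }
}

class NoOpTimestampDemo {
    public static void main(String[] args) {
        NoOpTimestampHandler<String, Integer> plain = NoOpTimestampHandler.forNonAggregation();
        plain.update("k", 5, 100L);
        System.out.println(plain.update("k", 5, 200L)); // false
        System.out.println(plain.timestampOf("k"));     // 100

        NoOpTimestampHandler<String, Integer> maxAgg =
                NoOpTimestampHandler.forAggregation(TimestampSelectionPolicy.MAX);
        maxAgg.update("k", 5, 100L);
        System.out.println(maxAgg.update("k", 5, 200L)); // false
        System.out.println(maxAgg.timestampOf("k"));     // 200
    }
}
```

Whether an aggregation no-op should update the stored timestamp at all, and which policy should be the default, are exactly the open questions the list above leaves up for debate. The sketch simply shows one way the two candidate policies would behave.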

Above are the main behavior changes that we should expect with this KIP. To reiterate, the primary benefit of this KIP is a performance enhancement. In some topologies, a large proportion of the records sent can be no-ops, which increases traffic even though the extra records carry no value. This KIP intends to eliminate this flaw in Kafka Streams.

Implementation [DISCARDED]

...

If users have logic that actively filters out no-op operations, they will be able to remove this code in a future release once this KIP has been integrated into Kafka Streams. For now, this KIP aims to add a minimal number of configurations and to avoid adding APIs. Therefore, no major accommodations have to be made to current method calls. If users have any logic that depends on timestamps, there is a chance that their programs will be affected. This is specific to each user, and behavior will have to be adjusted accordingly.