...

The first has been present for most of Kafka Streams' history: results are forwarded unconditionally. The second was added with the so-called "suppression" operator, which collects results throughout the window and into the grace period. However, one mode of processing should have been implemented long ago: emit on change. As mentioned in the JIRA, in many cases the user processes a prior result to yield a new one, yet the new result is forwarded to a downstream processor even though the operation was effectively a no-op. This is problematic when there is a high number of such operations, because an enormous number of useless results is forwarded down the topology.

This KIP will also discuss other possible reporting strategies for Kafka Streams and weigh their pros and cons.

Reporting Strategies

A reporting strategy is simply another term for a model of emission: it determines when results are emitted and how we decide whether to emit them.

  1. Emit on change: The objective of this KIP is to implement this reporting strategy. Its primary benefit is that it drops no-ops, so it can significantly reduce traffic compared with emit on update when a large share of updates are no-ops.
  2. Emit on update / non-empty content: This is already implemented by Kafka Streams. As long as the processed result is not empty, it is forwarded. This can lead to high no-op traffic, which is the primary motivation for adding emit on change.
  3. Emit on window close: This was implemented some time ago with the suppress operator, which allows users to combine results in a window so that only the final result of the active window is emitted.
  4. Periodic emission: This emission model is unique in that it depends solely on time. Every time a fixed interval has passed, a result is emitted.

Of the above emission models, two have already been implemented, and this KIP aims to implement a third. Periodic emission will likely not be added for now, as there has been little user demand for such a feature.
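To make the difference between emit on update and emit on change concrete, the following is a minimal stand-alone sketch. It does not use any Kafka Streams classes; the class name `EmissionSketch`, its methods, and the representation of an update as a `{key, value}` string pair are all hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical model of two reporting strategies; not Kafka Streams code. */
final class EmissionSketch {

    /** Emit on update: every processed result is forwarded downstream. */
    static List<String> emitOnUpdate(List<String[]> updates) {
        List<String> forwarded = new ArrayList<>();
        for (String[] kv : updates) {
            forwarded.add(kv[0] + "=" + kv[1]);
        }
        return forwarded;
    }

    /** Emit on change: a result is forwarded only if it differs from the prior result for its key. */
    static List<String> emitOnChange(List<String[]> updates) {
        Map<String, String> prior = new HashMap<>(); // stands in for a state store
        List<String> forwarded = new ArrayList<>();
        for (String[] kv : updates) {
            String key = kv[0];
            String value = kv[1];
            boolean unchanged = prior.containsKey(key) && Objects.equals(prior.get(key), value);
            prior.put(key, value);
            if (!unchanged) {
                forwarded.add(key + "=" + value); // no-ops are dropped here
            }
        }
        return forwarded;
    }
}
```

In this model, feeding the updates a=1, a=1, b=2, a=3 through `emitOnUpdate` forwards all four results, while `emitOnChange` drops the repeated a=1 and forwards three.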

Proposed Behavior Changes

...

  1. No-op operations will only be dropped from KTables, not from other classes.
  2. Streams will only drop no-ops for stateful operations. (KTable operations fall into two distinct classes: stateless and stateful.)
    1. If the user is very concerned about a particular stateless operation producing a lot of no-ops, then the user can simply materialize the operation, and Streams will automatically drop all no-ops.
    2. In situations where there is a high volume of no-ops throughout the Streams DAG, the recommended practice will be to materialize all operations to reduce traffic across the entire network of nodes.
  3. No-ops will be dropped for aggregations where neither the value nor the timestamp has changed.

Above are the main behavior changes that we should expect with this KIP. To reiterate, the primary benefit of this KIP is a performance enhancement. In some topologies, a large proportion of the records sent could be no-ops, which increases traffic while the extra records carry no value. This KIP intends to eliminate this flaw in Kafka Streams.
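The rule for aggregations (drop the update only when neither the value nor the timestamp has changed) can be sketched as a simple predicate. This is an illustration under stated assumptions, not the actual implementation: the class `NoOpCheckSketch`, the nested `Result` type, and the choice to compare values as serialized bytes are all hypothetical.

```java
import java.util.Arrays;

/** Hypothetical sketch of a no-op check for a stateful KTable operation. */
final class NoOpCheckSketch {

    /** A result as it might sit in a state store: value bytes plus a timestamp (assumed layout). */
    static final class Result {
        final byte[] value;
        final long timestamp;

        Result(byte[] value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /**
     * Returns true when the update is a no-op: a prior result exists and both
     * the value and the timestamp are unchanged. Without a prior result there
     * is nothing to compare against, so the update is never treated as a no-op.
     */
    static boolean isNoOp(Result prior, Result updated) {
        if (prior == null || updated == null) {
            return false;
        }
        return prior.timestamp == updated.timestamp
                && Arrays.equals(prior.value, updated.value);
    }
}
```

A check like this needs the prior result, which is why it is natural for stateful (materialized) operations: the old value is already in the store, so the only extra cost is the comparison.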

...

The other crucial aspect of our proposal is why we restrict ourselves to dropping no-ops for stateful operations. (To clarify, both the stateless and stateful operators discussed here modify KTables, not any other data structure.) The reasoning is two-fold:

...

If the user has logic that actively filters out no-op operations, they will be able to remove this code in a future release once this KIP has been integrated into Kafka Streams. For now, this KIP will aim to add a minimum number of configurations and avoid adding APIs. If the user has any logic regarding timestamps, there is a chance that their program will be affected. This depends on the individual user's application, however, and its behavior will have to be adjusted accordingly.

Rejected Alternatives

Configuration for Opting Out

For configurations, we have decided that an opt-out config is not necessary. Emit-on-change as a whole is more correct when it comes to Kafka Streams. The user should have little reason not to want this change, particularly since this KIP should not have a severe impact on performance (in the implementation, it is possible that we will only check whether two values are equal in KTables where prior results have already been loaded).

Emit-on-change only for Suppress Operator

Also, we have decided that we should not restrict dropping no-ops to the suppress operator alone. It is better to allow this behavior across most of the KTable API, rather than in just one operator. This provides both flexibility for the user and more granular control over which no-ops are dropped and which are not.