...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Note: We will refer to no-ops as "idempotent updates" in this KIP. "No-op" sounds too generic, so we will use "idempotent updates" as our terminology from here on.

Motivation

Currently, in Kafka Streams, we support two modes of processing:

...

The first one has been present for most of Kafka Streams' existence: results are forwarded regardless of whether they have changed. The second option was added through the so-called "suppression" operator, which collects results throughout the window, extending into the grace period. However, one mode of processing should have been implemented long ago, and that is emit on change. As mentioned in the JIRA, in many cases the user processes a prior result to yield a new one, yet this new result is forwarded to a downstream processor even though the operation performed was effectively an idempotent update. This is problematic in circumstances where such operations are frequent, leading to an enormous number of useless results being forwarded down the topology.

...

  1. Emit on change: The objective of this KIP is to implement this reporting strategy. The primary benefit of this emission model is that it drops idempotent updates, so it is superior to emit on update in that traffic can be significantly reduced in situations with heavy idempotent-update traffic.
  2. Emit on update / non-empty content: This has already been implemented by Kafka Streams. Basically, as long as the processed result is not empty, we return it. That, however, can lead to high idempotent-update traffic, which is the primary motivation for adding emit on change.
  3. Emit on window close: This was implemented some time ago with the suppress operator, which allows users to combine results in a window; only the final result of the active window is emitted.
  4. Periodic Emission: This emission model is unique in that it depends solely on time: every time a fixed interval has passed, a result is emitted.

...

For behavior changes, the user should expect the following:

  1. Idempotent updates will only be dropped from KTables, not from other classes.
  2. Streams will only drop idempotent updates for stateful operations. (KTable operations are separated into two distinct classes of operations: stateless and stateful.)
    1. If the user is very concerned about a particular stateless operation producing a lot of idempotent updates, then the user can simply materialize the operation, and Streams will automatically drop all idempotent updates.
    2. In situations where there is a high volume of idempotent updates throughout the Streams DAG, the recommended practice will be to materialize all operations to reduce overall traffic across the entire network of nodes.
    3. We will check for equality between two processed results by comparing their serialized byte arrays (we will not rely on equals() for comparison, as it has the potential to be unreliable).
  3. Idempotent updates will be dropped for aggregations where neither the value nor the timestamp has changed.
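
The serialized-byte-array comparison in point 2.3 can be sketched as follows. This is a minimal illustration, not the actual Streams internals: the serialize helper stands in for whatever Serde the store is configured with, and the method names are assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class IdempotentUpdateCheck {

    // Hypothetical stand-in for a configured Serde; real code would use
    // the state store's value serializer.
    static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    // An update is idempotent if the new serialized value is byte-for-byte
    // identical to the prior one. Arrays.equals handles nulls safely, and we
    // deliberately avoid equals(), which may be unreliable for user types.
    static boolean isIdempotent(String oldValue, String newValue) {
        return Arrays.equals(serialize(oldValue), serialize(newValue));
    }
}
```

If isIdempotent returns true, the record would be dropped rather than forwarded downstream.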

Above are the main behavior changes we should expect with this KIP. This KIP is not intended to correct semantics; to reiterate, its primary benefit is a performance enhancement. In some topologies, a large proportion of the records sent can be idempotent updates, which increases traffic while the extra records add no value. This KIP intends to eliminate this flaw of Kafka Streams.

...

It is also expected that we add some metrics which count how many idempotent updates have been dropped. For that matter, even if we don't drop idempotent updates, we should at the very least record the number of idempotent updates seen passing through a particular processor. There is a strong motivation for doing so: users sometimes do not know whether their Streams application is experiencing a lot of useless traffic. In some situations, users are already aware of such a problem, but in many other cases they may not know at all.

Therefore, we should add some metrics which count the number of idempotent updates that each node has seen. This will be useful for measuring the true efficiency of an application. The exact format is below:

dropped-idempotent-updates : (Level 2 - Per Task) DEBUG (rate | total)  

Description: This metric records the number of updates that have been dropped because they essentially re-perform an earlier operation.

Note:

  • The rate option indicates the ratio of records dropped to the total volume of records passing through the task.
  • The total option will just give a raw count of the number of records dropped.
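
A toy model of how the two options relate, as defined above (plain Java rather than the Kafka metrics framework; the class and method names are assumptions for illustration):

```java
public class DroppedUpdateMetric {
    private long dropped = 0;
    private long seen = 0;

    // Called once for every record the task processes.
    void recordSeen() { seen++; }

    // Called when a record is dropped as an idempotent update.
    void recordDropped() { dropped++; }

    // "total": raw count of records dropped.
    long total() { return dropped; }

    // "rate": ratio of dropped records to all records seen by the task.
    double rate() { return seen == 0 ? 0.0 : (double) dropped / seen; }
}
```

In the real proposal this would be registered per task at DEBUG level, alongside the existing Streams task metrics.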

Design Reasoning

With the current default emission model, we forward processed results downstream regardless of whether they have changed. After some discussion, there are a couple of points that I would like to emphasize:

  1. There have been some thoughts about coupling emit on change only with the suppress operator. However, that approach has been abandoned in favor of a more extensive change that would impact almost all KTable operations supported by Kafka Streams (it might be that we will restrict emit-on-change to materialized KTables). Our justification is that idempotent updates should be suppressed even before they are forwarded downstream, as these useless results have the potential to multiply across downstream nodes once forwarded. The expected behavior is the following:
    1. Any operation that results in an idempotent update would be discarded. Exceptions would potentially be made for non-materialized KTables. Some might have concerns about a KTable constructed through aggregations: if an idempotent update is produced, how do we determine which timestamp to use? In this case, we would need some configuration, e.g. timestamp.aggregation.selection.policy, which determines whether the minimum or maximum timestamp of the aggregation is selected (this configuration is still up for debate).
    2. Any subsequent results which yield an idempotent update will not be forwarded. That also means that timestamps will not be updated for the corresponding keys. This applies to all non-aggregation operations.
  2. About configurations: as noted, we intend that Kafka Streams be as simple as possible when it comes to configurations. This KIP therefore proposes that emit-on-change become the new default behavior of Kafka Streams. Since emit-on-change is more efficient (idempotent updates shouldn't really be sent in the first place), there is a strong case for no additional config that allows the user to "opt out" of this change (and remain with emit-on-update).

The other crucial aspect of our proposal is why we restrict ourselves to dropping idempotent updates for stateful operations (to clarify a few points: both the stateless and stateful operators discussed here perform modifications to KTables, not any other data structure). The reasoning is two-fold:

  1. If we load the prior result in a stateless operation, there will be a significant performance impact. NOTE: this is subject to change: if we can generate some integer id (such id generation would likely fall on the user to implement) which reflects changes in the processed result, then we might be able to extend dropping idempotent updates to stateless operations as well. This is still an alternative worth considering.
  2. The other reason is that if we load a prior result in its entirety for a stateless operation, we are essentially replicating some functions of a stateful operator in a stateless one. After all, a stateless operator was never intended to load a prior result (only a stateful operator should do such a thing), so there would be some redundancy between stateful and stateless operators. On the other hand, this discrepancy (stateless operations don't drop idempotent updates while stateful operations do) can result in much confusion for users.

Alternative Approaches

There is a possibility where we can support emit-on-change for all operations.

There is more than one way to yield the prior result. After all, we can obtain it from an upstream processor: for most operations, we can forward downstream both the old and new results of the upstream processor. In this case, the same operation will be performed twice. However, each operation can be very expensive, so performing it twice has the potential to incur horrendous performance hits. It might be that this is not a serious issue, but it is of significant concern.

The main bottleneck for emit-on-change for stateless operations is really how to load the prior result. That does not necessarily have to be done. Earlier in our discussion, we talked about using a hash code as a way of uniquely identifying results, and then comparing those hash codes. But as noted, hash codes can vary across JVMs, and there is no requirement that different runs of a program return the same hash code for the exact same object. Instead, we can consider using some method distinct from Object#hashCode(). There is the possibility here that we can add a configuration allowing emit-on-change for stateless operations. If emit-on-change is enabled, then we can use some method defined by the user, e.g. generateUniqueObjectId(V result), returning a 32-bit or 64-bit integer as an id; this method will have stricter constraints than a normal hash code function. These generated ids would be used in place of the hash codes described in the Implementation section below: we store these ids instead, and compare them for equality.

This potentially can work, but the user must implement the provided method correctly. This must be stressed in further documentation. 
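
A sketch of what such a user-supplied method might look like. The interface follows the hypothetical generateUniqueObjectId signature above; CRC32 over the serialized form is just one possible choice that, unlike Object#hashCode(), is stable across JVMs and runs.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical user-implemented contract: equal results must always map to
// the same id, on every JVM and every run (stricter than hashCode()).
interface ResultIdGenerator<V> {
    long generateUniqueObjectId(V result);
}

public class Crc32IdGenerator implements ResultIdGenerator<String> {
    @Override
    public long generateUniqueObjectId(String result) {
        CRC32 crc = new CRC32();
        crc.update(result.getBytes(StandardCharsets.UTF_8));
        return crc.getValue(); // deterministic across JVMs, unlike hashCode()
    }
}
```

Streams would then store and compare these ids instead of loading the full prior result.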

Implementation [DISCARDED]

...

  1. Let's say we receive record 1 with timestamp 0 which leads to a change in the result processed. 
  2. Afterwards, we receive another record with timestamp 100 which would lead to an idempotent update.

It is required that the timestamp be 0. 
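
The scenario above can be modeled with a simple in-memory table (an illustration, not the actual state-store code; here equality is checked with Objects.equals rather than serialized bytes for brevity). Because record 2 is an idempotent update, it is dropped entirely, so the stored timestamp stays at 0 rather than advancing to 100.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class EmitOnChangeTable {
    // (value, timestamp) pair kept per key.
    static final class Entry {
        final String value;
        final long timestamp;
        Entry(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, Entry> table = new HashMap<>();

    // Returns true if the update was emitted, false if dropped as idempotent.
    boolean update(String key, String value, long timestamp) {
        Entry prior = table.get(key);
        if (prior != null && Objects.equals(prior.value, value)) {
            return false; // idempotent update: drop record, keep old timestamp
        }
        table.put(key, new Entry(value, timestamp));
        return true;
    }

    long timestampOf(String key) {
        return table.get(key).timestamp;
    }
}
```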

...

If the user has logic that actively filters out idempotent update operations, they will be able to remove this code in a future release once this KIP has been integrated into Kafka Streams. For now, this KIP will aim to add a minimum number of configurations and avoid adding APIs. If the user has any logic regarding timestamps, there is a chance their program will be affected. This is specific, however, to the individual user, and behavior will have to be changed accordingly.

...

For configurations, we have decided that an opt-out config is not necessary. Emit-on-change as a whole is intended solely as a performance optimization when it comes to Kafka Streams. The user should have little reason not to want this change, particularly since this KIP should not have a severe impact on performance (it is possible that in the implementation we will only check whether two values are equal in KTables where prior results have already been loaded).

...

Also, we have decided that we should not restrict dropping idempotent updates to the suppress operator alone. It is better to allow this behavior across most of the KTable API, rather than just one operator. This provides both flexibility for the user and more granular control over which idempotent updates are dropped.
