Current state: Under Discussion
Discussion thread: TBD
JIRA: KAFKA-8770
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Currently, in Kafka Streams, we support two modes of processing:
The first, emit on update, has been present for most of Kafka Streams' history: results are forwarded regardless of whether they changed. The second, emit on window close, was added with the so-called "suppression" operator, which collects results throughout the window, extending into the grace period, and emits them once the window closes. However, one mode of processing should have been implemented long ago: emit on change. As mentioned in the JIRA, in many cases the user processes a prior result to yield a new one, yet this new result is forwarded to a downstream processor even though the operation was effectively a no-op. This becomes a problem when such operations are frequent, because an enormous number of useless results is then forwarded down the topology.
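To make the difference concrete, here is a minimal Java sketch of emit on change for a stateful operation. It assumes the prior result per key is already available, with a HashMap standing in for the state store. The class EmitOnChangeTable and its emitOnChange flag are hypothetical names for illustration only, not part of the Kafka Streams API; with the flag off, the class behaves like emit on update.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a stateful operator (not a Kafka Streams class).
// The HashMap plays the role of the state store holding the prior result per key.
class EmitOnChangeTable<K, V> {
    private final Map<K, V> store = new HashMap<>();
    // Records "forwarded downstream", captured here so they can be inspected.
    final List<String> forwarded = new ArrayList<>();
    private final boolean emitOnChange;

    EmitOnChangeTable(boolean emitOnChange) {
        this.emitOnChange = emitOnChange;
    }

    void process(K key, V newValue) {
        // Load the prior result and store the new one in a single step.
        V oldValue = store.put(key, newValue);
        if (emitOnChange && Objects.equals(oldValue, newValue)) {
            return; // no-op: the result did not change, so nothing is forwarded
        }
        forwarded.add(key + "=" + newValue);
    }
}
```

Feeding the values 1, 1, 2 for the same key forwards three records with the flag off, but only a=1 and a=2 with it on, since the second 1 is a no-op.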
For behavior changes, the user should expect the following:
Above are the main behavior changes that we should expect with this KIP. To reiterate, the primary benefit of this KIP is a performance enhancement. In some topologies, a large proportion of the records sent could be no-ops, which increases traffic while adding no value. This KIP intends to eliminate this flaw of Kafka Streams.
With the current default model of emission, we forward the processed results downstream regardless of whether they have changed. After some discussion, there are a couple of points that I would like to emphasize:
We may add a configuration, timestamp.aggregation.selection.policy, which would determine whether the minimum or maximum timestamp of the aggregation is selected (this configuration is still up for debate).
Since emit-on-change is more correct (no-ops shouldn't really be sent in the first place), there is a strong case against adding a config that allows the user to "opt out" of this change (and remain with emit-on-update).
The other crucial aspect of our proposal is why we restrict ourselves to dropping no-ops for stateful operations. The reasoning is two-fold:
It may be possible to support emit-on-change for all operations.
There is more than one way to obtain the prior result. For instance, we could derive it from an upstream processor: for most operations, the upstream processor can forward both its old and new results downstream. In this case, however, the same operation is performed twice, and since each operation can be very expensive, performing it twice could incur severe performance penalties. This may not turn out to be a serious issue, but it is a significant concern.
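The approach of re-deriving the prior result can be sketched as follows, assuming an upstream processor that forwards both its old and its new value. OldAndNew and RecomputingStatelessOp are hypothetical names chosen for this illustration; the invocations counter makes the doubled cost visible.

```java
import java.util.Objects;
import java.util.function.Function;

// Hypothetical pair of upstream values: the old and the new result of the upstream processor.
record OldAndNew<V>(V oldValue, V newValue) {}

// Hypothetical stateless operator that re-derives its prior result instead of storing it.
class RecomputingStatelessOp<V, R> {
    private final Function<V, R> operation;
    int invocations = 0; // how many times the (possibly expensive) operation actually ran

    RecomputingStatelessOp(Function<V, R> operation) {
        this.operation = operation;
    }

    private R apply(V value) {
        if (value == null) {
            return null; // no old value yet (first record for this key)
        }
        invocations++;
        return operation.apply(value);
    }

    // Returns the new result if it differs from the prior one, or null for a no-op.
    // (For simplicity, a null new result is not distinguished from a no-op here.)
    R process(OldAndNew<V> change) {
        R oldResult = apply(change.oldValue()); // re-derive the prior result...
        R newResult = apply(change.newValue()); // ...and compute the new one
        return Objects.equals(oldResult, newResult) ? null : newResult;
    }
}
```

With String::length as the operation, changing "abc" to "xyz" is correctly detected as a no-op, but only after the operation has run twice for a single update.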
The main obstacle to emit-on-change for stateless operations is how to load the prior result, but loading it is not strictly necessary. Earlier in our discussion, we talked about using a hash code to identify each result and then comparing those hash codes. But as noted, hash codes can vary across JVMs: Object#hashCode() is not required to return the same value for the same object on different runs of a program. Instead, we could consider using some method distinct from Object#hashCode(). One possibility is to add a configuration that allows emit-on-change for stateless operations. If it is enabled, we would call a method defined by the user, e.g. generateUniqueObjectId(V result), which returns a 32-bit or 64-bit integer as an id and has stricter constraints than a normal hash code function. This method would be used in the same way as the hash codes described in the Implementation section below: we store these ids instead of the results and compare the ids for equality.
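A sketch of the user-defined id idea, under stated assumptions: UniqueIdGenerator and IdBasedChangeFilter are hypothetical names (only the method name generateUniqueObjectId comes from the proposal), and the example generator uses 64-bit FNV-1a over the UTF-8 bytes of a string, a deterministic function chosen here purely for illustration so that the id does not depend on the JVM or the run. A 64-bit id can still collide for distinct values; the sketch only shows where such a method would plug in.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical user-supplied hook; only the method name comes from the proposal.
interface UniqueIdGenerator<V> {
    long generateUniqueObjectId(V result);
}

// Example generator: 64-bit FNV-1a over UTF-8 bytes. Unlike Object#hashCode(),
// its output depends only on the bytes, not on the JVM or the run.
class Fnv1aStringIdGenerator implements UniqueIdGenerator<String> {
    private static final long FNV_OFFSET_BASIS = 0xcbf29ce484222325L;
    private static final long FNV_PRIME = 0x100000001b3L;

    @Override
    public long generateUniqueObjectId(String result) {
        long hash = FNV_OFFSET_BASIS;
        for (byte b : result.getBytes(StandardCharsets.UTF_8)) {
            hash ^= (b & 0xff); // mix in the unsigned byte
            hash *= FNV_PRIME;  // wraps modulo 2^64
        }
        return hash;
    }
}

// Hypothetical stateless emit-on-change filter: keeps only the last id per key.
class IdBasedChangeFilter<K, V> {
    private final UniqueIdGenerator<V> ids;
    private final Map<K, Long> lastIds = new HashMap<>();

    IdBasedChangeFilter(UniqueIdGenerator<V> ids) {
        this.ids = ids;
    }

    // Returns true if the result should be forwarded downstream.
    boolean shouldForward(K key, V result) {
        long id = ids.generateUniqueObjectId(result);
        Long previous = lastIds.put(key, id);
        return previous == null || previous != id;
    }
}
```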
This could work, but only if the user implements the method correctly, which must be stressed in the documentation.
NOTE: THIS SECTION HAS BEEN DISCARDED FROM THE KIP AND IS KEPT ONLY FOR RECORD-KEEPING PURPOSES. FOR THOSE WHO ARE INTERESTED, THE CORE APPROACH (STORING HASH CODES INSTEAD OF ACTUAL VALUES) HAS BEEN REJECTED BECAUSE THE BEHAVIOR OF HASH CODES VARIES WIDELY ACROSS SYSTEMS, SO HASH CODES CANNOT BE USED RELIABLY TO COMPARE THE ACTUAL VALUES OF PROCESSED RESULTS.
This section discusses how we might approach the implementation of this KIP. Some extra configurations will probably result from the implementation, but that is still debatable. See below for possible additions.
Emit on change is, in many respects, an improved version of emit on update; the crucial improvement is that we check whether a given operation actually changed the result. To do this check thoroughly, we would load the previous result and compare it with the most recent one. However, in some cases this is not possible without incurring extra performance costs. Therefore, rather than load the entire result, we could compare the previous result's hash code with the current result's hash code. Admittedly, there is a chance of collisions, where a new result with changed values still yields the same hash code. A good hash function makes such collisions unlikely, but no hash function can rule them out entirely.
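The collision risk is easy to reproduce. The sketch below keeps only each key's last hash code (HashBasedChangeFilter is a hypothetical class) and treats equal hash codes as "unchanged". It relies on the fact that String#hashCode() is specified as s[0]*31^(n-1) + ... + s[n-1], so "Aa" and "BB" both hash to 2112, and a genuine change is silently dropped.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the (later rejected) idea: store only the hash code of the prior
// result per key and treat equal hash codes as "no change".
class HashBasedChangeFilter<K, V> {
    private final Map<K, Integer> lastHashes = new HashMap<>();

    // Returns true if the result should be forwarded downstream.
    boolean shouldForward(K key, V result) {
        int hash = result.hashCode();
        Integer previous = lastHashes.put(key, hash);
        return previous == null || previous != hash;
    }
}
```

Calling shouldForward("k", "Aa") and then shouldForward("k", "BB") returns true and then false, even though the value changed.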
However, the main constraint of this approach, if we were to follow through with it, is that the hash code must reflect any change in the individual fields of the result. The key building block for this entire optimization is that the user provides a good hash code override for the given value type. Of course, this isn't always the case: it isn't safe to assume that the user has provided a suitable hash code function for us to take advantage of. In conclusion, we can probably offer the user the following choices:
The second option is clearly the recommended one. It is uncertain whether we will need to let the user choose between these two options, but it would probably be necessary. For the latter option, it should be emphasized that the hash function must be chosen carefully. In many cases, Kafka Streams already loads the prior result anyway, so wherever the previous result is already loaded, we will use it instead of the hash code (if we implement option 2).
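The fallback described above could look roughly like this: an exact equals() comparison when the processor has already loaded the prior result, and a hash comparison otherwise. ChangeDetector.isNoOp is a hypothetical helper; for simplicity, a null priorValue here means "not loaded", which a real implementation would have to distinguish from a deleted value.

```java
import java.util.Objects;

// Hypothetical helper deciding whether a new result is a no-op.
final class ChangeDetector {
    private ChangeDetector() {}

    /**
     * @param priorValue the previous result if the processor already loaded it, else null
     * @param priorHash  the stored hash code of the previous result, or null if none was stored
     * @param newValue   the freshly computed result
     */
    static boolean isNoOp(Object priorValue, Integer priorHash, Object newValue) {
        if (priorValue != null) {
            // Prior result already loaded: exact comparison, no collision risk.
            return Objects.equals(priorValue, newValue);
        }
        if (priorHash != null) {
            // Only the hash is available: cheaper, but collisions are possible.
            return priorHash == Objects.hashCode(newValue);
        }
        return false; // nothing to compare against: treat the result as a change
    }
}
```

With the loaded values "Aa" and "BB", isNoOp correctly reports a change; with only the stored hash of "Aa", the same comparison reports a no-op because of the collision.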
The timestamps of the processed results also present a serious design challenge for this KIP. Emit on change would require the following behavior:
It is required that the timestamp be 0.
To resolve this, the best option at present is probably to load the timestamp along with the hash code (or the full prior result).
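One way to keep the timestamp alongside the hash code is sketched below. TimestampPolicy mirrors the debated timestamp.aggregation.selection.policy config, and TimestampedChangeFilter is a hypothetical class. The rule that a no-op forwards nothing and leaves the stored timestamp untouched is an assumption for illustration, not settled behavior.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical counterpart of the debated timestamp.aggregation.selection.policy config.
enum TimestampPolicy { MIN, MAX }

class TimestampedChangeFilter<K, V> {
    // Stored per key: the prior result's hash code together with its timestamp.
    private record Prior(int hash, long timestamp) {}

    private final Map<K, Prior> store = new HashMap<>();
    private final TimestampPolicy policy;

    TimestampedChangeFilter(TimestampPolicy policy) {
        this.policy = policy;
    }

    // Returns the timestamp to forward the result with, or empty if it is a no-op.
    OptionalLong process(K key, V result, long recordTimestamp) {
        int hash = result.hashCode();
        Prior prior = store.get(key);
        if (prior == null) {
            store.put(key, new Prior(hash, recordTimestamp));
            return OptionalLong.of(recordTimestamp);
        }
        if (prior.hash() == hash) {
            return OptionalLong.empty(); // assumed: a no-op keeps the stored timestamp
        }
        long ts = policy == TimestampPolicy.MAX
                ? Math.max(prior.timestamp(), recordTimestamp)
                : Math.min(prior.timestamp(), recordTimestamp);
        store.put(key, new Prior(hash, ts));
        return OptionalLong.of(ts);
    }
}
```

With the MAX policy, a change arriving with an older timestamp (5) after a result stored at 10 is forwarded with timestamp 10; with MIN, it is forwarded with 5.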
If users have logic that actively filters out no-op operations, they will be able to remove this code in a future release once this KIP has been integrated into Kafka Streams. For now, this KIP aims to add as few configurations as possible and to avoid adding new APIs. Programs whose logic depends on timestamps may be affected; this is specific to each application, and such logic will have to be adjusted accordingly.
For configurations, we have decided that an opt-out config is not necessary. Emit-on-change as a whole is more correct for Kafka Streams. The user should have little reason not to want this change, particularly since this KIP should not have a severe impact on performance (the implementation may only check whether two values are equal in KTables where the prior result has already been loaded).