...

Proposed Behavior Changes

For behavior changes, the user should expect the following:

  1. Idempotent updates will only be dropped from KTables, not from other classes.
  2. Streams will only drop idempotent updates for stateful operations. (KTable operations are separated into two distinct classes of operations: stateless and stateful.)
    1. If the user is very concerned about a particular stateless operation producing a lot of idempotent updates, then the user can simply materialize the operation, and Streams will automatically drop all idempotent updates.
    2. In situations where there is a high volume of idempotent updates throughout the Streams DAG, it will be recommended practice to materialize all operations to reduce traffic across the entire network of nodes.
    3. We will check for equality between two processed results by comparing their serialized byte arrays. We will not rely on equals() for this comparison, since it has the potential to be unreliable.
  3. Idempotent updates will be dropped for aggregations where neither the value nor the timestamp has changed.
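Rule 3 can be made concrete with a small decision function. The sketch below is an illustration only: the class `AggregationEmitOnChange` and its method are hypothetical names, not part of the Kafka Streams API, and values are assumed to already be serialized to byte arrays. An aggregation result is forwarded unless both its serialized value and its timestamp match the prior result.

```java
import java.util.Arrays;

// Hypothetical helper (not a Kafka Streams API) illustrating rule 3:
// an aggregation result is dropped only if BOTH its serialized value
// and its timestamp are unchanged compared to the prior result.
final class AggregationEmitOnChange {

    private AggregationEmitOnChange() {}

    // Returns true if the new aggregation result should be forwarded downstream.
    static boolean shouldForward(byte[] priorValue, long priorTimestamp,
                                 byte[] newValue, long newTimestamp) {
        if (priorValue == null) {
            // No prior result for this key: this is a real change, always forward.
            return true;
        }
        boolean sameValue = Arrays.equals(priorValue, newValue); // byte-level comparison, not equals()
        boolean sameTimestamp = priorTimestamp == newTimestamp;
        // Drop only when neither the value nor the timestamp changed.
        return !(sameValue && sameTimestamp);
    }
}
```

Comparing bytes with `Arrays.equals` matches point 2.3 above: the check never calls the value type's own `equals()`.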

Above are the main behavior changes that we should expect with this KIP. This KIP is not intended to correct semantics. To reiterate, the primary benefit of this KIP is a performance enhancement. In some topologies, a large proportion of the records sent could be idempotent updates, which increases traffic even though these extra records carry no new information. This KIP intends to eliminate this flaw of Kafka Streams.

Metrics

It is also expected that we add some metrics which count how many idempotent updates have been dropped. Even if we do not drop idempotent updates, we should at the very least record the number of idempotent updates that have passed through a particular processor. The motivation for doing so is strong: users sometimes do not know whether their Streams application is experiencing a lot of useless traffic. In some situations, users are already aware of such a problem, but in many other cases they may not know at all.


Definition: an "idempotent update" is one in which the new result and the prior
result, when serialized, are identical byte arrays.
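The definition can be illustrated with a value type that does not override `equals()`. In this sketch, `Count`, `IdempotentUpdateCheck` and the `word:count` byte format are hypothetical stand-ins (a real application would use its own `Serde`). Two logically identical results are not `equals()` (identity comparison), yet they serialize to identical bytes, so the update counts as idempotent under the definition above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.Function;

// Hypothetical value type that deliberately does NOT override equals(),
// so equals() falls back to reference identity.
final class Count {
    final String word;
    final long count;

    Count(String word, long count) {
        this.word = word;
        this.count = count;
    }
}

final class IdempotentUpdateCheck {
    private IdempotentUpdateCheck() {}

    // Toy serializer standing in for a real Serde; the "word:count" format is an assumption.
    static final Function<Count, byte[]> SERIALIZER =
            c -> (c.word + ":" + c.count).getBytes(StandardCharsets.UTF_8);

    // An update is idempotent when prior and new results serialize to identical bytes.
    // A missing prior result or a deletion (null) is conservatively treated as a change.
    static <V> boolean isIdempotent(V prior, V next, Function<V, byte[]> serializer) {
        if (prior == null || next == null) {
            return false;
        }
        return Arrays.equals(serializer.apply(prior), serializer.apply(next));
    }
}
```

With this approach, the correctness of the check depends only on the serializer being deterministic, not on how (or whether) the user implemented `equals()`.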

Note: an "update" is a concept that only applies to Table operations, so
the concept of an "idempotent update" also only applies to Table operations.
See https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#streams_concepts_ktable
for more information.

Given that definition, we propose for Streams to drop idempotent updates
in any situation where it's possible and convenient to do so. For example,
any time we already have both the prior and new results serialized, we
may compare them, and drop the update if it is idempotent.
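For a materialized KTable operation, the prior serialized result is already in the state store, so the comparison is essentially free. The sketch below assumes a `HashMap` standing in for a real state store and a counter standing in for forwarding downstream; `MaterializedNode` is a hypothetical name, not a Kafka Streams class.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical materialized node: the store already holds the prior serialized
// result, so the node compares bytes before writing and forwarding.
// A HashMap stands in for a real state store.
final class MaterializedNode {
    private final Map<String, byte[]> store = new HashMap<>();
    private int forwarded = 0;

    // Processes a new serialized value; returns true if it was forwarded downstream.
    boolean process(String key, byte[] newValue) {
        byte[] prior = store.get(key);
        if (prior != null && Arrays.equals(prior, newValue)) {
            return false; // idempotent update: skip both the store write and the forward
        }
        store.put(key, newValue);
        forwarded++; // stand-in for forwarding the record downstream
        return true;
    }

    int forwardedCount() {
        return forwarded;
    }
}
```

Skipping the store write as well as the forward is a design choice in this sketch; it also avoids an unnecessary changelog write for the unchanged value.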

Note that under this proposal, we can implement idempotence checking
in the following situations:
1. Any aggregation (for example, KGroupedStream, KGroupedTable,
     TimeWindowedKStream, and SessionWindowedKStream operations)
2. Any Materialized KTable operation
3. Repartition operations, when we need to send both prior and new results
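For case 3, a repartition operation that sends both the prior and the new result already has both serialized, so it can drop the pair before producing it. In this sketch, `SerializedChange` and `RepartitionSink` are hypothetical names (Kafka Streams' internal change type is not used here), and a counter stands in for producing to the repartition topic.

```java
import java.util.Arrays;

// Hypothetical pair of serialized results sent through a repartition topic.
final class SerializedChange {
    final byte[] oldValue; // null if there was no prior result
    final byte[] newValue; // null for a deletion (tombstone)

    SerializedChange(byte[] oldValue, byte[] newValue) {
        this.oldValue = oldValue;
        this.newValue = newValue;
    }

    // Both sides are already serialized, so the comparison costs no extra loading.
    boolean isIdempotent() {
        return oldValue != null && newValue != null && Arrays.equals(oldValue, newValue);
    }
}

// Hypothetical sink in front of a repartition topic.
final class RepartitionSink {
    private int sent = 0;

    void send(SerializedChange change) {
        if (change.isIdempotent()) {
            return; // dropped: never reaches the repartition topic
        }
        sent++; // stand-in for producing the record to the repartition topic
    }

    int sentCount() {
        return sent;
    }
}
```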

Metrics

The following metric will be added to record the number of idempotent updates dropped. Therefore, we should add some metrics which count the number of idempotent updates that each node has seen. This will be useful for users measuring the true efficiency of their application. The exact format is below:

dropped-idempotent-updates : (Level 2 1 - Per TaskProcessor Node) DEBUG (rate | total)  

Description: This metric will record the number of idempotent updates (no-ops) that have been dropped.
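The metric reports both a `total` and a `rate`. The sketch below shows what those two statistics mean for a single node, using only the standard library; `DroppedIdempotentUpdatesMetric` is a hypothetical class, not Kafka's metrics API, and the caller-supplied clock values are an assumption made to keep the example deterministic.

```java
// Hypothetical stand-in for the proposed dropped-idempotent-updates metric:
// a running total plus a per-second rate over a caller-supplied time window.
final class DroppedIdempotentUpdatesMetric {
    private long total = 0;
    private long windowStartMs;
    private long windowCount = 0;

    DroppedIdempotentUpdatesMetric(long nowMs) {
        this.windowStartMs = nowMs;
    }

    // Called once each time an idempotent update is dropped.
    void record() {
        total++;
        windowCount++;
    }

    // Total number of dropped idempotent updates since creation.
    long total() {
        return total;
    }

    // Drops per second since the window started; then starts a new window.
    double rateAndReset(long nowMs) {
        long elapsedMs = Math.max(1L, nowMs - windowStartMs);
        double rate = windowCount * 1000.0 / elapsedMs;
        windowStartMs = nowMs;
        windowCount = 0;
        return rate;
    }
}
```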

...

  1. There have been some thoughts about coupling emit-on-change only with the suppress operator. However, that approach has been abandoned in favor of a more extensive change which would impact almost all KTable operations supported by Kafka Streams (we might restrict emit-on-change to materialized KTables). Our justification is that idempotent updates should be suppressed before they are forwarded downstream, since, once forwarded, these useless results can be duplicated across many downstream nodes. The expected behavior is the following:
    1. Any idempotent update produced by an operation would be discarded. Exceptions would potentially be made for non-materialized KTables for performance reasons. Some might have concerns about a KTable constructed through aggregations: if an idempotent update is produced, how do we determine which timestamp to use? In this case, we would need some configuration, e.g. timestamp.aggregation.selection.policy, which would determine whether the minimum or maximum timestamp of the aggregation is selected (this configuration is still up for debate). Any subsequent results which yield an idempotent update will not be forwarded. That also means that timestamps will not be updated for the corresponding keys. This applies to all non-aggregation operations. To keep things simple, we have decided that we will forward an aggregation result if and only if its value or its timestamp has changed.
  2. About configurations: as noted, we intend Kafka Streams to be as simple as possible when it comes to configuration. Therefore, this KIP proposes that emit-on-change simply becomes the new behavior of Kafka Streams. Since emit-on-change is more efficient (idempotent updates shouldn't really be sent in the first place), there is a strong case against adding a config that allows the user to "opt out" of this change (and remain with emit-on-update).

The other crucial aspect of our proposal is why we restrict ourselves to dropping idempotent updates for stateful operations (to clarify, both the stateless and stateful operators discussed here modify KTables, not any other data structure). The reasoning is two-fold:

  1. If we load the prior result in a stateless operation, there will be a significant performance impact. NOTE: This is subject to change: if we can generate some integer id (such id generation would likely fall on the user to implement) which reflects changes in the processed result, then we might be able to extend dropping idempotent updates to stateless operations as well. This is still an alternative worth considering.
  2. The other reason is that if we load a prior result in its entirety for a stateless operation, we are essentially replicating some functions of a stateful operator in a stateless one. After all, a stateless operator was never intended to load a prior result (only a stateful operator should do such a thing), so there would be some redundancy between stateful and stateless operators. However, this discrepancy (stateless operations don't drop idempotent updates while stateful operations do) can be a source of confusion for users.
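The integer-id alternative from point 1 could look roughly like the sketch below. Everything here is hypothetical: `ChangeIdentified` is an assumed user-implemented interface whose id must change whenever the value changes, and `IdFilteringStatelessNode` keeps only the last id per key (a small piece of state, but far cheaper than loading the full prior result).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical user-implemented interface: the id MUST change whenever the
// processed value changes; correctness depends entirely on the user.
interface ChangeIdentified {
    long changeId();
}

// Hypothetical stateless node that remembers only the last id per key,
// instead of loading the full prior result.
final class IdFilteringStatelessNode {
    private final Map<String, Long> lastIds = new HashMap<>();

    // Returns true if the result should be forwarded (first result or changed id).
    boolean shouldForward(String key, ChangeIdentified result) {
        long id = result.changeId();
        Long previous = lastIds.put(key, id);
        return previous == null || previous.longValue() != id;
    }
}
```

If the user's id fails to change when the value does, this sketch would silently drop a real update, which is the same risk that led to rejecting the hash-code approach.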

Implementation [DISCARDED]

NOTE: THIS SECTION HAS BEEN DISCARDED AS A PORTION OF THE KIP. IT HAS BEEN LEFT IN THE KIP ONLY FOR RECORD-KEEPING PURPOSES. FOR THOSE WHO ARE INTERESTED, THE CORE APPROACH (STORING HASH CODES INSTEAD OF ACTUAL VALUES) HAS BEEN REJECTED. THIS IS BECAUSE THE BEHAVIOR OF HASH CODES IS SUBJECT TO WIDE VARIATION ACROSS MULTIPLE SYSTEMS, AND HASH CODES COULD NOT BE USED RELIABLY TO COMPARE THE ACTUAL VALUES OF PROCESSED RESULTS.

This section is to discuss some points on how we should approach the implementation of this KIP. Some extra configurations will probably result from the implementation, but that is still debatable. See below for possible additions.   

Details on Core Improvement

Emit on change is, in many respects, an improved version of emit on update: the crucial improvement is that we check whether a given operation actually changed the result. To be thorough, such a check would load the previous result and compare it with the most recent one. However, in some cases this would not be possible without incurring extra performance costs. Therefore, rather than loading the entire result, we would compare the previous result's hash code with the current result's hash code. Admittedly, there is a small chance of collisions, where a new result with changed values still yields the same hash code. A good hash function makes such collisions unlikely, but it cannot rule them out entirely.

However, the main constraint of this approach is that the hash code must reflect any change in the individual component fields of the result. The key building block for this entire optimization is that the user provides a good hash code override for the given value type, which isn't always the case: it isn't safe to assume that the user has provided a reliable hash code function for us to take advantage of. We could therefore offer the user the following choices:

  1. Completely disregard the hash code and load the prior result and compare it to the new one.
  2. Compare only the hash codes of the given objects instead.

The second option is the recommended one. It is uncertain whether we will need to let the user choose between these two options, but doing so would probably be necessary. For the second option, it should be emphasized that the hash function must be chosen carefully. In many cases, Kafka already loads the prior result anyway, so wherever the previous result is already loaded, we will use it instead of the hash code (if we are to implement option 2).
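The collision risk behind the rejection of option 2 is easy to demonstrate with Java's own `String.hashCode()`, whose formula is fixed by the JDK specification: `"Aa"` and `"BB"` both hash to 2112. In this sketch, `HashVsFullComparison` is a hypothetical name; it contrasts option 1 (full comparison) with option 2 (hash-only comparison) on that pair.

```java
import java.util.Objects;

// Hypothetical comparison of the two options discussed above.
final class HashVsFullComparison {
    private HashVsFullComparison() {}

    // Option 1: load the prior result and compare it in full.
    static boolean unchangedByFullCompare(String prior, String next) {
        return Objects.equals(prior, next);
    }

    // Option 2 (rejected): compare only hash codes.
    static boolean unchangedByHashCode(String prior, String next) {
        return Objects.hashCode(prior) == Objects.hashCode(next);
    }
}
```

For an update from `"Aa"` to `"BB"`, option 1 correctly reports a change, while option 2 reports "unchanged" and would drop a real update.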

Discourse on other areas

The timestamps of the results processed would also present a serious design challenge for this KIP. The requirements of emit on change would entail the following behavior:

  1. Let's say we receive record 1 with timestamp 0, which leads to a change in the processed result.
  2. Afterwards, we receive another record with timestamp 100, which leads to an idempotent update.

The timestamp of the result is required to remain 0.
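The required behavior in the scenario above can be sketched as a node that stores the timestamp of the last real change next to the serialized value. `StoredResult` and `TimestampPreservingNode` are hypothetical names, and a `HashMap` stands in for a state store; on an idempotent update the record is not forwarded and the stored timestamp is left untouched.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stored result: serialized value plus the timestamp of the last real change.
final class StoredResult {
    final byte[] value;
    final long timestamp;

    StoredResult(byte[] value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical node that keeps the original timestamp on idempotent updates.
final class TimestampPreservingNode {
    private final Map<String, StoredResult> store = new HashMap<>();

    // Returns the timestamp associated with the key after processing the record.
    long process(String key, byte[] newValue, long recordTimestamp) {
        StoredResult prior = store.get(key);
        if (prior != null && Arrays.equals(prior.value, newValue)) {
            return prior.timestamp; // idempotent: not forwarded, timestamp stays as-is
        }
        store.put(key, new StoredResult(newValue, recordTimestamp));
        return recordTimestamp; // real change: new value and timestamp are stored (and forwarded)
    }
}
```

Feeding it record 1 (timestamp 0) and then the idempotent record (timestamp 100) leaves the result's timestamp at 0.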

To resolve this situation, the current best bet is probably to load the timestamp along with the hash code / full prior result. This is a problem for operations that do not already have the old result loaded: to check for idempotence, both the old and the new result have to be available. When the old result is unavailable, we will have to procure it again somehow. Unfortunately, that would exact an extra performance cost, which can be quite expensive.

Compatibility, Deprecation, and Migration Plan

...

This potentially can work, but the user must implement the provided method correctly. This must be stressed in further documentation. 

Forwarding the Old Result With the New

It has also been considered that, for stateless operations, we could forward the old result along with the new one. The stateless operation would then be performed twice, once on the old result and once on the new one, and the two outputs compared to check whether they are equal. The drawback of this approach is that stateless operations cannot be assumed to be inexpensive. Performing these operations twice would not be acceptable, especially since the goal of this work is to optimize Kafka Streams.
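The cost objection can be made visible by counting how often the stateless function runs. In this sketch, `DoubleEvaluationCheck` is a hypothetical wrapper, and `Objects.equals` is used on the outputs only for brevity (the proposal elsewhere compares serialized bytes). Every record that carries an old result triggers two evaluations of the user's function instead of one.

```java
import java.util.Objects;
import java.util.function.Function;

// Hypothetical wrapper illustrating the rejected "forward old with new" idea:
// the stateless function is applied to both inputs and the outputs compared.
final class DoubleEvaluationCheck<V, R> {
    private final Function<V, R> statelessOp;
    private int invocations = 0;

    DoubleEvaluationCheck(Function<V, R> statelessOp) {
        this.statelessOp = statelessOp;
    }

    private R apply(V input) {
        invocations++; // counts every evaluation of the (possibly expensive) function
        return statelessOp.apply(input);
    }

    // Returns true if the new output differs from the output for the old input.
    boolean shouldForward(V oldInput, V newInput) {
        R newResult = apply(newInput);
        if (oldInput == null) {
            return true; // nothing to compare against
        }
        R oldResult = apply(oldInput); // second evaluation per record: the cost objected to above
        return !Objects.equals(oldResult, newResult); // equals() used here only for brevity
    }

    int invocations() {
        return invocations;
    }
}
```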