Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state: Voting Adopted Accepted but temporarily reverted (see KAFKA-12508)

Discussion thread: Voting Thread 

JIRA:

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-8770
 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  1. Emit on change: The objective of this KIP is to implement this reporting strategy. The primary benefit of such an emission model is that it will drop idempotent updates, so it will be superior to emit on update in that traffic can be significantly reduced in high idempotent update traffic situations.
  2. Emit on update / non-empty content: It has already been implemented by Kafka Streams. Basically, as long as the content or processed result is not empty, we will return the processed result. That could however lead to high idempotent update traffic, which is the primary motivation for adding emit on change.
  3. Emit on window close: This has been implemented some time ago with the suppress operator which allows users to combine results in a window, and then we emit only the final result of the active window. 
  4. Periodic Emission: This emission model is unique in that it is solely dependent on time to emit a given result. Essentially, every time a fixed time interval has passed, then a result will be emitted. 

...

Note: an "update" is a concept that only applies to Table operations, so the concept of an "idempotent update" also only applies to Table operations.
See https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#streams_concepts_ktable for more information.

Given that definition, we propose for Streams to drop idempotent updates in any situation where it's possible and convenient to do so. For example, any time we already have both the prior and new results serialized, we may compare them, and drop the update if it is idempotent.

Note that under this proposal, we can implement idempotence checking in the following situations:
1. Any aggregation (for example, KGroupedStream, KGroupedTable, TimeWindowedKStream, and SessionWindowedKStream operations)

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-12546

2. Any Materialized source KTable operation
Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-10248

3. Repartition operations, when we need to send both prior and new results
Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-12547

Metrics

The following metric will be added to record the number of idempotent updates dropped:dropped

idempotent-idempotentupdate-updatesskip : (Level 3 - Per Processor Node) DEBUG (rate | total)  

Description: This metric will record the number of idempotent updates (no-ops) that have been dropped. This will be only attached to processor nodes where their operations already provides the old result. In other words, it will include repartition operations and any aggregations.

Note:

  • The rate option measures the average amount of idempotent records dropped in a one second interval.
  • The total option will just give a raw count of the number of records dropped. we have decided that we will forward aggregation results if and

Design Reasoning

With the current default model of emission, we are forwarding the processed results downstream regardless if  it has changed or not. After some discussion, there are a couple of points that I would like to emphasize:

  1. There has been some thoughts as to only coupling emit on change with the suppress operator. However, that approach has been abandoned in favor of a more extensive change which would impact almost all KTable operations supported by Kafka Streams. Our justification is that idempotent update changes should be suppressed even before they are forwarded downstream, as the duplication of these useless results has the potential to explode across multiple downstream nodes once it has been forwarded. The expected behavior is the following:
    1. Any operations that have resulted in a idempotent update would be discarded. Exceptions would be made for non-materialized KTables for performance reasons. Some might have concerns about a KTable constructed through aggregations. In this case, if a idempotent update result is produced, how do we determine which timestamp to use? To keep things simple, we have decided that we will forward drop aggregation results if and only and only if the timestamp and the value had not changed.
  2. About configurations. As noted, we intend that Kafka Streams be as simple as possible when it comes to configurations. In that case, this KIP proposes that instead, emit-on-change becomes the new behavior of Kafka Streams. Since emit-on-change is more efficient (idempotent updates shouldn't really be sent in the first place), there is a strong case for no additional config that allows the user to "opt-out" of this change (and remain with emit-on-update).

...

  1. Stream aggregation (both windowed or not) will not be effected. As emphasized in the behavior changes, the current arrangement means that any operations which will advance stream time will still be emitted (as we recall earlier, operations are dropped only if the timestamp has not changed.)
  2. Windowed operations are a primary user of stream time, so they have the largest potential to be impacted in someway by this KIP. For background, In order to advance stream time far enough to close a window or push it out of retention, the new records must have timestamps that are in later windows, which means that they are updates to new keys, which means they would not be suppressed as idempotent. In retrospect, we can conclude that at worst the maximum amount of stream time which can be delayed by dropped updates would be one window. Interestingly enough, this isn't too bad, since it is not in the contract to emit results at the earliest time possible (just some time after the window closes).
  3. The last one is Suppression. One of the primary usages of suppression is to limit the rate of record emission, and at the same time, there should be a consistent rate of records being forwarded by the suppression operator. If we drop idempotent updates, it is possible that the buffer would grind to a halt since stream time is no longer advanced, but this isn't as big an issue as it seems. If this is the case, the user can simply remove the suppression operator from the topology (and allow the dropping of idempotent updates to fulfill the same role as the suppression buffer).

Future Work

Most would note that a pitfall with this KIP would be that we don't drop idempotent updates for stateless operations where the old result is not immediately available. As we have iterated, procuring/storing the old result in a stateless operation is not viable. In the KIP's current state, the only way to drop idempotent updates from stateless operations will be to materialize it. That of course would also have its own costs. One possible solution is instead to store the SHA256 hash of the serialized byte array instead of the whole result for stateless operations.

In this case, the amount of extra memory used would be minimized, and we would only need to compare the hash codes for equality to see if two byte arrays are equal. The main problem with this approach is that there will still be some extra I/O from inserting and reading elements of a store. For now, we have decided not to implement this feature as part of the proposal mainly because of this reason. If users report that they materialize a large number of stateless operations for the sole purpose of dropping updates, then this feature will be considered as a reasonable alternative. 

Compatibility, Deprecation, and Migration Plan

...