
  1. Stream aggregation (windowed or not) will not be affected. As emphasized in the behavior changes, any update that advances stream time will still be emitted, because (as noted earlier) updates are dropped only if the timestamp has not changed.
  2. Windowed operations are a primary user of stream time, so they are the most likely to be affected by this KIP. For background, to advance stream time far enough to close a window or push it out of retention, new records must have timestamps that fall in later windows. Such records are updates to new (windowed) keys, so they would never be suppressed as idempotent. We can therefore conclude that, at worst, dropped updates can delay stream time by one window. This is acceptable, since the contract does not promise to emit results at the earliest possible time (only at some point after the window closes).
  3. The last is suppression. One of the primary uses of suppression is to limit the rate of record emission while still forwarding records at a consistent rate. If we drop idempotent updates, the suppression buffer could stall because stream time is no longer advanced, but this is less of an issue than it seems. In that case, the user can simply remove the suppression operator from the topology and let the dropping of idempotent updates fulfill the same role as the suppression buffer.
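The reasoning in the list above rests on one rule: an update is dropped only when both its value and its timestamp match the last result forwarded for the same key. The following Java sketch illustrates that rule under simplified assumptions. `EmitOnChangeSketch`, `process`, and `tumblingWindowStart` are hypothetical names, not part of the Kafka Streams API. An in-memory `HashMap` stands in for the state store, and stream time is modeled as the maximum timestamp forwarded downstream.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch, not the Kafka Streams implementation: an in-memory
// HashMap stands in for the state store holding each key's last result.
final class EmitOnChangeSketch<K, V> {

    // Last forwarded result for a key: its value and its timestamp.
    private static final class Result<T> {
        final T value;
        final long timestamp;

        Result(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<K, Result<V>> lastForwarded = new HashMap<>();
    // Downstream stream time: the maximum timestamp forwarded so far.
    private long streamTime = Long.MIN_VALUE;

    // Returns true if the update is forwarded, false if dropped as idempotent.
    boolean process(K key, V newValue, long timestamp) {
        Result<V> old = lastForwarded.get(key);
        boolean idempotent = old != null
                && old.timestamp == timestamp
                && Objects.equals(old.value, newValue);
        if (idempotent) {
            // Same value and same timestamp as a result already forwarded,
            // so dropping it leaves downstream stream time unchanged.
            return false;
        }
        lastForwarded.put(key, new Result<>(newValue, timestamp));
        streamTime = Math.max(streamTime, timestamp);
        return true;
    }

    long streamTime() {
        return streamTime;
    }

    // Start of the tumbling window containing the timestamp. Windowed results
    // are keyed by (key, window start), so a record in a later window maps to a
    // windowed key with no previous result and is always forwarded.
    static long tumblingWindowStart(long timestamp, long windowSizeMs) {
        return timestamp - Math.floorMod(timestamp, windowSizeMs);
    }
}
```

A dropped update carries the same timestamp as a result that was already forwarded, so dropping it never lowers downstream stream time. For windowed results, advancing stream time past a window boundary requires a record in a later window. That record has a new window start, so the windowed key has no previous result and the record is always forwarded.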

Future Work

An obvious pitfall of this KIP is that we do not drop idempotent updates for stateless operations, where the old result is not immediately available. As discussed earlier, retrieving and storing the old result in a stateless operation is not viable. In the KIP's current state, the only way to drop idempotent updates from a stateless operation is to materialize it, which carries its own costs. One possible solution is to store the SHA-256 hash of the serialized byte array, rather than the whole result, for stateless operations.

In this case, the extra memory used would be minimized, and we would only need to compare hashes to determine whether two serialized results are equal (a SHA-256 collision between different results is vanishingly unlikely). The main problem with this approach is that there would still be extra I/O from writing to and reading from a store. Mainly for this reason, we have decided not to implement this feature as part of the current proposal. If users report that they materialize a large number of stateless operations solely to drop idempotent updates, this feature will be considered as a reasonable alternative.
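Below is a minimal Java sketch of the hashing idea. It assumes each result is already serialized to a `byte[]`. Only the 32-byte SHA-256 digest of the last result is kept per key, and a new result counts as changed when its digest differs. `HashedIdempotenceChecker` and `isChanged` are hypothetical names, not part of the Kafka Streams API. The in-memory `HashMap` stands in for the store whose extra I/O is discussed above.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: keeps only a SHA-256 digest of each key's last
// serialized result. The HashMap stands in for a state store.
final class HashedIdempotenceChecker<K> {

    private final Map<K, byte[]> lastDigest = new HashMap<>();

    private static byte[] sha256(byte[] serialized) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(serialized);
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to support SHA-256.
            throw new IllegalStateException(e);
        }
    }

    // Returns true if the serialized result differs from the last one seen for
    // this key (or is the first for this key); records the new digest either way.
    boolean isChanged(K key, byte[] serializedResult) {
        byte[] digest = sha256(serializedResult);
        byte[] previous = lastDigest.put(key, digest);
        // Equal digests imply equal inputs except with negligible collision probability.
        return previous == null || !MessageDigest.isEqual(previous, digest);
    }
}
```

The per-key memory cost stays fixed at 32 bytes regardless of result size, which is the saving described above. The store read and write on every update remain.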

Compatibility, Deprecation, and Migration Plan

...