Status

Current state: "Under Discussion"

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, Kafka Streams does not handle invalid (i.e., negative) timestamps returned by the TimestampExtractor gracefully. Instead, it fails with an exception, because negative timestamps cannot be handled meaningfully by time-based operators such as windowed aggregations or joins.

...

  • FailOnInvalidTimestamp (new default extractor, replacing ConsumerRecordTimestampExtractor; behavior stays the same)
  • LogAndSkipOnInvalidTimestamp
  • UsePartitionTimeOnInvalidTimestamp
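The three extractors listed above can be sketched as follows. This is a minimal, self-contained approximation, not Kafka's code: `SimpleRecord` is a hypothetical stand-in for `ConsumerRecord`, the `TimestampExtractor` interface here is a simplified stand-in for the changed interface (assuming it also receives the previously extracted timestamp of the record's partition as `previousTimestamp`), and `IllegalStateException` stands in for whatever exception Streams would raise.

```java
import java.util.logging.Logger;

// Hypothetical stand-in for ConsumerRecord: only the embedded 0.10 timestamp matters here.
final class SimpleRecord {
    final long timestamp; // negative if missing or invalid
    SimpleRecord(long timestamp) { this.timestamp = timestamp; }
}

// Simplified stand-in for the changed TimestampExtractor interface (assumption).
interface TimestampExtractor {
    long extract(SimpleRecord record, long previousTimestamp);
}

// Fail fast: same behavior as the old default extractor.
final class FailOnInvalidTimestamp implements TimestampExtractor {
    @Override
    public long extract(SimpleRecord record, long previousTimestamp) {
        if (record.timestamp < 0) {
            throw new IllegalStateException("Invalid (negative) timestamp: " + record.timestamp);
        }
        return record.timestamp;
    }
}

// Log a warning and return the invalid timestamp unchanged, so Streams drops the record.
final class LogAndSkipOnInvalidTimestamp implements TimestampExtractor {
    private static final Logger LOG =
        Logger.getLogger(LogAndSkipOnInvalidTimestamp.class.getName());

    @Override
    public long extract(SimpleRecord record, long previousTimestamp) {
        if (record.timestamp < 0) {
            LOG.warning("Skipping record with invalid timestamp " + record.timestamp);
        }
        return record.timestamp;
    }
}

// Replace an invalid timestamp with the previously extracted timestamp of the partition.
final class UsePartitionTimeOnInvalidTimestamp implements TimestampExtractor {
    @Override
    public long extract(SimpleRecord record, long previousTimestamp) {
        if (record.timestamp >= 0) {
            return record.timestamp;
        }
        if (previousTimestamp < 0) {
            // No valid partition time yet, so there is nothing to fall back to.
            throw new IllegalStateException("Invalid timestamp and no valid partition time");
        }
        return previousTimestamp;
    }
}
```

In this sketch, LogAndSkipOnInvalidTimestamp does not drop anything itself: it returns the negative value and leaves the drop decision to the auto-drop behavior described under Proposed Changes, adding only a log line.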

Proposed Changes

We want to change Streams to automatically drop records with negative timestamps (without any further notification to the user about dropped records), so that users can "step over" such records and keep the application running instead of running into a runtime exception, which would typically bring down the whole application instance. To protect users from silently dropping messages by default (and to keep the current fail-fast behavior), we change the default extractor ConsumerRecordTimestampExtractor to raise an exception if the embedded 0.10 message timestamp is negative; this includes the case where no 0.10 timestamp is embedded in the message.
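The auto-drop behavior amounts to a filter applied right after timestamp extraction. The sketch below is hypothetical and heavily simplified (it is not Streams' internal code): `TimedValue` and `AutoDrop` are illustrative names, and the extractor is modeled as a plain `ToLongFunction` for brevity, omitting the previous-timestamp argument. Records whose extracted timestamp is negative are skipped without notification, while any exception thrown by the extractor propagates, which is how a fail-fast default extractor keeps the current behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical record type: a value plus its embedded timestamp (negative if invalid or missing).
final class TimedValue {
    final String value;
    final long timestamp;
    TimedValue(String value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

final class AutoDrop {
    // Returns the records that would be processed. Records with a negative extracted
    // timestamp are dropped silently; exceptions thrown by the extractor propagate.
    static List<TimedValue> process(List<TimedValue> input, ToLongFunction<TimedValue> extractor) {
        List<TimedValue> accepted = new ArrayList<>();
        for (TimedValue record : input) {
            long ts = extractor.applyAsLong(record);
            if (ts < 0) {
                continue; // auto-drop: step over the record and keep the application running
            }
            accepted.add(record);
        }
        return accepted;
    }
}
```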

...

  • This is a breaking, incompatible change because (1) the TimestampExtractor interface changes and (2) an exception is removed.
  • By default, the overall behavior is the same as before this KIP (with the default timestamp extractor, the user gets an exception in case of a negative timestamp).
  • Custom timestamp extractors must be updated by the user to adopt the new interface.
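For a typical custom extractor, adopting the new interface is mostly a signature change. A minimal sketch, assuming the new `extract` method receives the previously extracted timestamp as an additional argument; `OrderEvent`, `OrderEventTimestampExtractor`, and the simplified `TimestampExtractor` below are hypothetical stand-ins, not Kafka's classes. The old single-argument logic moves into the new method unchanged, and the extra argument can be ignored if the extractor has no use for it.

```java
// Hypothetical payload type carrying its own event time.
final class OrderEvent {
    final long eventTimeMs;
    OrderEvent(long eventTimeMs) { this.eventTimeMs = eventTimeMs; }
}

// Simplified stand-in for the changed interface (assumption): the second argument
// is the previously extracted timestamp.
interface TimestampExtractor {
    long extract(Object value, long previousTimestamp);
}

// Before the change, this logic lived in a single-argument extract method.
final class OrderEventTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(Object value, long previousTimestamp) {
        if (value instanceof OrderEvent) {
            return ((OrderEvent) value).eventTimeMs;
        }
        // Not an OrderEvent: return an invalid timestamp, which Streams would then drop.
        return -1L;
    }
}
```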

Test Plan

The feature can be tested via unit tests.

Rejected Alternatives

  • add a new error handler that is called if a negative timestamp is detected
    • this error handler raises an exception by default
    • users can provide a custom error handler via StreamsConfig
    • rejected because it separates the root cause of negative timestamps from reacting to or fixing them
      • for custom timestamp extractors, users can check for negative timestamps in the first place (directly after extracting the timestamp and before returning it to Streams) and react accordingly
      • thus, an error handler is too clumsy to use and too hard to reason about from a user perspective
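The argument that users can check for negative timestamps directly in their extractor can be illustrated with a small sketch. Everything here is hypothetical: the simplified `TimestampExtractor` interface (assuming it receives the previously extracted timestamp) and the `CsvTimestampExtractor` for values of the form `<epochMillis>,<payload>`. The extractor validates the timestamp immediately after parsing it and reacts before returning it to Streams: it reuses the previous valid timestamp when one exists and otherwise returns the negative value so that the record is dropped.

```java
// Simplified stand-in for the changed interface (assumption).
interface TimestampExtractor {
    long extract(String value, long previousTimestamp);
}

// Hypothetical extractor for non-null values of the form "<epochMillis>,<payload>".
final class CsvTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(String value, long previousTimestamp) {
        long ts;
        try {
            int comma = value.indexOf(',');
            ts = Long.parseLong(comma < 0 ? value : value.substring(0, comma));
        } catch (NumberFormatException e) {
            ts = -1L; // unparsable: treat as invalid
        }
        // Check right after extracting, before handing the timestamp back to Streams.
        if (ts < 0 && previousTimestamp >= 0) {
            return previousTimestamp; // react: reuse the last valid timestamp
        }
        return ts; // negative only if there is nothing to fall back to; Streams drops the record
    }
}
```

Because the check lives next to the parsing code, the reaction can depend on why the timestamp is invalid, which a separate, generic error handler could not see.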

...