Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state"Under Discussion"Accepted [VOTE] KIP-93: Improve invalid timestamp handling in Kafka Streams

Discussion thread: TODO: [DISCUSS] KIP-93: Improve invalid timestamp handling in Kafka Streams

JIRA

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-4393

...

  • FailOnInvalidTimestamp (new default extractor, replacing ConsumerRecordTimestampExtractor; behavior stays the same)
  • LogAndSkipOnInvalidTimestamp
  • UsePartitionTimeOnInvalidTimestampUsePreviousTimeOnInvalidTimestamp

Proposed Changes

We want to change Streams to an auto-drop behavior for records with negative timestamps (without any further user notification about any dropped records) to enable users to "step over" those records and keep the app running (instead of running into a runtime exception, which would typically bring down the whole application instance). To guard the user from silently dropping messages by default (and to keep the current fail-fast behavior), we change the default extractor ConsumerRecordTimestampExtractor to raise an exception if the embedded 0.10 message timestamp is negative, which includes the case where there is no 0.10 timestamp embedded in the message.

...

Compatibility, Deprecation, and Migration Plan

 This is a breaking, incompatible change because

...

TimestampExtractor interface gets changed. However, it only affect uses that provide a custom timestamp extractor. By default no code change is required and the overall behavior is the same as before this KIP (using default timestamp extractor

...

user gets an exception in case of a negative timestamp).

  • Even if the exception is throw from a different point and the exception message changes (the exception type is the same: StreamsException) the use cannot recover from the exception anyway (i.e., user cannot recover with current behavior and this will not change)

Required code changes:

  • Custom timestamp extractors must be updated by the user to adopt the new interface.:
    • recompile to avoid runtime exception
    • code change to adapt to new interface (to make it compile)

Test Plan

The feature can be tested via unit tests.

...