...

  • If you are using the default ConsumerRecordTimestampExtractor (FailOnInvalidTimestamp since v0.10.2), your records most likely do not carry an embedded timestamp. Embedded record timestamps were introduced into Kafka's message format in Kafka 0.10. This can happen if, for example, you consume a topic that is written by old Kafka producer clients (i.e., version 0.9 or earlier) or by third-party producer clients. It can also happen after upgrading your Kafka cluster from 0.9 to 0.10, because all data that was written with 0.9 does not include the 0.10 message timestamps.
  • If you are using a custom timestamp extractor, make sure that it properly handles invalid (negative) timestamps, where "properly" depends on the semantics of your application. For example, you can return a default or an estimated timestamp if you cannot extract a valid one (maybe the timestamp field in your data is simply missing).
  • As of Kafka 0.10.2, there are two alternative extractors, namely LogAndSkipOnInvalidTimestamp and UsePreviousTimeOnInvalidTimestamp, that handle invalid record timestamps more gracefully (but with potential data loss or semantic impact).
  • You can also switch to processing-time semantics via WallclockTimestampExtractor; whether such a fallback is an appropriate response to this situation depends on your use case.
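For a custom extractor, the fallback described above ("return a default or an estimated timestamp") can be sketched as follows. This is a self-contained illustration, not Kafka code: the payload is modeled as a Map, and the field name "eventTime", the class name PayloadTimestampFallback and the DEFAULT_TIMESTAMP value are all hypothetical. The logic prefers the event time stored in the payload, falls back to the previous valid timestamp as an estimate, and uses a fixed default only as a last resort.

```java
import java.util.Map;

// Hypothetical sketch of a fallback policy for invalid/missing timestamps.
// The payload is modeled as a Map and "eventTime" is an assumed field name;
// adapt both to your own record format.
final class PayloadTimestampFallback {

    // Assumed default when no usable timestamp is available at all
    // (0 = 1970-01-01T00:00:00Z); choose a value that fits your semantics.
    static final long DEFAULT_TIMESTAMP = 0L;

    private PayloadTimestampFallback() {}

    static long extract(Map<String, Object> payload, long previousTimestamp) {
        Object field = payload.get("eventTime");
        if (field instanceof Number) {
            long eventTime = ((Number) field).longValue();
            if (eventTime >= 0) {
                return eventTime;          // valid event time from the payload
            }
        }
        if (previousTimestamp >= 0) {
            return previousTimestamp;      // estimate: reuse the last valid timestamp
        }
        return DEFAULT_TIMESTAMP;          // nothing usable: fixed default
    }
}
```

Returning an estimate keeps the record in the stream; if dropping it is preferable for your application, returning a negative value instead mirrors what the skip-style built-in extractor does.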
How to write a custom TimestampExtractor?

If you want to provide a custom timestamp extractor, note that the extractor is applied globally, i.e., to all user topics and all Streams-internal topics. Because Kafka Streams relies on record metadata timestamps for all its internal topics, you need to write your extractor so that it can return different timestamps for different topics. Concretely, branch (if-else) within your code and apply custom logic only to records from the input topics you care about (you can check the topic via ConsumerRecord#topic()). For all other topics (including all Streams-internal topics), return the record's metadata timestamp, which you can access via ConsumerRecord#timestamp().
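The branching described above can be sketched as below. To keep the example self-contained, Rec is a hypothetical stand-in exposing only the ConsumerRecord accessors mentioned here (topic(), timestamp(), value()), and the topic name "orders" and the Order payload type are assumptions. In a real application you would implement org.apache.kafka.streams.processor.TimestampExtractor, whose extract method takes a ConsumerRecord<Object, Object> and, since 0.10.2, a second long argument, and register the class via StreamsConfig (TIMESTAMP_EXTRACTOR_CLASS_CONFIG in 0.10.x; later releases use DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG).

```java
// Hypothetical, self-contained sketch of a topic-aware timestamp extractor.
final class TopicAwareTimestampExtractor {

    // Stand-in for the parts of ConsumerRecord used here (not the Kafka class).
    static final class Rec {
        private final String topic;
        private final long timestamp;
        private final Object value;

        Rec(String topic, long timestamp, Object value) {
            this.topic = topic;
            this.timestamp = timestamp;
            this.value = value;
        }

        String topic() { return topic; }
        long timestamp() { return timestamp; }
        Object value() { return value; }
    }

    // Hypothetical payload type of the input topic that needs custom logic.
    static final class Order {
        final long orderTime;
        Order(long orderTime) { this.orderTime = orderTime; }
    }

    // Hypothetical name of the user input topic we care about.
    static final String ORDERS_TOPIC = "orders";

    private TopicAwareTimestampExtractor() {}

    // previousTimestamp mirrors the second parameter of the real interface;
    // this sketch does not use it.
    static long extract(Rec record, long previousTimestamp) {
        if (ORDERS_TOPIC.equals(record.topic()) && record.value() instanceof Order) {
            // Custom logic only for records from our own input topic.
            return ((Order) record.value()).orderTime;
        }
        // All other topics, including Streams-internal topics:
        // return the record's metadata timestamp unchanged.
        return record.timestamp();
    }
}
```

The important design choice is the default branch: any topic you do not explicitly recognize falls through to the metadata timestamp, so internal topics keep the timestamps Kafka Streams expects.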

How to scale a Streams app, i.e., increase number of input partitions?

...