...

  • If you are using the default ConsumerRecordTimestampExtractor (FailOnInvalidTimestamp since v0.10.2), it is most likely that your records do not carry an embedded timestamp (embedded record timestamps were introduced into Kafka's message format in Kafka 0.10). This might happen, for example, if you consume a topic that is written by old Kafka producer clients (i.e., version 0.9 or earlier) or by third-party producer clients. Another situation where this may happen is after upgrading your Kafka cluster from 0.9 to 0.10, where all the data that was generated with 0.9 does not include the 0.10 message timestamps.
  • If you are using a custom timestamp extractor, make sure that your extractor handles invalid (negative) timestamps properly, where "properly" depends on the semantics of your application. For example, you can return a default or an estimated timestamp if you cannot extract a valid timestamp (maybe the timestamp field in your data is just missing); see the sketch after this list.
  • As of Kafka 0.10.2, there are two alternative extractors, namely LogAndSkipOnInvalidTimestamp and UsePreviousTimeOnInvalidTimestamp, that handle invalid record timestamps more gracefully (but with potential data loss or semantic impact).
  • You can also switch to processing-time semantics via WallclockTimestampExtractor; whether such a fallback is an appropriate response to this situation depends on your use case.
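
As a concrete illustration of the custom-extractor option above, here is a minimal sketch (the class name FallbackTimestampExtractor and the wall-clock fallback are examples only, not part of Kafka): it returns the embedded record timestamp when it is valid and otherwise falls back to the current wall-clock time. A custom extractor is registered via the timestamp extractor configuration of StreamsConfig (the exact config name depends on your Kafka Streams version).

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class FallbackTimestampExtractor implements TimestampExtractor {

        @Override
        public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
            final long timestamp = record.timestamp();
            if (timestamp < 0) {
                // no valid embedded timestamp: fall back to wall-clock time;
                // alternatively, return previousTimestamp or a value taken from the record payload
                return System.currentTimeMillis();
            }
            return timestamp;
        }
    }
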
How to scale a Streams app, i.e., increase the number of input topic partitions?

Basically, Kafka Streams does not allow you to change the number of input topic partitions during its "lifetime". If you stop a running Kafka Streams application, change the number of input topic partitions, and restart your app, it will most likely break with an exception as described in FAQ "What does exception "Store <someStoreName>'s change log (<someStoreName>-changelog) does not contain partition <someNumber>" mean?". It is tricky to fix this for production use cases, and it is highly recommended to not change the number of input topic partitions (cf. comment below). For POC/demos it's not difficult to fix though.

In order to fix this, you should reset your application using Kafka's application reset tool: Kafka Streams Application Reset Tool. Using the application reset tool has the disadvantage that you wipe out your whole application state. Thus, in order to get your application into the same state as before, you need to reprocess the whole input topic from the beginning. This is of course only possible if all input data is still available and nothing got deleted by brokers applying the topic retention time/size policy. Furthermore, you should note that adding partitions to input topics changes the topic's partitioning scheme (by default, hash-based partitioning by key). Because Kafka Streams assumes that input topics are correctly partitioned by key, if you use the reset tool and reprocess all data, you might get wrong results, as "old" data is partitioned differently than "new" data (i.e., data written after adding the new partitions). For production use cases, you would need to read all data from your original topic and write it into a new topic (with an increased number of partitions) to get your data partitioned correctly (of course, this step might change the ordering of records with different keys, which usually should not be an issue, but is worth mentioning). Afterwards, you can use the new topic as the input topic for your Streams app. This repartitioning step can be done easily within a Streams application that reads and writes back the data without any further processing, as sketched below.
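
For example, such a copy job can be as small as the following sketch (written against the current StreamsBuilder API; the application id, bootstrap servers, and the topic names "orders" / "orders-repartitioned" are placeholders). It reads every record as raw bytes and writes it to the new, larger topic, so the producer re-partitions the data by its serialized key on write:

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class RepartitionCopyApp {

        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "repartition-copy");  // placeholder application id
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // adjust to your cluster
            // copy records as raw bytes; the producer re-partitions them by (serialized) key on write
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());

            final StreamsBuilder builder = new StreamsBuilder();
            // placeholder topic names: copy from the old topic into the new topic with more partitions
            builder.stream("orders").to("orders-repartitioned");

            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }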

In general, however, it is recommended to over-partition your topics for production use cases, so that you will never need to change the number of partitions later on. The overhead of over-partitioning is rather small and saves you a lot of hassle later on. This is a general recommendation if you work with Kafka -- it's not limited to Streams use cases.

One more remark:

Some people might suggest increasing the number of partitions of Kafka Streams internal topics manually. However, this would be a hack and is not recommended, for the following reasons:

  1. It might be tricky to figure out what the right number is, as it depends on various factors (it's a Streams-internal implementation detail).
  2. You also face the problem of breaking the partitioning scheme, as described in the paragraph above. Thus, your application most likely ends up in an inconsistent state.

In order to avoid an inconsistent application state, Streams does not delete any internal topics or change the number of partitions of internal topics automatically, but fails with the error message you reported. This ensures that the user is aware of all implications of doing the "cleanup" manually.

What does exception "Store <someStoreName>'s change log (<someStoreName>-changelog) does not contain partition <someNumber>" mean?

It means that a Streams internal changelog topic does not have the expected number of topic partitions. This can happen if you add partitions to an input topic. For more details, see FAQ "How to scale a Streams app, i.e., increase the number of input topic partitions".


Producers

How should I set metadata.broker.list?

...