...

However, in our experience, customers running Kafka expect to be able to enable tiering on a topic immediately after their cluster upgrade is complete. When they do, they start seeing NullPointerExceptions (NPEs) and no data is uploaded to Tiered Storage (https://github.com/apache/kafka/blob/trunk/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/LogSegmentData.java#L61).

...

Background of ProducerIds

The ProducerStateManager is a logical component of Kafka which keeps a map from producer identifiers to the last offset written by each producer. There is a separate ProducerStateManager for each partition. Every time a segment is rolled, this map is written to a producer snapshot file (.snapshot), and the state is also written to a snapshot file on shutdown. These snapshot files let the broker recover and catch up faster on startup. Much like the index and timeindex files, the state stored in producer snapshots can be recreated from the log. Unlike the index and timeindex files, however, a missing producer snapshot cannot be rebuilt from the segment that lacks it alone: the whole log has to be replayed.
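To illustrate why a missing producer snapshot is costly, here is a minimal sketch under heavily simplified assumptions. Producer state is reduced to the last offset per producer ID (the real state holds more, such as epochs and sequence numbers), and `ProducerStateSketch`, `Rec`, `takeSnapshot` and `recover` are hypothetical names, not Kafka's API. Rebuilding the state at an offset loads the latest snapshot at or below that offset and replays every later record. With no snapshot available, that means replaying the whole log.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, heavily simplified model of one partition's producer state:
// producer ID -> offset of the last record that producer wrote.
class ProducerStateSketch {
    static final class Rec {
        final long producerId;
        final long offset;
        Rec(long producerId, long offset) { this.producerId = producerId; this.offset = offset; }
    }

    final Map<Long, Long> lastOffsets = new HashMap<>();
    // Snapshot offset -> copy of the map covering all records below that offset.
    final TreeMap<Long, Map<Long, Long>> snapshots = new TreeMap<>();

    void append(Rec r) { lastOffsets.put(r.producerId, r.offset); }

    // On segment roll (or shutdown), persist a copy of the map keyed by the next offset.
    void takeSnapshot(long offset) { snapshots.put(offset, new HashMap<>(lastOffsets)); }

    // Rebuild the state for all records below `upTo`: load the latest snapshot at or
    // below `upTo` (if any), then replay every later record. Returns how many records
    // had to be replayed; with no snapshot at all, that is the whole log.
    static int recover(List<Rec> log, TreeMap<Long, Map<Long, Long>> snapshots,
                       long upTo, Map<Long, Long> out) {
        Map.Entry<Long, Map<Long, Long>> snap = snapshots.floorEntry(upTo);
        long start = (snap == null) ? 0L : snap.getKey();
        out.clear();
        if (snap != null) out.putAll(snap.getValue());
        int replayed = 0;
        for (Rec r : log) {
            if (r.offset >= start && r.offset < upTo) {
                out.put(r.producerId, r.offset);
                replayed++;
            }
        }
        return replayed;
    }

    public static void main(String[] args) {
        ProducerStateSketch state = new ProducerStateSketch();
        List<Rec> log = new ArrayList<>();
        long[] writers = {1, 2, 1, 3, 1, 2}; // producer ID that wrote each offset
        for (int offset = 0; offset < writers.length; offset++) {
            if (offset == 3) state.takeSnapshot(3); // segment rolled at offset 3
            Rec r = new Rec(writers[offset], offset);
            log.add(r);
            state.append(r);
        }
        Map<Long, Long> rebuilt = new HashMap<>();
        // Replays 3 records (offsets 3..5) on top of the snapshot.
        System.out.println("with snapshot: " + recover(log, state.snapshots, 6, rebuilt));
        // Replays all 6 records because no snapshot is available.
        System.out.println("without snapshot: " + recover(log, new TreeMap<>(), 6, rebuilt));
    }
}
```

Both recoveries end in the same state. Only the amount of replay differs, which is the cost a missing snapshot imposes.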

If a producer identifier is not present in the ProducerStateManager when the producer tries to write records, the producer receives an UNKNOWN_PRODUCER_ID error. The producer then requests and registers a new producer identifier with the broker, and from then on it writes records as normal. Idempotency is guaranteed for a producer only as long as it keeps the same producer identifier; it is not guaranteed across a change of identifier.
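The broker-side decision described above can be sketched as a toy model. It follows the simplified description in the paragraph above rather than any particular Kafka version. As an assumption for illustration, each producer ID's state is just the last sequence number it wrote, with no epochs and no gap checks, and `AppendCheckSketch`, `Result`, `registerNewProducer` and `tryAppend` are hypothetical names. The model shows three outcomes. An unknown ID is rejected. A retry under the same ID is recognised as a duplicate. The same data sent under a new ID is accepted again, which is where idempotency is lost.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical broker-side model of idempotent appends for one partition
// (no producer epochs, no sequence-gap checks).
class AppendCheckSketch {
    enum Result { APPENDED, DUPLICATE, UNKNOWN_PRODUCER_ID }

    private final Map<Long, Integer> lastSequence = new HashMap<>(); // producer ID -> last sequence written
    final List<String> log = new ArrayList<>();
    private long nextProducerId = 1000; // arbitrary starting point for issued IDs

    // The producer requests a fresh producer ID and the broker registers it.
    long registerNewProducer() {
        long producerId = nextProducerId++;
        lastSequence.put(producerId, -1); // nothing written yet
        return producerId;
    }

    Result tryAppend(long producerId, int sequence, String value) {
        Integer last = lastSequence.get(producerId);
        if (last == null) {
            return Result.UNKNOWN_PRODUCER_ID; // no state for this ID
        }
        if (sequence <= last) {
            return Result.DUPLICATE; // this ID already wrote this sequence: drop the retry
        }
        lastSequence.put(producerId, sequence);
        log.add(value);
        return Result.APPENDED;
    }

    public static void main(String[] args) {
        AppendCheckSketch broker = new AppendCheckSketch();
        System.out.println(broker.tryAppend(42L, 0, "a"));   // UNKNOWN_PRODUCER_ID
        long first = broker.registerNewProducer();
        System.out.println(broker.tryAppend(first, 0, "a"));  // APPENDED
        System.out.println(broker.tryAppend(first, 0, "a"));  // DUPLICATE: same ID, retry deduplicated
        long second = broker.registerNewProducer();
        System.out.println(broker.tryAppend(second, 0, "a")); // APPENDED: new ID, no deduplication
        System.out.println(broker.log);                       // "a" is now in the log twice
    }
}
```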

Encountering an empty producer snapshot when recovering the active segment is equivalent to forcing the expiry of all producer identifiers which are present in previous segments but have not yet written to the active one.
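The equivalence can be illustrated with a small hypothetical model. `EmptySnapshotSketch` and `recoverActiveSegment` are illustrative names, and producer state is reduced to a last offset per producer ID. Recovering the active segment from a real snapshot remembers producers whose last write was in an earlier segment. Recovering from an empty snapshot only knows the producers seen in the active segment itself.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration: recover producer state for the active segment by
// loading the snapshot taken at its base offset and replaying only that segment.
class EmptySnapshotSketch {
    // writers[i] is the producer ID that wrote offset i.
    static Map<Long, Long> recoverActiveSegment(Map<Long, Long> snapshot, long[] writers, int activeBaseOffset) {
        Map<Long, Long> state = new HashMap<>(snapshot);
        for (int offset = activeBaseOffset; offset < writers.length; offset++) {
            state.put(writers[offset], (long) offset);
        }
        return state;
    }

    public static void main(String[] args) {
        long[] writers = {1, 2, 1, 3}; // offsets 0-2: earlier segment; offset 3: active segment
        int activeBase = 3;
        Map<Long, Long> realSnapshot = Map.of(1L, 2L, 2L, 1L); // producer -> last offset below 3

        Map<Long, Long> fromReal = recoverActiveSegment(realSnapshot, writers, activeBase);
        Map<Long, Long> fromEmpty = recoverActiveSegment(Map.of(), writers, activeBase);

        // From the real snapshot, producers 1, 2 and 3 are all known.
        System.out.println("real snapshot knows 1 and 2: " + (fromReal.containsKey(1L) && fromReal.containsKey(2L)));
        // From the empty snapshot, only producer 3 (which wrote to the active segment) is known;
        // producers 1 and 2 behave as if they had expired.
        System.out.println("empty snapshot knows 1 or 2: " + (fromEmpty.containsKey(1L) || fromEmpty.containsKey(2L)));
    }
}
```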

Proposed changes

We propose to create a segment-aligned empty producer snapshot whenever we detect that the producer snapshot value read at https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L581 is null. The rest of the Tiered Storage logic should then pick it up for upload as usual. No changes will be required on the read path.
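A minimal sketch of the proposed check follows. It assumes the upload path holds an `Optional<File>` for the segment's producer snapshot, which is an assumption here because the actual variable at the linked line may look different. The helper `ensureProducerSnapshot`, the `SnapshotWriter` callback and the `snapshotOffset` parameter are hypothetical. The 20-digit, zero-padded `.snapshot` file name is assumed to match Kafka's usual log file naming. The on-disk encoding of an empty snapshot is deliberately left to the callback. The zero-byte file in `main` is a demo placeholder only, not Kafka's snapshot format.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.Optional;

class EmptySnapshotCreator {
    // Hypothetical callback that writes a snapshot containing zero producer entries.
    // A real implementation would be expected to reuse Kafka's own snapshot writer.
    @FunctionalInterface
    interface SnapshotWriter {
        void writeEmptySnapshot(File target) throws IOException;
    }

    // Assumed naming convention: offset zero-padded to 20 digits plus ".snapshot".
    static File snapshotFileFor(File logDir, long snapshotOffset) {
        return new File(logDir, String.format("%020d.snapshot", snapshotOffset));
    }

    // Returns the existing snapshot if there is one; otherwise writes an empty,
    // segment-aligned snapshot at `snapshotOffset` and returns it, so the upload
    // path never receives null.
    static File ensureProducerSnapshot(Optional<File> existing, File logDir,
                                       long snapshotOffset, SnapshotWriter writer) {
        if (existing.isPresent()) {
            return existing.get(); // normal case: nothing to do
        }
        File target = snapshotFileFor(logDir, snapshotOffset);
        try {
            writer.writeEmptySnapshot(target);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return target;
    }

    public static void main(String[] args) throws IOException {
        File logDir = Files.createTempDirectory("topic-0").toFile();
        // Demo-only writer: creates a zero-byte placeholder, NOT Kafka's snapshot format.
        SnapshotWriter placeholder = target -> Files.write(target.toPath(), new byte[0]);
        File snapshot = ensureProducerSnapshot(Optional.empty(), logDir, 42L, placeholder);
        System.out.println(snapshot.getName() + " exists: " + snapshot.exists());
    }
}
```

Keeping the writer as a parameter keeps the sketch honest about what it does not know. It only decides *when* to create the file and *where* to put it.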

FAQs

...

  • Is adding an empty snapshot file safe? If not, what do we risk?

Encountering an empty snapshot file means that Kafka will treat all idempotent producers who produced prior to the offset of the snapshot file as expired. Consider the situation where a follower broker has rebuilt its auxiliary state from an empty snapshot and the leader broker then goes down. The follower broker becomes the leader, and some in-flight producer identifiers will be expired, potentially resulting in duplicate records.
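The failover risk can be traced with a toy model. As an assumption for illustration, producer state is a map from producer ID to last sequence number, an unknown ID is answered with UNKNOWN_PRODUCER_ID as described in the background section, and `FailoverSketch` and its methods are hypothetical. The old leader remembers producer 7, whose last write is in an earlier segment. The new leader rebuilt its state from an empty snapshot and does not. The old leader would have discarded the retry as a duplicate. The new leader rejects it instead, and the resend under a fresh producer ID is appended a second time.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of one broker's view of a partition after failover.
class FailoverSketch {
    final Map<Long, Integer> lastSeq; // producer ID -> last sequence appended
    final List<String> log;

    FailoverSketch(Map<Long, Integer> rebuiltState, List<String> replicatedLog) {
        this.lastSeq = new HashMap<>(rebuiltState);
        this.log = new ArrayList<>(replicatedLog);
    }

    String append(long producerId, int seq, String value) {
        Integer last = lastSeq.get(producerId);
        if (last == null) return "UNKNOWN_PRODUCER_ID";
        if (seq <= last) return "DUPLICATE";
        lastSeq.put(producerId, seq);
        log.add(value);
        return "APPENDED";
    }

    public static void main(String[] args) {
        List<String> log = List.of("v0"); // producer 7 wrote "v0" with sequence 0; the follower has it too
        FailoverSketch oldLeader = new FailoverSketch(Map.of(7L, 0), log);
        FailoverSketch newLeader = new FailoverSketch(Map.of(), log); // rebuilt from an empty snapshot

        // The producer never saw the ack for "v0" and retries it.
        System.out.println(oldLeader.append(7L, 0, "v0")); // DUPLICATE: the old leader would drop it
        String result = newLeader.append(7L, 0, "v0");     // UNKNOWN_PRODUCER_ID
        if (result.equals("UNKNOWN_PRODUCER_ID")) {
            newLeader.lastSeq.put(8L, -1);                 // producer registers a new ID (8)
            newLeader.append(8L, 0, "v0");                 // the resend is accepted
        }
        System.out.println(newLeader.log);                 // "v0" now appears twice
    }
}
```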

Compatibility, Deprecation, and Migration Plan

...