Authors: Mital Awachat, Christo Lolov
Status
Current state: "Under Discussion"
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Our motivation is to provide a smooth experience for customers who upgrade from Kafka versions < 2.8.0 to 3.6 and then enable Tiered Storage.
KIP-405: Kafka Tiered Storage provides infinite storage to Kafka. To achieve this, Kafka uploads segments, indices, leader epochs and producer snapshots to remote storage. Producer snapshots were aligned to segments as part of addressing an issue in Apache Kafka 2.8.0. This means that a customer who upgrades from a Kafka version < 2.8.0 has to wait for retention to clean up the segments without an associated producer snapshot, as detailed in KIP-405: Kafka Tiered Storage#Upgrade.
In our experience, however, customers of Kafka expect to be able to enable tiering on a topic as soon as their cluster upgrade is complete. When they do, they start seeing NullPointerExceptions (NPEs) and no data is uploaded to Tiered Storage (https://github.com/apache/kafka/blob/trunk/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/LogSegmentData.java#L61).
To address this, we propose changing Kafka to create an empty producer snapshot file and upload it whenever a segment that is due to be archived lacks one.
Public Interfaces
No public interfaces will be changed as part of this KIP.
Proposed Changes
Background of ProducerIds
The ProducerStateManager is a logical component of Kafka which keeps a map from producer identifiers to the last offset written by that producer. There is a separate ProducerStateManager for each partition. Every time a segment is rolled this map is written to a producer snapshot file (.snapshot). The state is also written to a snapshot file on shutdown. These snapshot files are used to recover and bring the broker up to speed faster on startup. Much like the index and timeindex files the state stored in the producer snapshots can be recreated from the segment. Unlike the index and timeindex files, however, they require the whole log to be replayed rather than just the segment which lacks the snapshot.
If a producer tries to write records and its producer identifier is not present in the ProducerStateManager, the producer receives an UNKNOWN_PRODUCER_ID error. The producer then requests and registers a new producer identifier with the broker, and from then on it writes records as normal. Idempotency is ensured for a producer which keeps the same producer identifier; it is not ensured across a change of identifier.
Encountering an empty producer snapshot when recovering the active segment is equivalent to forcing an expiry of all producer identifiers which are present in previous segments, but have not yet produced to the active one.
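The relationship between the in-memory map, a snapshot and an empty snapshot can be illustrated with a toy model. This is a sketch only: `ToyProducerState` and its methods are hypothetical names invented for illustration and are not Kafka's ProducerStateManager, and the snapshot is modelled as a plain in-memory map rather than a `.snapshot` file.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Toy model (hypothetical, not Kafka code): one instance per partition.
class ToyProducerState {
    // producer identifier -> last offset written by that producer
    private final Map<Long, Long> lastOffsetByProducer = new HashMap<>();

    void recordAppend(long producerId, long offset) {
        lastOffsetByProducer.put(producerId, offset);
    }

    // What a snapshot taken at segment roll or shutdown would capture.
    Map<Long, Long> takeSnapshot() {
        return new HashMap<>(lastOffsetByProducer);
    }

    // Rebuild the state from a snapshot instead of replaying the log.
    static ToyProducerState fromSnapshot(Map<Long, Long> snapshot) {
        ToyProducerState state = new ToyProducerState();
        state.lastOffsetByProducer.putAll(snapshot);
        return state;
    }

    boolean isKnown(long producerId) {
        return lastOffsetByProducer.containsKey(producerId);
    }

    OptionalLong lastOffset(long producerId) {
        Long offset = lastOffsetByProducer.get(producerId);
        return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
    }
}
```

Restoring from a real snapshot keeps every producer identifier known, whereas `ToyProducerState.fromSnapshot(Collections.emptyMap())` yields a state in which no earlier producer is known, which is the forced-expiry effect described above.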
Proposed changes
We propose to create a segment-aligned empty producer snapshot whenever we detect that the value here https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L581 is null. The rest of Tiered Storage's logic should pick it up for upload from then on. No changes will be required on the read path.
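A minimal sketch of the "create it if it is missing" step, under stated assumptions: `EmptySnapshotSketch`, `snapshotFileName` and `ensureSnapshot` are hypothetical names, the file name is assumed to be a zero-padded 20-digit offset followed by `.snapshot`, and the choice of which offset makes the snapshot segment-aligned is left to the caller. The zero-byte file is only a stand-in; a real implementation would probably need to write whatever Kafka accepts as a valid snapshot containing zero producer entries.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of Kafka.
class EmptySnapshotSketch {

    // Assumed naming scheme: zero-padded 20-digit offset plus ".snapshot".
    static String snapshotFileName(long snapshotOffset) {
        return String.format("%020d.snapshot", snapshotOffset);
    }

    // Returns the snapshot path, creating a zero-byte placeholder only
    // when no snapshot exists for that offset. Existing files are left untouched.
    static Path ensureSnapshot(Path logDir, long snapshotOffset) throws IOException {
        Path snapshot = logDir.resolve(snapshotFileName(snapshotOffset));
        if (Files.notExists(snapshot)) {
            Files.createFile(snapshot);
        }
        return snapshot;
    }
}
```

Because the helper never overwrites an existing file, segments that already have a snapshot would be uploaded exactly as they are today.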
FAQs
- Is adding an empty snapshot file safe? If not, what do we risk?
Encountering an empty snapshot file means that Kafka will treat all idempotent producers who produced prior to the offset of the snapshot file as expired. The risk arises in the following situation: a follower broker has rebuilt its auxiliary state from an empty snapshot, the leader broker goes down, and that follower becomes the new leader. Some in-flight producer identifiers are then expired, potentially resulting in duplicate records.
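The duplicate-record risk can be made concrete with a toy simulation. Everything here is an assumption made for illustration: `ToyBroker` and `DuplicateDemo` are hypothetical classes, deduplication is reduced to remembering the last sequence number per producer identifier, a `false` return stands in for UNKNOWN_PRODUCER_ID, and the new producer identifier `8` is arbitrary.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy broker (hypothetical): remembers the last sequence per producer id.
class ToyBroker {
    final List<String> log;
    private final Map<Long, Integer> lastSequenceByProducer;

    ToyBroker(List<String> replicatedLog, Map<Long, Integer> restoredState) {
        this.log = new ArrayList<>(replicatedLog);
        this.lastSequenceByProducer = new HashMap<>(restoredState);
    }

    void register(long producerId) {
        lastSequenceByProducer.put(producerId, -1);
    }

    // Returns false for an unknown producer id (stand-in for UNKNOWN_PRODUCER_ID).
    boolean append(long producerId, int sequence, String record) {
        Integer last = lastSequenceByProducer.get(producerId);
        if (last == null) {
            return false;
        }
        if (sequence > last) {
            log.add(record);
            lastSequenceByProducer.put(producerId, sequence);
        }
        return true; // an already-seen sequence is acknowledged without re-appending
    }
}

class DuplicateDemo {
    // The record "order-1" was already replicated; the producer retries it
    // against the new leader because it never received an acknowledgement.
    static List<String> retryAgainstNewLeader(Map<Long, Integer> followerState) {
        ToyBroker newLeader = new ToyBroker(Collections.singletonList("order-1"), followerState);
        long producerId = 7L;
        if (!newLeader.append(producerId, 0, "order-1")) {
            producerId = 8L; // producer re-registers with a fresh identifier
            newLeader.register(producerId);
            newLeader.append(producerId, 0, "order-1");
        }
        return newLeader.log;
    }
}
```

With a state restored from a real snapshot (`{7 -> 0}`) the retry is recognised and the log keeps a single `order-1`; with an empty state the retry is appended under the new identifier and the log ends up with `order-1` twice.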
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users? N/A
- If we are changing behavior how will we phase out the older behavior? N/A
- If we need special migration tools, describe them here. N/A
- When will we remove the existing behavior? N/A
Test Plan
Unit and integration tests.
Rejected Alternatives
- Generate a correct producer snapshot prior to upload - we rejected this because we would either have to replay the log from the beginning or choose an arbitrary earlier segment as the starting point for calculating said snapshot.
- Make Kafka not upload segments to remote storage until it has expired all segments lacking a producer snapshot - we think this is just as unexpected as the current behavior, especially if customers aren't well aware of what producer snapshots are used for.
- Do not allow a topic to have its tiering configuration set to true until all of its segments have an associated producer snapshot - the limitation here is that this condition needs to hold true across all brokers and additional synchronisation will be required.
- Do not archive snapshot files - like indices, snapshot files can be recreated by rereading the log. However, in the case of Tiered Storage we make the assumption that replaying the whole log will be quite costly.
- Create empty snapshot files on read if a snapshot file is not found - this was a close runner-up, but we chose to discard it. It would not be immediately obvious whether a snapshot is missing because of a Kafka version upgrade, because of a bug in the plugin implementation or because the remote storage itself has been tampered with - if we explicitly write an empty file we reduce some of this ambiguity.