Authors: Mital Awachat, Christo Lolov

Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Our motivation is to provide an upgrade path for customers who run Kafka versions earlier than 2.8.0 and want to enable Tiered Storage in 3.6.

...

To achieve this, we propose changing Kafka to retroactively create producer snapshot files on upload whenever a segment is due to be archived and lacks one.

Public Interfaces

No public interfaces will be changed as part of this KIP.

Proposed Changes

Background of ProducerIds

The ProducerStateManager is a logical component of Kafka which keeps a map from producer identifiers to the last record written by each producer. There is a separate ProducerStateManager for each partition. Since 2.8.0, this map is written to a producer snapshot file every time a segment is rolled. The state is also written to a snapshot file on shutdown. These snapshot files allow the broker to recover faster on startup. Much like the index and timeindex files, the state stored in the producer snapshots can be recreated from the segments. Unlike the index and timeindex files, however, recreating it requires the log to be replayed from the last available snapshot or, if no snapshot is available, from the beginning of the partition.
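The replay described above can be sketched as follows. This is a simplified model under stated assumptions, not Kafka's actual ProducerStateManager: the class ProducerStateSketch, its Batch and Entry types, and the replay method are hypothetical names introduced only for illustration, and a segment is reduced to an ordered list of batches.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model; none of these names are Kafka's real classes.
final class ProducerStateSketch {

    // One record batch as seen during replay.
    static final class Batch {
        final long producerId;
        final int lastSequence;
        final long lastOffset;

        Batch(long producerId, int lastSequence, long lastOffset) {
            this.producerId = producerId;
            this.lastSequence = lastSequence;
            this.lastOffset = lastOffset;
        }
    }

    // Last known state of one producer, i.e. what a snapshot would persist.
    static final class Entry {
        final int lastSequence;
        final long lastOffset;

        Entry(int lastSequence, long lastOffset) {
            this.lastSequence = lastSequence;
            this.lastOffset = lastOffset;
        }
    }

    // Rebuilds the producer map by applying every batch, in log order, on top of
    // the last available snapshot (pass an empty map if there is none).
    static Map<Long, Entry> replay(Map<Long, Entry> snapshot, List<List<Batch>> segments) {
        Map<Long, Entry> state = new HashMap<>(snapshot);
        for (List<Batch> segment : segments) {
            for (Batch batch : segment) {
                // A later batch from the same producer overwrites the earlier entry.
                state.put(batch.producerId, new Entry(batch.lastSequence, batch.lastOffset));
            }
        }
        return state;
    }
}
```

In this model, replaying all segments from an empty map and replaying only the newer segments on top of an earlier snapshot yield the same state, which is why a snapshot shortens recovery.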

If a producer's identifier is not present in the ProducerStateManager when the producer tries to write records, the producer receives an UNKNOWN_PRODUCER_ID error. The producer then registers a new identifier with the broker and from then on writes records as normal. Idempotency is ensured for a producer which keeps the same producer identifier; it is not ensured across a change of identifier.
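To illustrate why a change of identifier breaks idempotency, here is a minimal sketch of the behaviour described above. It is a toy model, not broker code: IdempotenceSketch, registerProducer, tryAppend and forget are hypothetical names, and the sequence check is deliberately simplified (any sequence number not greater than the last one seen is treated as a duplicate).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of one partition's producer state; not Kafka's real API.
final class IdempotenceSketch {

    enum Result { APPENDED, DUPLICATE, UNKNOWN_PRODUCER_ID }

    // Producer identifier -> last sequence number appended by that producer.
    private final Map<Long, Integer> lastSequence = new HashMap<>();
    private long nextProducerId = 0L;
    private int appendedRecords = 0;

    // Hands out a fresh identifier with no history.
    long registerProducer() {
        long id = nextProducerId++;
        lastSequence.put(id, -1);
        return id;
    }

    Result tryAppend(long producerId, int sequence) {
        Integer last = lastSequence.get(producerId);
        if (last == null) {
            return Result.UNKNOWN_PRODUCER_ID; // state missing: lost or expired
        }
        if (sequence <= last) {
            return Result.DUPLICATE; // retry of an already written record is dropped
        }
        lastSequence.put(producerId, sequence);
        appendedRecords++;
        return Result.APPENDED;
    }

    // Models the producer state being lost, e.g. no snapshot to recover from.
    void forget(long producerId) {
        lastSequence.remove(producerId);
    }

    int appendedRecords() {
        return appendedRecords;
    }
}
```

In this model, a retry under the same identifier is rejected as a duplicate, whereas the same record resent under a newly registered identifier is appended again, because the new identifier carries no history.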

Proposed solution

There is a configuration called producer.id.expiration.ms whose default value is 1 day (86400000 ms). In theory, if we have an infinite log and have lost all producer snapshots, snapshot recovery only needs to start from segments no older than 1 day. All producer identifiers from earlier segments would have expired, and a producer trying to write records with such an identifier would be asked to request a new one.

As such, the ideal solution when uploading a segment to Tiered Storage and not finding a snapshot for it would be to recreate the snapshot by replaying only segments no older than the value of producer.id.expiration.ms.
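A minimal sketch of how the replay window could be chosen is shown below. It rests on assumptions that are not part of this proposal: ReplayWindowSketch, SegmentInfo and segmentsToReplay are hypothetical names, and a segment's age is judged by its largest record timestamp.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Kafka.
final class ReplayWindowSketch {

    // Default of producer.id.expiration.ms: 1 day.
    static final long DEFAULT_PRODUCER_ID_EXPIRATION_MS = 86_400_000L;

    // Assumed per-segment metadata: base offset and largest record timestamp.
    static final class SegmentInfo {
        final long baseOffset;
        final long maxTimestampMs;

        SegmentInfo(long baseOffset, long maxTimestampMs) {
            this.baseOffset = baseOffset;
            this.maxTimestampMs = maxTimestampMs;
        }
    }

    // Given the segments up to the one being uploaded, ordered by base offset,
    // returns the suffix that still needs replaying to rebuild producer state.
    static List<SegmentInfo> segmentsToReplay(List<SegmentInfo> segments, long nowMs, long expirationMs) {
        long cutoffMs = nowMs - expirationMs;
        List<SegmentInfo> result = new ArrayList<>();
        for (SegmentInfo segment : segments) {
            // Once one segment falls inside the window, keep every later segment
            // as well so that the replay stays contiguous and in log order.
            if (!result.isEmpty() || segment.maxTimestampMs >= cutoffMs) {
                result.add(segment);
            }
        }
        return result;
    }
}
```

Segments that end before the cutoff are skipped because every producer identifier last seen in them would already have expired.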

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users? N/A
  • If we are changing behavior how will we phase out the older behavior? N/A
  • If we need special migration tools, describe them here. N/A
  • When will we remove the existing behavior? N/A

Test Plan

Unit and integration tests.

Rejected Alternatives

...