Authors Mehari Beyene, Divij Vaidya

Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-14991

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In Kafka, the introduction of the timestamp field in the message format through KIP-32 brought along two additional configurations: message.timestamp.type and max.message.time.difference.ms.

By default, the configuration values are message.timestamp.type=CreateTime and max.message.time.difference.ms=9223372036854775807 (java.lang.Long.MAX_VALUE). This allows producers to send messages with timestamps as far back as the minimum representable timestamp and arbitrarily far into the future (Long.MAX_VALUE milliseconds is roughly 292 million years). (Side note: we could potentially change the default value of max.message.time.difference.ms to something more sensible, but that is not the motivation for this KIP.)

While there can be valid use cases for messages with past timestamps for the purpose of replaying old messages, messages with future timestamps are inherently inaccurate and can lead to unexpected log rotation behavior. Kafka users have encountered problems due to misconfigured producers, such as using nanoseconds instead of milliseconds for the message timestamp. This anecdotal evidence from a Medium article highlights the challenges associated with this issue.

The motivation behind this proposal is to improve the validation logic for message timestamps by rejecting messages with future timestamps and providing a descriptive exception. This will help improve data integrity and prevent potential pitfalls caused by inaccurate timestamp handling.

Public Interfaces

There will be no change to existing public interfaces. However, when a producer's record is rejected due to this new validation, we will return error code 32 (INVALID_TIMESTAMP) with the error message "Timestamp of the message with offset [record offset] is ahead of the broker's current time."

Proposed Changes

In org.apache.kafka.storage.internals.log.LogValidator, we will introduce a new validation that compares the message timestamp to the broker's timestamp and rejects the message if it is ahead of the broker's timestamp. This validation will apply only when message.timestamp.type is set to CreateTime. There will be no other change to the existing validation logic.

Considering the clock drift that the broker may experience relative to the time synchronization service of the underlying operating system, we propose introducing a constant, TIME_DRIFT_TOLERANCE, with a value of one hour. Time synchronization services like NTP and PTP are capable of correcting drift on the order of milliseconds. Therefore, a one-hour threshold should be sufficient to accommodate all clock drift cases.
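To make the proposed check concrete, here is a minimal, self-contained sketch. It is not the code from the pull request and not the actual LogValidator implementation: the class FutureTimestampCheck, its methods, the local TimestampType enum, and the use of IllegalArgumentException in place of Kafka's own exception type are all hypothetical stand-ins. It also assumes that the one-hour tolerance is added to the broker's current time before comparing, which the text above implies but does not spell out.

```java
import java.time.Duration;

/** Hypothetical illustration only; not the actual LogValidator code. */
final class FutureTimestampCheck {
    // One-hour tolerance, matching the value proposed for TIME_DRIFT_TOLERANCE.
    static final long TIME_DRIFT_TOLERANCE_MS = Duration.ofHours(1).toMillis();

    // Local stand-in for the message.timestamp.type setting.
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    private FutureTimestampCheck() { }

    /**
     * Returns true when the record timestamp is ahead of the broker's clock
     * by more than the tolerance. Both arguments are epoch milliseconds.
     * The addition cannot overflow for any realistic broker clock value.
     */
    static boolean isTooFarAhead(long recordTimestampMs, long brokerNowMs) {
        return recordTimestampMs > brokerNowMs + TIME_DRIFT_TOLERANCE_MS;
    }

    /**
     * Rejects a record whose timestamp is too far in the future.
     * The check is skipped unless the timestamp type is CREATE_TIME.
     */
    static void validate(TimestampType type, long recordTimestampMs,
                         long brokerNowMs, long offset) {
        if (type != TimestampType.CREATE_TIME) {
            return; // assumed: with LogAppendTime the broker sets the timestamp itself
        }
        if (isTooFarAhead(recordTimestampMs, brokerNowMs)) {
            // Assumption: a real broker would surface this as INVALID_TIMESTAMP.
            throw new IllegalArgumentException(
                "Timestamp of the message with offset " + offset
                    + " is ahead of the broker's current time.");
        }
    }
}
```

Under these assumptions, a producer that mistakenly sends nanoseconds instead of milliseconds produces a timestamp far beyond the tolerance and is rejected, while a record that is only a few seconds ahead of the broker's clock is still accepted.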

The code change that is proposed by this KIP is captured in this Pull Request.

Compatibility, Deprecation, and Migration Plan

There are no changes to public interfaces that will impact clients. However, this change is considered breaking, as messages with future timestamps that were previously accepted by the broker will now be rejected.

Test Plan


Rejected Alternatives

Add a broker-level configuration for TIME_DRIFT_TOLERANCE instead of a hardcoded constant.

Pros:

Cons: