

Status

Current state: DRAFT

Discussion thread: here

JIRA: KAFKA-14991

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In Kafka, the introduction of the timestamp field in the message format through KIP-32 brought along two additional configurations: message.timestamp.type and max.message.time.difference.ms.

By default, message.timestamp.type=CreateTime and max.message.time.difference.ms=9223372036854775807 (java.lang.Long.MAX_VALUE). This allows producers to send messages with timestamps as far back as the minimum representable timestamp and as far ahead as hundreds of years in the future. (Side note: we could change the default value of max.message.time.difference.ms to something more sensible, but that is not the motivation for this KIP.)

While there can be valid use cases for messages with past timestamps, for example when replaying old messages, messages with future timestamps are inherently inaccurate and can lead to unexpected log rotation behavior. Kafka users have encountered problems due to misconfigured producers, such as using nanoseconds instead of milliseconds for the message timestamp; anecdotal evidence, including a Medium article describing exactly this failure mode, highlights the challenges associated with this issue.
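As a concrete illustration of that failure mode (this snippet is illustrative and not part of the proposal), a producer that mistakenly computes an epoch timestamp in nanoseconds produces a value that, interpreted as milliseconds, lands tens of millions of years in the future, yet still well below Long.MAX_VALUE, so the default max.message.time.difference.ms never rejects it:

```java
public class TimestampUnitsMixup {
    public static void main(String[] args) {
        long nowMs = System.currentTimeMillis();

        // Bug: the producer computes the epoch timestamp in nanoseconds,
        // but Kafka interprets the record timestamp as milliseconds.
        long mistakenTimestamp = nowMs * 1_000_000L;

        long msPerYear = 365L * 24 * 60 * 60 * 1000;
        long yearsAhead = (mistakenTimestamp - nowMs) / msPerYear;

        // Tens of millions of years in the future, yet still below
        // Long.MAX_VALUE, so the default setting accepts it.
        System.out.println("Years ahead of now: " + yearsAhead);
    }
}
```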

The motivation behind this proposal is to improve the validation logic for message timestamps by rejecting messages with future timestamps and providing a descriptive exception. This will help improve data integrity and prevent potential pitfalls caused by inaccurate timestamp handling.

Public Interfaces

There will be no change to existing public interfaces. However, when a producer's record is rejected due to this new validation, we will return error code 32 (INVALID_TIMESTAMP) with the error message "Timestamp of the message with offset [record offset] is ahead of the broker's current time."

Proposed Changes

In the org.apache.kafka.storage.internals.log.LogValidator, we will introduce a new validation that compares the message timestamp to the broker's timestamp and rejects the message if it is ahead of the broker's timestamp. This validation applies only when message.timestamp.type is set to CreateTime. There will be no other change to the existing validation logic.

Considering potential clock drift that the broker may encounter with the time synchronization service of the underlying operating system, we propose introducing a constant, TIME_DRIFT_TOLERANCE, with a value of one hour. Time synchronization services such as NTP and PTP are capable of correcting drift on the order of milliseconds, so a one-hour threshold should be more than sufficient to accommodate realistic clock drift.
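The proposed check can be sketched as follows. The class and method names here are hypothetical, chosen for illustration; the real change would live inside LogValidator:

```java
public class FutureTimestampCheck {
    // The KIP proposes a hardcoded one-hour drift tolerance.
    static final long TIME_DRIFT_TOLERANCE_MS = 60L * 60 * 1000;

    /**
     * Returns true when a CreateTime record timestamp is acceptable:
     * any past timestamp is allowed (to support replay), and future
     * timestamps are allowed only within the drift tolerance.
     */
    static boolean isValid(long recordTimestampMs, long brokerTimeMs) {
        return recordTimestampMs <= brokerTimeMs + TIME_DRIFT_TOLERANCE_MS;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(isValid(now, now));                               // true
        System.out.println(isValid(now - 7L * 24 * 60 * 60 * 1000, now));    // true: week-old replay
        System.out.println(isValid(now + 30L * 60 * 1000, now));            // true: within the 1h tolerance
        System.out.println(isValid(now + 2 * TIME_DRIFT_TOLERANCE_MS, now)); // false: rejected
    }
}
```

A record failing this check would be rejected with error code 32 (INVALID_TIMESTAMP) as described under Public Interfaces.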

Compatibility, Deprecation, and Migration Plan

There are no changes to public interfaces that will impact clients. However, this change is considered breaking, as messages with future timestamps that were previously accepted by the broker will now be rejected.

Test Plan

  • This change will be tested via unit tests.
  • Additional verification will be done by recreating the issue manually and verifying that the correct error message is returned.


Rejected Alternatives

Add a broker-level configuration for TIME_DRIFT_TOLERANCE instead of a hardcoded constant.

Pros:

  • Configuration Flexibility: Having the time drift tolerance as a configuration would allow Kafka users to adjust this value according to their specific requirements.

Cons:

  • Unusable configuration: Users typically resolve clock drift issues by fixing the clock drift on the broker or producer side, rather than adjusting the time drift tolerance configuration. Therefore, introducing a new configuration specifically for this purpose would likely result in an unused or disregarded configuration.
  • Increased complexity: Kafka already has a significant number of configurations and adjustable parameters. Adding a new configuration would contribute to the overall complexity of the system, requiring users to understand, manage, and adjust yet another parameter.
  • One-way door decision: Introducing this configuration would be effectively irreversible, since it would need to be supported in all future Kafka versions. Starting with a hardcoded value, by contrast, keeps the option open: the configuration could still be added later without disruption or breaking existing functionality.

