
Authors Mehari Beyene, Divij Vaidya

Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-14991

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In Kafka, the introduction of the timestamp field in the message format through KIP-32 brought along two additional configurations: log.message.timestamp.type and log.message.timestamp.difference.max.ms.

By default, the configuration values are log.message.timestamp.type=CreateTime and log.message.timestamp.difference.max.ms=9223372036854775807 (java.lang.Long.MAX_VALUE). This allows producers to send messages with timestamps as far back as the minimum representable timestamp and as far as hundreds of years into the future. (Side note: we could change the default value of log.message.timestamp.difference.max.ms to something more sensible, but that is not the motivation for this KIP.)

While there can be valid use cases for messages with past timestamps for the purpose of replaying old messages, messages with future timestamps are inherently inaccurate and can lead to unexpected log rotation behavior. Kafka users have encountered problems due to misconfigured producers, such as using nanoseconds instead of milliseconds for the message timestamp. This anecdotal evidence from a Medium article highlights the challenges associated with this issue.

The motivation behind this proposal is to improve the validation logic for message timestamps by rejecting messages with future timestamps and providing a descriptive exception. This will help improve data integrity and prevent potential pitfalls caused by inaccurate timestamp handling.

Public Interfaces

There will be no change to existing public interfaces. However, when a producer's record is rejected due to this new validation, we will return error code 32 (INVALID_TIMESTAMP) with the error message "Timestamp of the message with offset [record offset] is ahead of the broker's current time."

Proposed Changes

In org.apache.kafka.storage.internals.log.LogValidator, we will introduce a new validation that compares the message timestamp to the broker's timestamp and rejects the message if it is ahead of the broker's timestamp. This validation will apply when log.message.timestamp.type is set to CreateTime. There will be no other change to the existing validation logic.

Considering potential clock drift issues that the broker may encounter with the time synchronization service of the underlying operating system, we propose introducing a constant, TIME_DRIFT_TOLERANCE, with a value of one hour. Time synchronization services such as NTP and PTP are capable of correcting drift on the order of milliseconds, so a one-hour threshold should be sufficient to accommodate all realistic clock drift cases.
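As a rough illustration, a drift-tolerant check for future timestamps could look like the following sketch. Only the constant name TIME_DRIFT_TOLERANCE comes from this KIP; the class and method names are hypothetical, and the actual implementation in LogValidator may differ.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the proposed drift-tolerant future-timestamp check.
class TimeDriftTolerance {
    // One hour, expressed in milliseconds, as proposed by this KIP.
    static final long TIME_DRIFT_TOLERANCE = TimeUnit.HOURS.toMillis(1);

    /**
     * A record timestamp counts as "in the future" only if it exceeds
     * the broker's current time plus the drift tolerance.
     */
    static boolean isInFuture(long recordTimestampMs, long brokerNowMs) {
        return recordTimestampMs > brokerNowMs + TIME_DRIFT_TOLERANCE;
    }
}
```

With this check, a record stamped up to one hour ahead of the broker's clock is still accepted, absorbing any realistic NTP/PTP drift.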

Timestamp Validation Logic

The validation occurs before appending a message to the active segment of the local log. We iterate through each batch included in the write request, and the validation logic follows this sequence for each record in the batch:

  1. If log.message.timestamp.type=LogAppendTime, the server overwrites the timestamp with the broker's timestamp and proceeds without further validation related to the record timestamp. Note that the code path differs depending on whether the message is compressed, but the existing logic remains unchanged: regardless of compression, the timestamp is always overwritten with the broker's current time.
  2. We check if the timestamp included with the record is in the future (a new validation).
  3. If the timestamp is in the future, we create a record-level error, ApiRecordError, with error code 32 (INVALID_TIMESTAMP) and an error message indicating that the record timestamp is in the future compared to the broker's time.
  4. If the record timestamp is not in the future, we validate that it falls within the allowable time difference configured in log.message.timestamp.difference.max.ms. The timestamp difference is calculated as the absolute value of the record timestamp minus the broker's timestamp.
  5. If the timestamp difference exceeds the value configured in log.message.timestamp.difference.max.ms, we create a record-level error, ApiRecordError, with error code 32 (INVALID_TIMESTAMP) and an error message indicating that the record timestamp is outside the acceptable timestamp difference range.

In this timestamp validation logic, only steps #2 and #3 represent the new changes proposed by this KIP, while the remaining steps reflect existing validations.

It is important to note that if a validation fails, even for a single record within a batch, we reject the entire batch. This behavior remains consistent for both timestamp validation scenarios: records with future timestamps and records with timestamps outside the allowable range.

Please note that the description above omits details of other orthogonal validations such as key verification in compacted topics, offset contiguity verification, and batch record count verification.
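The validation sequence above, including the reject-the-whole-batch rule, can be sketched as follows. This is a simplified, hypothetical model: the class and method names are illustrative and do not match Kafka's internal ApiRecordError plumbing, step 1 (the LogAppendTime overwrite) is assumed to have been handled earlier, and the orthogonal validations mentioned above are omitted.

```java
import java.util.List;
import java.util.Optional;

// Simplified, hypothetical model of the per-batch CreateTime validation flow.
// The future-timestamp check (steps 2-3) is the new part proposed by this KIP.
class BatchTimestampValidation {
    static final long TIME_DRIFT_TOLERANCE_MS = 60L * 60L * 1000L; // one hour

    /**
     * Validates the CreateTime timestamp of every record in a batch against
     * broker time. Returns the first error message found; a non-empty result
     * means the whole batch is rejected, mirroring the batch-level rejection
     * behavior described above.
     */
    static Optional<String> validateBatch(List<Long> recordTimestamps,
                                          long brokerTimestampMs,
                                          long maxTimestampDiffMs) {
        for (long ts : recordTimestamps) {
            // Steps 2-3 (new): reject timestamps ahead of broker time beyond the tolerance.
            if (ts > brokerTimestampMs + TIME_DRIFT_TOLERANCE_MS) {
                return Optional.of("Timestamp " + ts + " is ahead of the broker's current time");
            }
            // Steps 4-5 (existing): enforce log.message.timestamp.difference.max.ms.
            if (Math.abs(ts - brokerTimestampMs) > maxTimestampDiffMs) {
                return Optional.of("Timestamp " + ts + " is outside the acceptable difference range");
            }
        }
        return Optional.empty(); // all records valid; the batch can be appended
    }
}
```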


The code change that is proposed by this KIP is captured in this Pull Request.

Compatibility, Deprecation, and Migration Plan

The semantics of the log.message.timestamp.difference.max.ms configuration are modified. Prior to this KIP, the configuration could accommodate an arbitrarily large time difference into the future, whereas after this KIP messages with future timestamps that were previously accepted by the broker will be rejected. We consider this an acceptable breaking change.

There are no other changes to public interfaces that will impact clients.

Test Plan

  • This change will be tested via unit tests.
  • Additional verification will be done by recreating the issue manually and verifying that the correct error message is returned.

Future Work: Update Message format to include both Client Timestamp and LogAppend Timestamp

In the current message format (v2), a single Timestamp field is used, qualified by the timestamp type (0 for CreateTime, 1 for LogAppendTime). However, this design limits log retention logic to using either the client timestamp or the broker's append time, but not both.

Nevertheless, both the client and broker timestamps are relevant as they capture different information and can be used to evaluate retention logic differently. The client timestamp closely represents the actual event timestamp as assigned by the producer. On the other hand, the broker's timestamp only indicates the time of log append, which can be considered an implementation detail.

To address this limitation, we have considered a modification that captures both the CreateTime and LogAppendTime, allowing for a more nuanced representation and validation of timestamps. However, the proposed change in this KIP does not involve updating the message format and incrementing the message version to v3. Such an update is a major undertaking that requires modifications across multiple components. We maintain a wiki entry that tracks the motivations for updating the message format, and it has been updated to include the proposal of capturing both timestamps.

Rejected Alternatives

Add a broker-level configuration for TIME_DRIFT_TOLERANCE instead of a hardcoded constant.

Pros:

  • Configuration Flexibility: Having the time drift tolerance as a configuration would allow Kafka users to adjust this value according to their specific requirements.

Cons:

  • Unusable configuration: Users typically resolve clock drift issues by fixing the clock drift on the broker or producer side, rather than adjusting the time drift tolerance configuration. Therefore, introducing a new configuration specifically for this purpose would likely result in an unused or disregarded configuration.
  • Increased complexity: Kafka already has a significant number of configurations and adjustable parameters. Adding a new configuration would contribute to the overall complexity of the system, requiring users to understand, manage, and adjust yet another parameter.
  • One-way door decision: Introducing this configuration now would be effectively irreversible, as it would need to be supported in all future Kafka versions. By contrast, starting with a hardcoded value keeps the option open: if needed, the configuration could still be added in a future release without causing disruptions or breaking existing functionality.

