In org.apache.kafka.storage.internals.log.LogValidator, we will introduce a new validation that compares the message timestamp to the broker's timestamp and rejects the message if it is ahead of the broker's timestamp. This validation applies when message.timestamp.type is set to CreateTime. There will be no other change to the existing validation logic.

To account for clock drift between the broker and the time synchronization service of the underlying operating system, we propose introducing a constant, TIME_DRIFT_TOLERANCE, with a value of one hour. Time synchronization services such as NTP and PTP can correct drift on the order of milliseconds, so a one-hour threshold should be sufficient to accommodate all clock drift cases.
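As a minimal sketch of how the tolerance could enter the comparison: the class and method names below are hypothetical and are not taken from LogValidator, and the sketch assumes that a record counts as "in the future" only when it is ahead of the broker's clock by more than TIME_DRIFT_TOLERANCE.

```java
import java.util.concurrent.TimeUnit;

// Illustrative sketch only; this is not the actual LogValidator code.
final class FutureTimestampCheck {
    // One-hour tolerance, matching the value proposed in the text.
    static final long TIME_DRIFT_TOLERANCE_MS = TimeUnit.HOURS.toMillis(1);

    // Assumption: the record is rejected as "in the future" only when its
    // timestamp exceeds the broker's time by more than the tolerance.
    static boolean isInFuture(long recordTimestampMs, long brokerNowMs) {
        return recordTimestampMs - brokerNowMs > TIME_DRIFT_TOLERANCE_MS;
    }
}
```

Under this assumption, a record stamped exactly one hour ahead of the broker is still accepted by this check, while one stamped one millisecond later is not.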

Timestamp Validation Logic

The validation occurs before appending a message to the active segment of the local log. We iterate through each batch included in the write request, and the validation logic follows this sequence for each record in the batch:

  1. If message.timestamp.type=LogAppendTime, the server overwrites the timestamp with the broker's timestamp and proceeds without further validation of the record timestamp. The handling differs depending on whether the message is compressed, but that existing logic remains unchanged: regardless of compression, the timestamp is always overwritten with the broker's current time.
  2. We check if the timestamp included with the record is in the future (a new validation).
  3. If the timestamp is in the future, we create a new record-level error, ApiRecordError, with error code 32 (INVALID_TIMESTAMP) and an error message indicating that the record timestamp is in the future compared to the broker's time.
  4. If the record timestamp is not in the future, we check whether it falls within the allowable time difference configured in max.message.time.difference.ms. The timestamp difference is calculated as the absolute value of the record timestamp minus the broker's timestamp.
  5. If the timestamp difference exceeds the value configured in max.message.time.difference.ms, we create a record-level error, ApiRecordError, with error code 32 (INVALID_TIMESTAMP) and an error message indicating that the record timestamp is outside the acceptable timestamp difference range.
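The sequence above can be sketched roughly as follows. This is an illustration under stated assumptions rather than the code in LogValidator: the class TimestampValidationSketch, the nested RecordError type (a stand-in for ApiRecordError), and the method signature are all hypothetical, and applying TIME_DRIFT_TOLERANCE in the future check is an assumption.

```java
import java.util.Optional;

// Illustrative sketch only; names and signatures are hypothetical.
final class TimestampValidationSketch {
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    static final int INVALID_TIMESTAMP_CODE = 32;
    static final long TIME_DRIFT_TOLERANCE_MS = 60L * 60L * 1000L; // one hour

    // Hypothetical stand-in for a record-level error such as ApiRecordError.
    static final class RecordError {
        final int errorCode;
        final String message;

        RecordError(int errorCode, String message) {
            this.errorCode = errorCode;
            this.message = message;
        }
    }

    static Optional<RecordError> validate(TimestampType type, long recordTsMs,
                                          long brokerNowMs, long maxDiffMs) {
        // Step 1: with LogAppendTime the broker overwrites the timestamp,
        // so the record timestamp is not validated.
        if (type == TimestampType.LOG_APPEND_TIME) {
            return Optional.empty();
        }
        // Steps 2-3 (new): reject timestamps ahead of the broker's clock.
        // Assumption: the tolerance is added to the broker's time.
        if (recordTsMs - brokerNowMs > TIME_DRIFT_TOLERANCE_MS) {
            return Optional.of(new RecordError(INVALID_TIMESTAMP_CODE,
                "Record timestamp is in the future compared to the broker's time"));
        }
        // Steps 4-5 (existing): absolute difference against the configured maximum.
        if (Math.abs(recordTsMs - brokerNowMs) > maxDiffMs) {
            return Optional.of(new RecordError(INVALID_TIMESTAMP_CODE,
                "Record timestamp is outside the acceptable timestamp difference range"));
        }
        return Optional.empty();
    }
}
```

An empty result means the record passed both timestamp checks. A non-empty result carries the error that would be reported for that record.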

In this timestamp validation logic, only steps #2 and #3 represent the new changes proposed by this KIP, while the remaining steps reflect existing validations.

It is important to note that if a validation fails, even for a single record within a batch, we reject the entire batch. This behavior remains consistent for both timestamp validation scenarios: records with future timestamps and records with timestamps outside the allowable range.
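The all-or-nothing behavior can be illustrated with a small sketch. The class BatchRejectionSketch and its methods are hypothetical, a batch is modeled simply as an array of record timestamps, and the use of a one-hour tolerance in the future check is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; a batch is modeled as an array of timestamps.
final class BatchRejectionSketch {
    static final long TIME_DRIFT_TOLERANCE_MS = 60L * 60L * 1000L; // one hour

    // Returns the indexes of records that fail either timestamp check.
    static List<Integer> failingRecordIndexes(long[] recordTimestampsMs,
                                              long brokerNowMs, long maxDiffMs) {
        List<Integer> failing = new ArrayList<>();
        for (int i = 0; i < recordTimestampsMs.length; i++) {
            long diff = recordTimestampsMs[i] - brokerNowMs;
            boolean inFuture = diff > TIME_DRIFT_TOLERANCE_MS;   // assumed tolerance
            boolean outOfRange = Math.abs(diff) > maxDiffMs;
            if (inFuture || outOfRange) {
                failing.add(i);
            }
        }
        return failing;
    }

    // A single failing record is enough to reject the whole batch.
    static boolean acceptBatch(long[] recordTimestampsMs, long brokerNowMs, long maxDiffMs) {
        return failingRecordIndexes(recordTimestampsMs, brokerNowMs, maxDiffMs).isEmpty();
    }
}
```

Collecting the failing indexes before deciding keeps the per-record errors available for reporting, while the accept decision depends only on whether that list is empty.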

Please note that the description above omits details of other orthogonal validations such as key verification in compacted topics, offset contiguity verification, and batch record count verification.


The code change that is proposed by this KIP is captured in this Pull Request.
