...

There will be no change to existing public interfaces. However, when a producer's record is rejected due to this new validation, we will return error code 32 (INVALID_TIMESTAMP) with the error message "Timestamp of the message with offset [record offset] is ahead of the broker's current time."

We are also introducing two new configurations and deprecating the configuration log.message.timestamp.difference.max.ms, as discussed in the Proposed Changes section.

Proposed Changes

We will introduce two new configurations: log.message.timestamp.before.max.ms and log.message.timestamp.after.max.ms. Additionally, we will deprecate the existing configuration log.message.timestamp.difference.max.ms.

log.message.timestamp.before.max.ms: This configuration sets the maximum amount by which a message timestamp may precede the broker's timestamp. A message timestamp that is earlier than or equal to the broker's timestamp is accepted only if the difference does not exceed the value of log.message.timestamp.before.max.ms.

  - Type: long
  - Default: 9223372036854775807
  - Valid Values: [0,...]
  - Update Mode: cluster-wide

  Note that the default value of log.message.timestamp.before.max.ms is kept the same as log.message.timestamp.difference.max.ms to maintain backward compatibility.


log.message.timestamp.after.max.ms: This configuration sets the maximum amount by which a message timestamp may be ahead of the broker's timestamp. A message timestamp that is later than the broker's timestamp is accepted only if the difference does not exceed the value of log.message.timestamp.after.max.ms.

  - Type: long
  - Default: 3600000
  - Valid Values: [0,...]
  - Update Mode: cluster-wide

Note that the default value for log.message.timestamp.after.max.ms is 3600000 milliseconds (one hour). In org.apache.kafka.storage.internals.log.LogValidator, we will introduce a new validation that compares the message timestamp to the broker's timestamp and rejects the message if it is ahead of the broker's timestamp by more than log.message.timestamp.after.max.ms. This validation applies when log.message.timestamp.type is set to CreateTime. There will be no other change to the existing validation logic.

Because the broker's clock may drift relative to the time synchronization service of the underlying operating system, we propose introducing a constant, TIME_DRIFT_TOLERANCE, with a default value of one hour for this configuration. Time synchronization services such as NTP and PTP typically correct drift on the order of milliseconds, so a one-hour threshold should be sufficient to accommodate clock drift in practice.

Timestamp Validation Logic

The validation occurs before appending a message to the active segment of the local log.

...

If log.message.timestamp.type=LogAppendTime, the server overwrites the timestamp with the broker's timestamp and proceeds without further validation related to the record timestamp. Note that variations exist when the message is compressed or not compressed, but the existing logic remains unchanged. Regardless of compression, the timestamp is always overwritten with the broker's current time.
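As a rough illustration of the LogAppendTime behavior described above, the following Java sketch shows which timestamp ends up stored and when record timestamp validation is needed. The class, enum, and method names are hypothetical stand-ins chosen for this example; they are not the broker's actual classes, and compression handling is left out.

```java
/** Illustrative sketch only; these names are assumptions, not Kafka's real classes. */
final class TimestampTypeSketch {
    /** Local stand-in for the two values of log.message.timestamp.type. */
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    /** With LogAppendTime the broker's time replaces the record timestamp; otherwise it is kept. */
    static long timestampToStore(TimestampType type, long recordTimestampMs, long brokerTimestampMs) {
        return type == TimestampType.LOG_APPEND_TIME ? brokerTimestampMs : recordTimestampMs;
    }

    /** Record timestamp validation is only relevant when the producer's timestamp is kept. */
    static boolean needsTimestampValidation(TimestampType type) {
        return type == TimestampType.CREATE_TIME;
    }

    public static void main(String[] args) {
        long brokerNowMs = 1_700_000_000_000L;
        long recordMs = brokerNowMs + 7_200_000L; // two hours ahead of the broker
        System.out.println(timestampToStore(TimestampType.LOG_APPEND_TIME, recordMs, brokerNowMs));
        System.out.println(needsTimestampValidation(TimestampType.LOG_APPEND_TIME));
    }
}
```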

We iterate through each batch included in the write request. The validation logic for each record, in pseudocode, is as follows:

New Validation Logic

if message.timestamp <= broker.timestamp:
    time_difference = broker.timestamp - message.timestamp
    if time_difference <= log.message.timestamp.before.max.ms:
        # Validation passed for timestamp before
        # Proceed with further logic
    else:
        # Validation failed for timestamp before
        return Error code 32 (INVALID_TIMESTAMP)
else:  # message.timestamp > broker.timestamp
    time_difference = message.timestamp - broker.timestamp
    if time_difference <= log.message.timestamp.after.max.ms:
        # Validation passed for timestamp after
        # Proceed with further logic
    else:
        # Validation failed for timestamp after
        return Error code 32 (INVALID_TIMESTAMP)
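The new per-record check can also be sketched in Java. This is a minimal illustration under stated assumptions, not the LogValidator implementation: the class name, method name, and the use of a plain short return code are hypothetical, and timestamps are assumed to be non-negative epoch milliseconds so the subtractions cannot overflow.

```java
/** Illustrative sketch of the proposed check; names are assumptions, not Kafka's API. */
final class TimestampValidationSketch {
    static final short NONE = 0;               // no error
    static final short INVALID_TIMESTAMP = 32; // error code named in this proposal

    /**
     * Applies the "before" bound to past or equal timestamps and the "after" bound
     * to future timestamps. Assumes non-negative epoch-millisecond inputs.
     */
    static short validate(long recordTimestampMs, long brokerTimestampMs,
                          long beforeMaxMs, long afterMaxMs) {
        if (recordTimestampMs <= brokerTimestampMs) {
            long difference = brokerTimestampMs - recordTimestampMs;
            return difference <= beforeMaxMs ? NONE : INVALID_TIMESTAMP;
        }
        long difference = recordTimestampMs - brokerTimestampMs;
        return difference <= afterMaxMs ? NONE : INVALID_TIMESTAMP;
    }

    public static void main(String[] args) {
        long brokerNowMs = 1_700_000_000_000L;
        long oneHourMs = 3_600_000L; // proposed default for log.message.timestamp.after.max.ms
        // Ten hours in the past, exactly one hour ahead, and two hours ahead.
        System.out.println(validate(brokerNowMs - 10 * oneHourMs, brokerNowMs, Long.MAX_VALUE, oneHourMs));
        System.out.println(validate(brokerNowMs + oneHourMs, brokerNowMs, Long.MAX_VALUE, oneHourMs));
        System.out.println(validate(brokerNowMs + 2 * oneHourMs, brokerNowMs, Long.MAX_VALUE, oneHourMs));
    }
}
```

With the proposed defaults, only the record that is two hours ahead of the broker is rejected in this example; the bound itself is inclusive, so a record exactly one hour ahead still passes.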


For comparison, the validation logic above replaces the current validation, which is represented by the following pseudocode:

Existing Validation Logic

time_difference = absolute(message.timestamp - broker.timestamp)

if time_difference > log.message.timestamp.difference.max.ms:
    # Validation failed for timestamp difference
    return Error code 32 (INVALID_TIMESTAMP)
else:
    # Validation passed for timestamp difference
    # Proceed with further logic
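To make the limitation of the single symmetric bound concrete, here is a hedged Java sketch of the existing check. The class and method names are hypothetical, and timestamps are assumed to be non-negative epoch milliseconds. Because one value governs both directions, a bound loose enough to admit old records (such as the default of 9223372036854775807) also admits records far in the future.

```java
/** Illustrative sketch of the existing symmetric check; names are assumptions. */
final class LegacyTimestampValidationSketch {
    static final short NONE = 0;               // no error
    static final short INVALID_TIMESTAMP = 32; // error code named in this proposal

    /** One bound is applied to the absolute difference, regardless of direction. */
    static short validate(long recordTimestampMs, long brokerTimestampMs, long differenceMaxMs) {
        long difference = Math.abs(recordTimestampMs - brokerTimestampMs);
        return difference > differenceMaxMs ? INVALID_TIMESTAMP : NONE;
    }

    public static void main(String[] args) {
        long brokerNowMs = 1_700_000_000_000L;
        long twoHoursMs = 7_200_000L;
        // With the default bound, a record two hours in the future is accepted.
        System.out.println(validate(brokerNowMs + twoHoursMs, brokerNowMs, Long.MAX_VALUE));
        // With a one-hour bound, past and future records are rejected alike.
        System.out.println(validate(brokerNowMs + twoHoursMs, brokerNowMs, 3_600_000L));
        System.out.println(validate(brokerNowMs - twoHoursMs, brokerNowMs, 3_600_000L));
    }
}
```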


It is important to note that if validation fails for even a single record within a batch, we reject the entire batch. This behavior is the same for both timestamp validation scenarios: records with future timestamps and records with past timestamps.
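The all-or-nothing batch behavior can be sketched as follows. This is an illustration under assumptions rather than the broker's code: a batch is modeled as a plain array of record timestamps, and the class and method names are hypothetical. The batch is accepted only when no offending record is found.

```java
import java.util.OptionalInt;

/** Illustrative sketch of whole-batch rejection; names and batch model are assumptions. */
final class BatchTimestampValidationSketch {
    /** Per-record check with separate bounds for past and future timestamps. */
    static boolean isTimestampValid(long recordMs, long brokerMs, long beforeMaxMs, long afterMaxMs) {
        return recordMs <= brokerMs
                ? brokerMs - recordMs <= beforeMaxMs
                : recordMs - brokerMs <= afterMaxMs;
    }

    /** Returns the index of the first invalid record, or empty if the batch can be appended. */
    static OptionalInt firstInvalidRecord(long[] recordTimestampsMs, long brokerMs,
                                          long beforeMaxMs, long afterMaxMs) {
        for (int i = 0; i < recordTimestampsMs.length; i++) {
            if (!isTimestampValid(recordTimestampsMs[i], brokerMs, beforeMaxMs, afterMaxMs)) {
                return OptionalInt.of(i); // one bad record rejects the whole batch
            }
        }
        return OptionalInt.empty();
    }

    public static void main(String[] args) {
        long brokerNowMs = 1_700_000_000_000L;
        long oneHourMs = 3_600_000L;
        long[] batch = {brokerNowMs - 1_000L, brokerNowMs, brokerNowMs + 2 * oneHourMs};
        OptionalInt offending = firstInvalidRecord(batch, brokerNowMs, Long.MAX_VALUE, oneHourMs);
        System.out.println(offending.isPresent() ? "batch rejected" : "batch accepted");
    }
}
```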

Please note that the description above omits details of other orthogonal validations such as key verification in compacted topics, offset contiguity verification, and batch record count verification.

Compatibility, Deprecation, and Migration Plan

...