...

Authors: Mehari Beyene, Divij Vaidya

Status

Current state: Accepted

Discussion thread: here

Vote thread: here

JIRA: KAFKA-14991

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

There will be no change to existing public interfaces. However, when a producer's record is rejected by the timestamp validation, we will return error code 32 (INVALID_TIMESTAMP) with the error message "Timestamp [record timestamp] of the message with offset [record offset] is out of range. The timestamp should be within [brokerTime - log.message.timestamp.before.max.ms, brokerTime + log.message.timestamp.after.max.ms]."
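To make the bounds in that message concrete, here is a minimal Python sketch that fills in the template from the broker's clock and the two configuration values. The function and parameter names are hypothetical and only illustrate the arithmetic; they are not part of Kafka's API.

```python
# Hypothetical helper: renders the INVALID_TIMESTAMP error text described above.
# before_max_ms / after_max_ms stand for log.message.timestamp.before.max.ms and
# log.message.timestamp.after.max.ms; the names are illustrative only.
def invalid_timestamp_message(record_timestamp: int, record_offset: int,
                              broker_time: int, before_max_ms: int,
                              after_max_ms: int) -> str:
    lower = broker_time - before_max_ms  # earliest accepted timestamp
    upper = broker_time + after_max_ms   # latest accepted timestamp
    return (f"Timestamp {record_timestamp} of the message with offset "
            f"{record_offset} is out of range. The timestamp should be "
            f"within [{lower}, {upper}].")


# Example: broker clock at 10,000,000 ms, one-hour tolerance on each side.
print(invalid_timestamp_message(20_000_000, 5, 10_000_000, 3_600_000, 3_600_000))
```

Python integers do not overflow, so the bounds are exact even with very large settings; a JVM implementation working with `long` values would need to clamp these bounds when a configuration is close to Long.MAX_VALUE.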

Note that the error message keeps the same format as the current timestamp validation error, except that the accepted range is now expressed in terms of two new configurations: `log.message.timestamp.before.max.ms` and `log.message.timestamp.after.max.ms`. Additionally, the existing configuration log.message.timestamp.difference.max.ms will be deprecated. Details of these configuration changes are discussed in the Proposed Changes section.

Proposed Changes

We will introduce two new configurations: log.message.timestamp.before.max.ms and log.message.timestamp.after.max.ms. Additionally, we will deprecate the existing configuration log.message.timestamp.difference.max.ms.

...

log.message.timestamp.after.max.ms: This configuration defines how far the message timestamp may be ahead of the broker's timestamp. A message whose timestamp is later than the broker's timestamp is accepted only if the difference does not exceed the value of log.message.timestamp.after.max.ms.

  - Type: long
  - Default: 9223372036854775807
  - Valid Values: [0,...]
  - Update Mode: cluster-wide
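As a rough illustration of the specification above (Type: long, Valid Values: [0,...]), the following hypothetical Python check accepts any value from 0 up to the largest Java long, which is also the default. It is a sketch only and does not reproduce Kafka's actual config-parsing code.

```python
# Illustrative only: checks a candidate value for
# log.message.timestamp.after.max.ms against its documented spec.
# This is not Kafka's ConfigDef API.
LONG_MAX = 2**63 - 1  # largest Java long, 9223372036854775807


def validate_after_max_ms(raw: str) -> int:
    value = int(raw)  # raises ValueError for non-numeric input
    if value > LONG_MAX:
        raise ValueError(f"{value} does not fit in a long")
    if value < 0:
        raise ValueError(f"{value} is below the minimum of 0")
    return value
```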

Note that the default value of log.message.timestamp.after.max.ms is kept the same as that of log.message.timestamp.difference.max.ms (9223372036854775807, i.e. Long.MAX_VALUE) to maintain backward compatibility and avoid introducing a breaking change for clients that send messages with future timestamps. However, in a future major version bump, we will change this default value to 3600000 milliseconds (one hour). The one-hour threshold is chosen with broker clock drift in mind: time synchronization services such as NTP and PTP are capable of correcting drift on the order of milliseconds, so one hour should be sufficient to accommodate clock drift cases. To prepare users for this change, we will add a WARN-level log when we encounter messages with future timestamps that are ahead of the proposed one-hour threshold.
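The interim behavior described above (accept future timestamps under the backward-compatible default, but warn when they exceed one hour) can be sketched in Python as follows. The logger name, function name, and message wording are hypothetical; only the one-hour threshold and the Long.MAX_VALUE default come from the text.

```python
import logging

logger = logging.getLogger("timestamp_validation")  # hypothetical logger name

ONE_HOUR_MS = 3_600_000  # proposed future default of after.max.ms
LONG_MAX = 2**63 - 1     # current default, same as difference.max.ms


def check_future_timestamp(message_ts: int, broker_ts: int,
                           after_max_ms: int = LONG_MAX) -> bool:
    """Apply only the future-timestamp check (past timestamps pass here).

    Returns True if the message passes log.message.timestamp.after.max.ms,
    and logs a WARN when an accepted message is more than one hour ahead.
    """
    ahead_by = message_ts - broker_ts
    if ahead_by <= 0:
        return True  # not in the future; before.max.ms governs this case
    accepted = ahead_by <= after_max_ms
    if accepted and ahead_by > ONE_HOUR_MS:
        logger.warning(
            "Message timestamp %d is %d ms ahead of broker time %d and may be "
            "rejected once the one-hour default for "
            "log.message.timestamp.after.max.ms takes effect",
            message_ts, ahead_by, broker_ts)
    return accepted
```

Warning only on accepted messages keeps the log focused on the records that would change behavior when the default moves to one hour; records already rejected by a stricter setting get the INVALID_TIMESTAMP error instead.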

Timestamp Validation Logic

...

New Validation Logic:

```python
INVALID_TIMESTAMP = 32  # Kafka error code for INVALID_TIMESTAMP


def validate_timestamp(message_timestamp: int, broker_timestamp: int,
                       before_max_ms: int, after_max_ms: int):
    """Return None if the timestamp is accepted, else error code 32.

    before_max_ms and after_max_ms hold the values of
    log.message.timestamp.before.max.ms and log.message.timestamp.after.max.ms.
    """
    if message_timestamp <= broker_timestamp:
        time_difference = broker_timestamp - message_timestamp
        if time_difference <= before_max_ms:
            return None  # Validation passed for timestamp before; proceed
        return INVALID_TIMESTAMP  # Validation failed for timestamp before
    else:
        time_difference = message_timestamp - broker_timestamp
        if time_difference <= after_max_ms:
            return None  # Validation passed for timestamp after; proceed
        return INVALID_TIMESTAMP  # Validation failed for timestamp after
```
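The two branches of this logic amount to a single closed window around the broker's clock, which is also the range quoted in the error message. The Python sketch below expresses the check that way; the function names are hypothetical, and the boundary examples show that both ends of the window are inclusive because the logic uses `<=`.

```python
def accepted_window(broker_ts: int, before_max_ms: int, after_max_ms: int):
    """Closed interval [lower, upper] of message timestamps the broker accepts."""
    return broker_ts - before_max_ms, broker_ts + after_max_ms


def is_valid(message_ts: int, broker_ts: int,
             before_max_ms: int, after_max_ms: int) -> bool:
    lower, upper = accepted_window(broker_ts, before_max_ms, after_max_ms)
    return lower <= message_ts <= upper


broker = 1_000_000
print(is_valid(broker - 500, broker, 500, 100))  # True: exactly before.max.ms in the past
print(is_valid(broker + 101, broker, 500, 100))  # False: 1 ms beyond after.max.ms
```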

...

Compatibility, Deprecation, and Migration Plan

...

  • ms. With this KIP,

...

  • message timestamps with past and future values are now validated against separate thresholds, defined by the configurations log.message.timestamp.before.max.ms and log.message.timestamp.after.max.ms respectively.
  • After this KIP, messages with future timestamps are validated using the new configuration log.message.timestamp.after.max.ms. This can cause some messages that were previously accepted to be rejected

...

  • . We consider this an acceptable breaking change.
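The difference between the old and new checks can be illustrated with a small Python sketch. It assumes, as in the broker before this KIP, that log.message.timestamp.difference.max.ms bounds the absolute difference in both directions; the function names and example values are hypothetical. A message two hours in the future passes the old check with a one-day difference setting but fails the new check once after.max.ms is set to one hour, while setting both new configurations to the old value reproduces the old behavior.

```python
def old_check(message_ts: int, broker_ts: int, difference_max_ms: int) -> bool:
    # Before this KIP: one symmetric bound on the absolute difference.
    return abs(message_ts - broker_ts) <= difference_max_ms


def new_check(message_ts: int, broker_ts: int,
              before_max_ms: int, after_max_ms: int) -> bool:
    # After this KIP: separate bounds for past and future timestamps.
    if message_ts <= broker_ts:
        return broker_ts - message_ts <= before_max_ms
    return message_ts - broker_ts <= after_max_ms


HOUR_MS = 3_600_000
DAY_MS = 24 * HOUR_MS
broker = 10 * DAY_MS
future_msg = broker + 2 * HOUR_MS  # two hours ahead of the broker clock

print(old_check(future_msg, broker, DAY_MS))           # True
print(new_check(future_msg, broker, DAY_MS, HOUR_MS))  # False
```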

...


There are no other changes to public interfaces that will impact clients.

...

To address this limitation, we have considered a modification that captures both the CreateTime and LogAppendTime, allowing for a more nuanced representation and validation of timestamps. However, the proposed change in this KIP does not involve updating the message format and incrementing the message version to v3. Such an update is a major undertaking that requires modifications across multiple components. We maintain a wiki entry that tracks the motivations for updating the message format, and it has been updated to include the proposal of capturing both timestamps.

Rejected Alternatives

Add a hardcoded constant, TIME_DRIFT_TOLERANCE, with a one-hour value and use it to validate message timestamps with future values. Example code: https://github.com/apache/kafka/pull/13709
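For contrast with the configurable approach, the alternative of a fixed TIME_DRIFT_TOLERANCE constant with a one-hour value can be sketched in Python as below. This is a hypothetical illustration and is not taken from the linked pull request; the point is simply that the tolerance is baked into the code and cannot be changed without a new release.

```python
# Hypothetical sketch of the rejected approach: a fixed tolerance that
# users cannot change through configuration.
TIME_DRIFT_TOLERANCE_MS = 3_600_000  # one hour, hardcoded


def future_timestamp_ok(message_ts: int, broker_ts: int) -> bool:
    # Past timestamps always pass this particular check.
    return message_ts - broker_ts <= TIME_DRIFT_TOLERANCE_MS
```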


Pros:

  • Simplicity: Hardcoding a constant value simplifies the validation logic by eliminating the need for additional configurations. It reduces complexity and potential configuration errors.

Cons:

  • Lack of Flexibility: Users may have use cases that require extending the future timestamp validation beyond the default one hour, and the hardcoded constant value restricts this flexibility.
  • Compatibility Issues: The hardcoded constant value of one hour can create compatibility issues when upgrading existing clusters or integrating with existing producers.
  • Unusable configuration: Users typically resolve clock drift issues by fixing the clock drift on the broker or producer side, rather than adjusting the time drift tolerance configuration. Therefore, introducing a new configuration specifically for this purpose would likely result in an unused or disregarded configuration.
  • Increased complexity: Kafka already has a significant number of configurations and adjustable parameters. Adding a new configuration would contribute to the overall complexity of the system, requiring users to understand, manage, and adjust yet another parameter.
  • One-way door decision: Introducing this configuration would be an irreversible decision, and it would need to be supported in all future Kafka versions. However, if the decision is made to add the configuration in the future, instead of hardcoding the value, it could be implemented without causing disruptions or breaking existing functionalities.