Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • org.apache.kafka.common.record
  • org.apache.kafka.clients.producer
  • org.apache.kafka.streams.processor

Proposed Changes

The broker uses −1 as a default value for missing timestamp. Which might be a correct value set by the user: Wednesday, December 31, 1969 11:59:59 PM UTC.

The solution is to ignore that problem and it is a choice of the user to:

  1. interpret this value as a real timestamp
  2. interpret −1 millisecond as "no timestamp" and other values as a real timestamp.

In general, there should be is no reason to fail on broker/client side working with negative timestamps.

Compatibility, Deprecation, and Migration Plan

What impact (if any) will there be on existing users?

No impact on current users, they should update their infrastructure in that order: Broker, Consumers, Producers.

If we are changing behavior how will we phase out the older behavior? 

We are not changing the behavior in the way we would need to phase out the older behavior. 

If we need special migration tools, describe them here.

No need.

When will we remove the existing behavior?

No need.

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.

Message flag "hasTimestamp"

  1. add a special boolean flag to message record "hasTimestamp",
  2. write a migration tool that adds this flag to message with the negative timestamp to legacy messages,
  3. make sure clients know about that field and check them

Timestamp delta

In case it's known that you would need no more than 200 years back before Unix epoch, the possible solution is:

  • producers should add that "delta" to an original timestamp,
  • all consumers who do lookups by timestamp should know about that "delta",
  • keep −1 semantics as it is now.

It may seem the easiest thing to do, nevertheless changing −1 semantics is a cleaner solution.

Topic property "may have a valid negative timestamps"

Add a topic property that specifies that topic may have a valid negative timestamps.

...

  • client should be able to publish record with a negative timestamp (to the topics that support that),
  • broker should accept and serve that record,
  • streams should not drop a record with a negative timestamp.

Changes from producer perspective

 Old Broker BehaviourNew Broker Behaviour

Old producer sends NO_TIMESTAMP (−1L)

isTimestampExtended = 0

(tick) Records this as NO_TIMESTAMP (−1L)(tick) Records this as NO_TIMESTAMP (Long.MIN_VALUE)

New producer sends NO_TIMESTAMP (

...

The broker uses −1 as a default value for missing timestamp. Which might be a correct value set by the user: Wednesday, December 31, 1969 11:59:59 PM UTC.

Proposed behavior is to change that semantics and use Long.MIN_VALUE for messages without timestamp.

Long.MIN_VALUE)

isTimestampExtended = 1

(error) Error or Records this as NO_TIMESTAMP (−1L)(tick) Records this as NO_TIMESTAMP (Long.MIN_VALUE)

New producer sends −1L ms

isTimestampExtended = 1

(error) Error or Records this as NO_TIMESTAMP (−1L)(tick) Records this as −1L ms

So broker should be updated first, before producers.

Changes from consumer perspective

 Old ConsumerNew Consumer

Record has TS −1L

isTimestampExtended = 0

(tick) Interpret as NO_TIMESTAMP (−1L)(tick) Interpret as NO_TIMESTAMP (Long.MIN_VALUE)
Record has TS −1L

isTimestampExtended = 1

(error) Error or NO_TIMESTAMP(tick) Interpret as −1L ms
Record has TS Long.MIN_VALUE

isTimestampExtended = 1

(error) Error or NO_TIMESTAMP(tick) Interpret as NO_TIMESTAMP (Long.MIN_VALUE)

So only new consumers will read records correctlyWe may need a new timestamp type along with CreateTime and LogAppendTime to prevent legacy brokers from writing −1 (with meaning no timestamp) to the new topics.

Changes in binary message format


Current binary format:

 

Code Block
XXXX XXXX = 8 bits

 1. XXXX XXXX XXXX XXXX XXXX XXXX XXXX XXXX - baseOffset
    XXXX XXXX XXXX XXXX XXXX XXXX XXXX XXXX
 2. XXXX XXXX XXXX XXXX XXXX XXXX XXXX XXXX - batchLength
 3. XXXX XXXX XXXX XXXX XXXX XXXX XXXX XXXX - partitionLeaderEpoch
 4. XXXX XXXX                               - magic (current magic value is 2)
 5. XXXX XXXX XXXX XXXX                     - attributes

    Compression
    000 - no comporession
    001 - gzip
    010 - snappy
    011 - lz4

       Timestamp
       0 - create time
       1 - log append time

         X - isTransactional
          X - isControlBatch
           XX XXXX XXXX - unused

 6. XXXX XXXX XXXX XXXX XXXX XXXX XXXX XXXX - lastOffsetDelta
 7. XXXX XXXX XXXX XXXX XXXX XXXX XXXX XXXX - firstTimestamp
    XXXX XXXX XXXX XXXX XXXX XXXX XXXX XXXX
 8. XXXX XXXX XXXX XXXX XXXX XXXX XXXX XXXX - maxTimestamp
    XXXX XXXX XXXX XXXX XXXX XXXX XXXX XXXX
 9. XXXX XXXX XXXX XXXX XXXX XXXX XXXX XXXX - producerId
    XXXX XXXX XXXX XXXX XXXX XXXX XXXX XXXX
10. XXXX XXXX XXXX XXXX XXXX XXXX XXXX XXXX - producerEpoch
11. ...                                     - baseSequence
12. ...                                     - records

Proposed change: let's use one of the reserved bits to indicate that timestamp can be negative.

Code Block
 5. XXXX XXXX XXXX XXXX - attributes

    Compression
    000 - no comporession
    001 - gzip
    010 - snappy
    011 - lz4

       Timestamp
       0 - create time
       1 - log append time

         X - isTransactional
          X - isControlBatch
           X - isTimestampExtended ←
            X XXXX XXXX - unused

...

That isTimestampExtended

...

 bit should

...

be 1

...

 for all new records.

Broker should convert

...

old NO_TIMESTAMP=−1L

...

 to new NO_TIMESTAMP_EXTENDED

...

Compatibility, Deprecation, and Migration Plan

What impact (if any) will there be on existing users?

No impact on current users, they should update their infrastructure in that order: Broker, Consumers, Producers.

If we are changing behavior how will we phase out the older behavior? 

We are not changing the behavior in the way we would need to phase out the older behavior. 

If we need special migration tools, describe them here.

A migration tool is not required.

When will we remove the existing behavior?

No need.

Changes from producer perspective

...

Old producer sends NO_TIMESTAMP (−1L)

...

=

...

Long.MIN_VALUE

...

New producer sends NO_TIMESTAMP (Long.MIN_VALUE)

isTimestampExtended = 1

...

New producer sends −1L ms

isTimestampExtended = 1

...

So broker should be updated first, before producers.

Changes from consumer perspective

 Old ConsumerNew Consumer

Record has TS −1L

isTimestampExtended = 0

(tick) Interpret as NO_TIMESTAMP (−1L)(tick) Interpret as NO_TIMESTAMP (Long.MIN_VALUE)
Record has TS −1L

isTimestampExtended = 1

(error) Error or NO_TIMESTAMP(tick) Interpret as −1L ms
Record has TS Long.MIN_VALUE

isTimestampExtended = 1

(error) Error or NO_TIMESTAMP(tick) Interpret as NO_TIMESTAMP (Long.MIN_VALUE)

So only new consumers will read records correctly.

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.

Message flag "hasTimestamp":

  1. add a special boolean flag to message record "hasTimestamp",
  2. write a migration tool that adds this flag to message with the negative timestamp to legacy messages,
  3. make sure clients know about that field and check them

Ignore −1 problem:

The solution could be just to ignore that problem and it is a choice of the user to:

  1. interpret this value as a real timestamp
  2. interpret −1 as "no timestamp" and other values as a real timestamp (can we borrow 1 millisecond for our needs?).

Timestamp delta:

In case it's known that you would need no more than 200 years back before Unix epoch, the possible solution is:

  • producers should add that "delta" to an original timestamp,
  • all consumers who do lookups by timestamp should know about that "delta",
  • keep −1 semantics as it is now.

It may seem the easiest thing to do, nevertheless changing −1 semantics is a cleaner solution.