...
```
MessageAndOffset => MessageSize Offset Message
  MessageSize => int32
  Offset => int64
  Message => Crc MagicByte Attributes SendTime ReceiveTime KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    SendTime => int64      <---------------------- NEW
    ReceiveTime => int64   <---------------------- NEW
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
```
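The following minimal Python sketch packs and unpacks a single Message with the two new int64 fields, following the field order of the schema above. The function names are hypothetical. Two details are assumptions, not part of this proposal: the Crc is taken to cover every byte after the Crc field, and a null key or value is written with length -1.

```python
import struct
import zlib
from typing import Optional, Tuple

# Fixed-size part after Crc: MagicByte (int8), Attributes (int8),
# SendTime (int64), ReceiveTime (int64), all big-endian.
_HEADER = struct.Struct(">bbqq")


def _encode_bytes(data: Optional[bytes]) -> bytes:
    # Assumption: a null key/value is encoded as length -1 with no payload.
    if data is None:
        return struct.pack(">i", -1)
    return struct.pack(">i", len(data)) + data


def _decode_bytes(buf: bytes, pos: int) -> Tuple[Optional[bytes], int]:
    (length,) = struct.unpack_from(">i", buf, pos)
    pos += 4
    if length < 0:
        return None, pos
    return buf[pos:pos + length], pos + length


def encode_message(magic: int, attributes: int, send_time: int, receive_time: int,
                   key: Optional[bytes], value: Optional[bytes]) -> bytes:
    body = (_HEADER.pack(magic, attributes, send_time, receive_time)
            + _encode_bytes(key) + _encode_bytes(value))
    # Assumption: the Crc covers everything after the Crc field. Packing it as
    # unsigned produces the same four bytes as the int32 in the schema.
    crc = zlib.crc32(body) & 0xFFFFFFFF
    return struct.pack(">I", crc) + body


def decode_message(buf: bytes) -> dict:
    (crc,) = struct.unpack_from(">I", buf, 0)
    if zlib.crc32(buf[4:]) & 0xFFFFFFFF != crc:
        raise ValueError("CRC mismatch")
    magic, attributes, send_time, receive_time = _HEADER.unpack_from(buf, 4)
    pos = 4 + _HEADER.size
    key, pos = _decode_bytes(buf, pos)
    value, pos = _decode_bytes(buf, pos)
    return {"magic": magic, "attributes": attributes, "send_time": send_time,
            "receive_time": receive_time, "key": key, "value": value}
```

A producer using this sketch would write ReceiveTime as -1, as in the mirror maker example, and leave it for the broker to fill in. Because the Crc in this sketch covers ReceiveTime, a broker that fills it in would also have to recompute the Crc.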
Proposed Changes
Wire protocol change
Add SendTime and ReceiveTime fields to message
- SendTime
  - SendTime will be set by the producer and will not be changed afterward.
  - SendTime accuracy is millisecond.
- ReceiveTime
  - The ReceiveTime will be assigned by the broker upon receiving the message. If the message comes from mirror maker, the original SendTime will be preserved but the ReceiveTime will be reassigned by the target broker.
  - The ReceiveTime will be used to build the log index.
  - The ReceiveTime accuracy is millisecond.
  - The ReceiveTime of the outer message of a compressed message will be the latest ReceiveTime of all its inner messages.
    - If the compressed message is not compacted, the relative offsets of its inner messages will be contiguous and the inner messages will share the same ReceiveTime.
    - If the compressed message is compacted, the relative offsets of its inner messages may not be contiguous. Its ReceiveTime will be the ReceiveTime of the last inner message.
  - Followers will not reassign the ReceiveTime; they simply update an in-memory lastAppendedReceiveTime and append the message to the log.
  - To handle leader migration where the new leader has a slower clock than the old leader, every leader should append max(lastAppendedReceiveTime, currentTimeMillis) as the ReceiveTime.
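The compressed-message rules above can be sketched in Python as follows. This is a minimal illustration under stated assumptions: InnerMessage, wrapper_receive_time and assign_receive_time are hypothetical names, and the leader is assumed to give every inner message of a freshly received compressed message the same ReceiveTime, computed with the max(lastAppendedReceiveTime, currentTimeMillis) rule.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class InnerMessage:
    relative_offset: int  # offset relative to the outer (wrapper) message
    receive_time: int     # ReceiveTime in milliseconds


def assign_receive_time(inner_count: int, now_ms: int,
                        last_appended: int) -> List[InnerMessage]:
    """Leader-side stamping of a freshly received, uncompacted compressed message.

    Relative offsets are contiguous and all inner messages share one ReceiveTime,
    which never goes below the last appended ReceiveTime.
    """
    receive_time = max(last_appended, now_ms)
    return [InnerMessage(i, receive_time) for i in range(inner_count)]


def wrapper_receive_time(inner: List[InnerMessage]) -> int:
    """ReceiveTime of the outer message: the latest ReceiveTime of its inner messages.

    When ReceiveTime is non-decreasing this is the ReceiveTime of the last inner
    message, which is also what the compacted case calls for.
    """
    if not inner:
        raise ValueError("a compressed message must contain at least one inner message")
    return max(m.receive_time for m in inner)


# Hypothetical compacted message: offsets 1-2 and 4-6 were removed by compaction.
compacted = [InnerMessage(0, 500), InnerMessage(3, 500), InnerMessage(7, 900)]
```

In this sketch the compacted wrapper's ReceiveTime is 900, the ReceiveTime of its last surviving inner message, even though its relative offsets have gaps.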
Change time-based log rolling and retention to use timestamp
...
- Time-based log rolling will be based on the ReceiveTime of the first message in the log segment file.
- Time-based log retention will be based on the ReceiveTime of the last message in the log segment file.
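A minimal sketch of the two checks, assuming (as the mirror maker discussion argues) that rolling and retention use the broker-assigned ReceiveTime. SegmentInfo, roll_ms and retention_ms are hypothetical names standing in for the broker's segment metadata and its configured roll and retention intervals.

```python
from dataclasses import dataclass


@dataclass
class SegmentInfo:
    first_receive_time: int  # ReceiveTime of the first message in the segment (ms)
    last_receive_time: int   # ReceiveTime of the last message in the segment (ms)


def should_roll(segment: SegmentInfo, now_ms: int, roll_ms: int) -> bool:
    """Roll the active segment once its first message is older than roll_ms."""
    return now_ms - segment.first_receive_time > roll_ms


def should_delete(segment: SegmentInfo, now_ms: int, retention_ms: int) -> bool:
    """Delete a segment only once even its newest message is older than retention_ms."""
    return now_ms - segment.last_receive_time > retention_ms
```

Using the last message for retention means a segment is deleted only after every message in it has expired, while using the first message for rolling limits how long a single segment keeps accepting appends.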
Expose the timestamp in ConsumerRecord to users
...
ConsumerRecord / ProducerRecord format change
- Add a SendTime field to ProducerRecord. Applications can use this field to set the send time. It will also allow mirror maker to preserve the original send time easily.
- Add both SendTime and ReceiveTime to ConsumerRecord.
  - The SendTime is useful in use cases such as stream processing.
  - The ReceiveTime is useful in use cases such as cross-colo failover.
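The record changes can be illustrated with two hypothetical Python classes. They are not the Kafka client's ProducerRecord and ConsumerRecord, only a sketch of the fields this proposal adds. The sketch also assumes that a record without an explicit SendTime is stamped with the producer's own clock, and shows how a mirror maker could carry the SendTime over while dropping the source cluster's ReceiveTime.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ProducerRecordSketch:
    topic: str
    value: bytes
    key: Optional[bytes] = None
    send_time: Optional[int] = None  # assumption: None means "use the producer's clock"


@dataclass(frozen=True)
class ConsumerRecordSketch:
    topic: str
    offset: int
    value: bytes
    key: Optional[bytes]
    send_time: int     # set by the original producer, never changed afterward
    receive_time: int  # set by the broker that appended the message


def effective_send_time(record: ProducerRecordSketch, now_ms: int) -> int:
    """SendTime the producer would write: the application's value, else its own clock."""
    return record.send_time if record.send_time is not None else now_ms


def mirror(record: ConsumerRecordSketch, target_topic: str) -> ProducerRecordSketch:
    """Hypothetical mirror-maker step: keep SendTime; the target broker assigns ReceiveTime."""
    return ProducerRecordSketch(topic=target_topic, value=record.value,
                                key=record.key, send_time=record.send_time)
```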
New time-based log index
To enable timestamp-based search at a finer granularity, we need to add the timestamp to the log indices as well. The broker will build the time index based on the ReceiveTime of messages.
Because all index files are memory-mapped, the main consideration here is to avoid significantly increasing memory consumption.
...
Users don't typically need to look up offsets with seconds granularity.
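The index entry format is elided above, so the following is only a sketch under assumptions: a sparse, in-memory list of (ReceiveTime, offset) pairs in which the first message of the segment is always indexed. TimeIndexSketch and its methods are hypothetical. Because ReceiveTime never decreases within a log, a binary search finds the entry from which to scan forward for the first message at or after a target time.

```python
import bisect
from typing import List, Optional


class TimeIndexSketch:
    """Sparse stand-in for a time index: sorted (ReceiveTime, offset) entries."""

    def __init__(self) -> None:
        self._times: List[int] = []
        self._offsets: List[int] = []

    def append(self, receive_time: int, offset: int) -> None:
        # Binary search below is only valid because ReceiveTime is non-decreasing.
        if self._times and receive_time < self._times[-1]:
            raise ValueError("ReceiveTime must be non-decreasing")
        self._times.append(receive_time)
        self._offsets.append(offset)

    def scan_start(self, target_time: int) -> Optional[int]:
        """Offset to start scanning from for the first message with ReceiveTime >= target.

        Entry i is the first entry with time >= target, so unindexed messages
        between entries i-1 and i may already match; start from entry i-1.
        """
        if not self._times:
            return None
        i = bisect.bisect_left(self._times, target_time)
        return self._offsets[max(i - 1, 0)]
```

Indexing only every few messages keeps the memory-mapped file small, at the cost of a short forward scan in the log after each lookup.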
Discussion of some use cases
Mirror maker
The broker does not distinguish mirror maker from other producers. The following example shows what the timestamps look like when mirror maker is in the picture (S: SendTime, R: ReceiveTime).
- The application producer produces a message at T0. ( [S = T0, R = -1] )
- The broker in cluster1 receives the message at T1 = T0 + latency1 and appends it to the log. Latency1 includes linger.ms and other delays. ( [S = T0, R = T1] )
- Mirror maker copies the message to a broker in cluster2. ( [S = T0, R = T1] )
- The broker in cluster2 receives the message at T2 = T1 + latency2 and appends it to the log. ( [S = T0, R = T2] )
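The arithmetic of the example can be written out as a small Python function. mirror_chain is a hypothetical helper, and, as in the example, all times are treated as points on one shared timeline.

```python
from typing import Tuple


def mirror_chain(t0: int, latency1: int, latency2: int) -> Tuple[Tuple[int, int], ...]:
    """(SendTime, ReceiveTime) of the message after each step of the example."""
    t1 = t0 + latency1  # cluster1 broker's clock when it appends the message
    t2 = t1 + latency2  # cluster2 broker's clock when it appends the copy
    return (
        (t0, -1),  # produced by the application; no ReceiveTime assigned yet
        (t0, t1),  # appended in cluster1
        (t0, t1),  # copied by mirror maker, still carrying cluster1's ReceiveTime
        (t0, t2),  # appended in cluster2: SendTime kept, ReceiveTime replaced
    )
```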
The SendTime of a message in the source cluster and the target cluster will be the same. This ensures that the SendTime is carried across clusters.
The ReceiveTime of a message in the source cluster and the target cluster will be different. This is because: 1) log retention and rolling need to be based on the server clock to provide a clear guarantee; 2) to support searching by timestamp, the ReceiveTime in a log file needs to be monotonically increasing, even when the messages in the target cluster come from different source clusters.
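To see why the target broker's own clock is needed, consider a target broker that appends mirrored messages from two source clusters. In this hypothetical sketch, each arrival is a (SendTime, target broker clock) pair in append order. The broker assigns ReceiveTime from its own clock with the max(lastAppendedReceiveTime, currentTimeMillis) rule, so ReceiveTime stays ordered in the log while the carried-over SendTime does not.

```python
from typing import List, Tuple


def append_from_sources(arrivals: List[Tuple[int, int]]) -> List[Tuple[int, int]]:
    """Return (SendTime, ReceiveTime) pairs as stored in the target log.

    arrivals: (send_time, broker_clock_ms) for each message, in append order.
    """
    log: List[Tuple[int, int]] = []
    last_appended = -1
    for send_time, clock_ms in arrivals:
        # ReceiveTime comes from the target broker's clock and never moves backward.
        last_appended = max(last_appended, clock_ms)
        log.append((send_time, last_appended))
    return log


# Cluster A's messages (SendTime 100, 300) interleave with a lagging
# cluster B's message (SendTime 50).
target_log = append_from_sources([(100, 1_000), (50, 1_001), (300, 1_002)])
```

Here the SendTimes in target_log are 100, 50, 300, while the ReceiveTimes are 1000, 1001, 1002, so only ReceiveTime can support a binary search over the log.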
Leader migration
Suppose we have broker0 and broker1. Broker0 is the current leader of a partition and broker1 is a follower. Consider the following scenario:
- message m0 is produced to broker 0 at time T0 (R = T0, T0 is the clock on broker 0).
- broker1, as a follower, replicated m0 and appended it to its own log without changing the ReceiveTime of m0.
- broker0 went down and broker1 became the new leader.
- message m1 is produced to broker 1 at time T1 (R = T1, T1 is the clock on broker 1).
...
More generally, when a message is appended, the broker (whether leader or follower) should remember the timestamp of the last appended message. If a later appended message has a timestamp earlier than that of the last appended message, the timestamp of the last appended message will be used instead.
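The leader migration scenario and this rule can be simulated with a hypothetical PartitionReplica class. Broker clocks are modeled as plain numbers that the sketch sets directly, and broker0's clock is assumed to run one second ahead of broker1's.

```python
from typing import List


class PartitionReplica:
    """Hypothetical replica that tracks lastAppendedReceiveTime in memory."""

    def __init__(self, clock_ms: int) -> None:
        self.clock_ms = clock_ms             # this broker's current wall-clock time
        self.last_appended_receive_time = -1
        self.log: List[int] = []             # ReceiveTime of each appended message

    def append_as_leader(self) -> int:
        # Never assign a ReceiveTime earlier than the last appended one.
        receive_time = max(self.last_appended_receive_time, self.clock_ms)
        self.log.append(receive_time)
        self.last_appended_receive_time = receive_time
        return receive_time

    def append_as_follower(self, receive_time: int) -> None:
        # Followers keep the leader's ReceiveTime and only update the in-memory marker.
        self.log.append(receive_time)
        self.last_appended_receive_time = receive_time


broker0 = PartitionReplica(clock_ms=10_000)  # current leader
broker1 = PartitionReplica(clock_ms=9_000)   # follower, clock 1 s behind broker0
r0 = broker0.append_as_leader()              # m0 is produced to broker0
broker1.append_as_follower(r0)               # broker1 replicates m0
r1 = broker1.append_as_leader()              # broker0 fails; m1 goes to broker1
```

Without the max rule, m1 would get ReceiveTime 9000, earlier than m0's 10000, and broker1's log would no longer be ordered by ReceiveTime.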
Compatibility, Deprecation, and Migration Plan
...