...
- Add a new timestamp field to the message format.
- Add the following two configurations to the broker
- message.timestamp.type - This topic level configuration defines the type of timestamp in the messages of a topic. The valid values are CreateTime or LogAppendTime.
- max.message.time.difference.ms - This configuration only works when message.timestamp.type=CreateTime. The broker will only accept messages whose timestamp differs by no more than max.message.time.difference.ms from the broker local time.
- Add a timestamp field to ProducerRecord and ConsumerRecord. A producer will be able to set a timestamp for a ProducerRecord. A consumer will see the message timestamp when it sees the messages.
- Add ProduceRequest/ProduceResponse V2 which uses the new message format.
- Add a timestamp in ProduceResponse V2 for each partition. The timestamp will be LogAppendTime if the topic is configured to use it, or NoTimestamp if CreateTime is used.
- Add FetchRequest/FetchResponse V2 which uses the new message format.
...
- Allow the user to timestamp a message when producing it.
- When a leader broker receives a message
- If message.timestamp.type=LogAppendTime, the server will override the timestamp with its current local time and append the message to the log.
- If the message is a compressed message, the timestamp in the compressed message will be updated to the current server time. The broker will check that all the inner messages have timestamp -1. If an inner message's timestamp is not -1, the broker will overwrite it and recompress. We do this, instead of writing the current server time to each inner message, to avoid the recompression penalty when people are using LogAppendTime.
- If the message is a non-compressed message, the timestamp in the message will be overwritten to current server time.
- If message.timestamp.type=CreateTime
- If the time difference is within the configurable threshold max.message.time.difference.ms, the server will accept the message and append it to the log. For a compressed message, the server will update the timestamp in the compressed message to the largest timestamp of the inner messages.
- If the time difference is beyond the configured threshold max.message.time.difference.ms, the server will reject the entire batch with TimestampExceededThresholdException.
- When a follower broker receives a message
- If the message is a compressed message, the timestamp in the compressed message will be used to build the time index, i.e. the timestamp in a compressed message is always the largest timestamp of all its inner messages.
- If the message is a non-compressed message, the timestamp of this message will be used to build the time index.
- When a consumer receives a message
- If a message is a compressed message
- If the inner message timestamp is not -1, that timestamp will be used.
- If the inner message timestamp is -1, the timestamp of the compressed message will be used as its timestamp.
- If a message is a non-compressed message, the timestamp of the message will be used.
- message.timestamp.type and max.message.time.difference.ms will be a per topic configuration.
- In ProduceResponse V2, a timestamp will be returned for each partition.
- If the topic uses LogAppendTime, the timestamp returned would be the LogAppendTime for the message set.
- If the topic uses CreateTime, the timestamp returned would be NoTimestamp.
- When producer invokes the callback for each message, it uses the timestamp returned in the produce response if it is not NoTimestamp. Otherwise, it uses the message timestamp which is tracked by the producer.
- The producer will be able to tell whether the timestamp is LogAppendTime or CreateTime in this case.
- The time index will be built so it has the following guarantees. (Please note that the time index will be implemented in KIP-33 instead of this KIP. The behavior is discussed here because it is closely related to the design of this KIP.)
- If a user searches by a timestamp:
- All the messages after the searched timestamp will be consumed.
- The user might see earlier messages.
- Log retention will look at the last entry in the time index file, because the last entry will be the latest timestamp in the entire log segment. If that entry expires, the log segment will be deleted.
- Log rolling will depend on the largest timestamp of all messages ever seen. If no message is appended within log.roll.ms after the largest appended timestamp, a new log segment will be rolled out.
- The downsides of this proposal are:
- The timestamp might not be monotonically increasing if message.timestamp.type=CreateTime.
- Log retention might become non-deterministic, i.e. when a message is deleted now depends on the timestamps of the other messages in the same log segment, and those timestamps are provided by the user within a range that depends on the time difference threshold configuration.
- Although the proposal has some downsides, it gives the user the flexibility to use the timestamp.
- If message.timestamp.type=CreateTime
- When the time difference threshold is set to Long.MaxValue, the timestamp in the message is equivalent to CreateTime.
- When the time difference threshold is between 0 and Long.MaxValue, it ensures the messages will always have a timestamp within a certain range.
- If message.timestamp.type=LogAppendTime, the timestamps will be log append time.
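The leader-side and consumer-side rules in the list above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the broker implementation: the `Message` class, the function names, and the `NO_TIMESTAMP = -1` constant are hypothetical, compression itself is not modeled, and the threshold check is applied to each inner message of a compressed message.

```python
import time
from dataclasses import dataclass
from typing import List, Optional

NO_TIMESTAMP = -1  # assumed sentinel, matching the -1 used in the text


class TimestampExceededThresholdError(Exception):
    """Hypothetical stand-in for TimestampExceededThresholdException."""


@dataclass
class Message:
    timestamp: int
    # Present only for a compressed (wrapper) message.
    inner: Optional[List["Message"]] = None


def stamp_on_leader(msg, timestamp_type, max_diff_ms, now_ms=None):
    """Apply the leader rules to one message in place and return it."""
    now_ms = int(time.time() * 1000) if now_ms is None else now_ms
    if timestamp_type == "LogAppendTime":
        # The message (or compressed wrapper) carries the server time.
        msg.timestamp = now_ms
        # Inner messages are expected to be -1; a real broker would have
        # to recompress if it had to overwrite any of them.
        for m in msg.inner or []:
            m.timestamp = NO_TIMESTAMP
        return msg
    if timestamp_type != "CreateTime":
        raise ValueError("unknown message.timestamp.type: %r" % timestamp_type)
    # CreateTime: reject the whole batch if any timestamp is too far
    # from the broker's local time.
    for m in (msg.inner if msg.inner else [msg]):
        if abs(m.timestamp - now_ms) > max_diff_ms:
            raise TimestampExceededThresholdError(m.timestamp)
    if msg.inner:
        # The wrapper takes the largest inner timestamp.
        msg.timestamp = max(m.timestamp for m in msg.inner)
    return msg


def consumer_timestamp(wrapper, inner):
    """Timestamp a consumer reports for an inner message of a wrapper."""
    if inner.timestamp != NO_TIMESTAMP:
        return inner.timestamp
    return wrapper.timestamp
```

With LogAppendTime the consumer falls back to the wrapper timestamp for every inner message, while with CreateTime each inner message keeps its own timestamp.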
The following changes will be made to implement the above proposal.
Wire protocol change - add a Time field to the message format
Code Block | ||
---|---|---|
| ||
MessageAndOffset => Offset MessageSize Message
  Offset => int64
  MessageSize => int32
  Message => Crc MagicByte Attributes Timestamp KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8 <---------------------- Bump up magic byte to 1
    Attributes => int8
    Timestamp => int64 <---------------------- NEW
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
|
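To make the layout above concrete, here is a minimal sketch that packs and unpacks a single `Message` with magic byte 1. It rests on assumptions carried over from the existing message format rather than stated in this section: fields are big-endian, the CRC is a CRC32 over all bytes after the Crc field, and a null key or value is encoded as length -1. The function names are hypothetical.

```python
import struct
import zlib

MAGIC_V1 = 1  # magic byte value for the format with a Timestamp field


def _sized(data):
    # Length-prefixed bytes; None is written as length -1 (assumption).
    if data is None:
        return struct.pack(">i", -1)
    return struct.pack(">i", len(data)) + data


def encode_message(timestamp, key, value, attributes=0):
    """Serialize Crc MagicByte Attributes Timestamp Key Value."""
    body = struct.pack(">bbq", MAGIC_V1, attributes, timestamp)
    body += _sized(key) + _sized(value)
    crc = zlib.crc32(body) & 0xFFFFFFFF
    return struct.pack(">I", crc) + body


def decode_message(buf):
    """Parse bytes produced by encode_message and verify the CRC."""
    crc, magic, attributes, timestamp = struct.unpack_from(">Ibbq", buf, 0)
    if zlib.crc32(buf[4:]) & 0xFFFFFFFF != crc:
        raise ValueError("CRC mismatch")
    pos = 14  # 4 (Crc) + 1 (MagicByte) + 1 (Attributes) + 8 (Timestamp)
    fields = []
    for _ in range(2):  # Key, then Value
        (length,) = struct.unpack_from(">i", buf, pos)
        pos += 4
        if length < 0:
            fields.append(None)
        else:
            fields.append(buf[pos:pos + length])
            pos += length
    return {"magic": magic, "attributes": attributes,
            "timestamp": timestamp, "key": fields[0], "value": fields[1]}
```

Compared with a magic byte 0 message, the only size change is the extra 8 bytes for the Timestamp field.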
...
I.e. by default the server will not override any user timestamp.
Add a timestamp field to RecordMetadata
- The timestamp in record metadata will be LogAppendTime if it is returned from broker, or it will be the timestamp set by user in ProducerRecord.
- When producer invokes the callback for a message, the timestamp will be available through RecordMetadata.
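The selection rule for the RecordMetadata timestamp can be written as a one-line decision. The sketch below is illustrative only; the function name is hypothetical, and `NO_TIMESTAMP = -1` is an assumed value for NoTimestamp.

```python
NO_TIMESTAMP = -1  # assumed value of NoTimestamp


def metadata_timestamp(response_timestamp, record_timestamp):
    """Pick the timestamp exposed through RecordMetadata.

    response_timestamp: per-partition timestamp from the produce response.
    record_timestamp: timestamp the user set on the ProducerRecord.
    """
    if response_timestamp != NO_TIMESTAMP:
        # The broker returned a LogAppendTime, so it takes precedence.
        return response_timestamp
    # CreateTime topic: fall back to the timestamp tracked by the producer.
    return record_timestamp
```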
Add ProduceRequest/ProduceResponse V2 and FetchRequest/FetchResponse V2
The fields of ProduceRequest and FetchResponse V2 will be the same as V1. The difference is the format of the messages.
The fields of ProduceResponse V2 will be changed to following:
Code Block | ||||
---|---|---|---|---|
| ||||
ProduceResponse => [ResponseStatus] ThrottleTime
ResponseStatus => TopicName [Partition ErrorCode Offset timestamp]
TopicName => string
Partition => int32
ErrorCode => int16
Offset => int64
timestamp => int64 <------------------- NEW
ThrottleTime => int32
|
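As a rough illustration of how a client might read this response, the sketch below decodes the fields in the order given above. It assumes the usual Kafka wire conventions (big-endian integers, arrays prefixed with an int32 count, strings prefixed with an int16 length) and a 32-bit ThrottleTime as in V1; the function name and the returned dictionary shape are hypothetical.

```python
import struct


def decode_produce_response_v2(buf):
    """Return ({topic: {partition: fields}}, throttle_time_ms)."""
    pos = 0

    def read(fmt):
        # Unpack one group of fixed-size fields and advance the cursor.
        nonlocal pos
        values = struct.unpack_from(fmt, buf, pos)
        pos += struct.calcsize(fmt)
        return values

    topics = {}
    (topic_count,) = read(">i")
    for _ in range(topic_count):
        (name_len,) = read(">h")
        name = buf[pos:pos + name_len].decode("utf-8")
        pos += name_len
        partitions = {}
        (partition_count,) = read(">i")
        for _ in range(partition_count):
            # Partition ErrorCode Offset timestamp
            partition, error_code, offset, timestamp = read(">ihqq")
            partitions[partition] = {"error_code": error_code,
                                     "offset": offset,
                                     "timestamp": timestamp}
        topics[name] = partitions
    (throttle_time_ms,) = read(">i")
    return topics, throttle_time_ms
```

A timestamp of NoTimestamp in a partition entry would indicate that the topic uses CreateTime, so the caller keeps the timestamp it already tracks for the record.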
The fields of FetchRequest V2 will be the same as V1.
...