You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »

Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP is try to address the following issue in Kafka.

  1. Log retention might not be honored: Log retention is currently at the log segment level, and is driven off the last modification time of a log segment. This approach does not quite work when a replica reassignment happens because the newly created log segment will effectively have its modification time reset to now.
  2. Searching offset by time stamp has very coarse granularity (log segment level), it also does not work well when replica is reassigned.

This KIP is a distilled/improved version of an earlier discussion that we started.

This KIP is preferably to be implemented with KIP-31 if possible to avoid changing wire protocol twice.

Public Interfaces

We propose the following change to the message format

MessageAndOffset => MessageSize Offset Message
  MessageSize => int32
  Offset => int64
  
  Message => Crc MagicByte Attributes Timestamp KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    Timestamp => int64 <---------------------- NEW
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes

Proposed Changes

Wire protocol change

Add a timestamp field to message

  • The timestamp will be assigned by broker upon receiving the message. If the message is coming from mirror maker, the original time stamp will be discarded. In essence, we treat timestamp as another offset-like field. If application needs a timestamp, it can be put into payload.
  • The timestamp will be used to build the log index.
  • The tiimestamp accuracy would be millisecond
  • The timestamp of the outer message of compressed messages will be the latest timestamp of all its inner messages.
    • If the compressed message is not compacted, the relative offsets of inner messages will be contiguous and share the same timestamp.
    • If the compressed message is compacted, the relative offsets of inner messages may not be contiguous. Its timestamp will be the timestamp of the last inner message.
  • The followers will not reassign timestamp but simply update a in memory lastAppendedTimestamp and append the message to the log.
  • To handle leader migration where new leader has slower clock than old leader, all the leader should append max(lastAppendedTimestamp, currentTimeMillis) as the timestamp.

Change time based log rolling and retention to use timestamp

Time based log rolling and retention currently use the file creation time and last modified time. This does not work for newly created replicas because the time attributes of files is different from the true message append time. The following changes will address this issue.

  • The time based log rolling will be based on the timestamp of the first message in the log segment file.
  • The time based log retention will be based on the timestamp of the last message in the log segment file.

Expose the timestamp in ConsumerRecord to users

The timestamp will be exposed in ConsumerRecord to user.

New time-based log index

In order to enable timestamp based search at finer granularity, we need to add the timestamp to log indices as well.

Because all the index files are memory mapped files the main consideration here is to avoid significantly increasing the memory consumption.

The time index file needs to be built just like the log index file based on each log segment file.

Use a time index for each log segment to save the timestamp -> log offset at minute granularity

Create another index file for each log segment with name SegmentBaseOffset.time.index to have index at minute level. The time index entry format is:

 

Time Index Entry => Timestamp Offset
  Timestamp => int64
  Offset => int32

The time index granularity does not change the actual timestamp searching granularity. It only affects the time needed for searching. The way it works will be the same as offset search - find the closet timestamp and corresponding offset, then start the leaner scan over the log until find the target message. The reason we prefer minute level indexing is because timestamp based search is usually rare so it probably does not worth investing significant amount of memory in it.

The time index will be built based on the log index file. Every time when a new entry is inserted into log index file, we take a look at the timestamp of the message and if it falls into next minute, we insert an entry to the time index as well. The following table give the summary of memory consumption using different granularity. The number is calculated based on a broker with 3500 partitions.

second864003.4 GB
Minute144057 MB

Users don't typically need to look up offsets with seconds granularity.

Discussion of some scenarios

Mirror maker

The broker does not distinguish mirror maker from other producers. The following example explains what will the time stamp look like when there is mirror maker in the picture.

  1. Application producer produces message at T0. 
     
  2. The broker in cluster1 received message at T1 = T0 + latency1 and appended the message to the log. Latency1 includes linger.ms and some other latency.
  3. Mirror maker copies the message to broker in cluster2.
  4. The broker in cluster2 receives message at T2 = T1 + latency2 and appended the message to the log.

So in the case of mirror maker, the timestamp of a message in source cluster and target cluster will be different. i.e. the timestamp is not carried over cross cluster. This will ensure that the timestamp in a log file is monotonically increasing. Even when the messages in the target cluster came from different source clusters.

Leader migration

Suppose we have broker0 and broker1. Broker0 is the current leader of a partition and broker1 is a follower. Consider the following scenario:

  1. message m0 is produced to broker 0 at time T0 (T0 is the clock on broker 0).
  2. broker1 as a follower replicated m0 and appended to its own log without changing the timestamp.
  3. broker0 went down and broker 1 become the new leader.
  4. message m1 is produced to broker 1 at time T1 (T1 is the clock on broker 1).

In step 4, it is possible that T1 < T0 because of the time difference on two brokers. If we naively take T1 as the timestamp of m1, then the timestamp will be out of order. i.e. a message with earlier timestamp will show later in the log. To avoid this problem, at step 4, broker 1 can take max(T1, T0) as the timestamp for m1. So the timestamp in the log will not go backward. Broker 1 will be using T0 as timestamp until its own clock advances after T0.

To be more general, when a message is appended, broker (whether leader or follower) should remember the timestamp of the last appended message, if a later appended message has a timestamp earlier than the timestamp of last appended message, the timestamp of last appended message will be used.

Compatibility, Deprecation, and Migration Plan

NOTE: This part is drafted based on the assumption that KIP-31 and KIP-32 will be implemented in one patch.

The proposed protocol is not backward compatible. The migration plan are as below:

  1. Increment magic byte in MessageAndOffset from 0 to 1.
  2. Upgrade broker to support both V0 and V1 of MessageAndOffset.
    1. When broker see a producer request using V0, it will still decompress the message, assign offsets and re-compress.
    2. When broker see a producer request using V1, it will decompress the message for verification, assign the offset to outer message and NOT do recompression.
  3. Upgrade consumer to support both V0 and V1.
  4. Upgrade producer to send MessageAndOffset V1.

For producer, there will be no impact.

For old consumers that cannot recognize V1, if they see the new protocol an exception will be thrown for unsupported version. (The current code in ZookeeperConsumerConnector does not validate the magic byte. This is a bug and we will fix it in a separate ticket)

For upgraded consumers, they can handle both V0 and V1.

Rejected Alternatives

Add a timestamp field to log index entry

The most straight forward approach to have a time index is to let the log index files have a timestamp associate with each entry.

Log Index Entry => Offset Position Timestamp
  Offset => int32
  Position => int32
  Timestamp => int64

Because the index entry size become 16 bytes instead of 8 bytes. The index file size also needs to be doubled. As an example, one of the broker we have has ~3500 partitions. The index file took about 16GB memory. With this new format, the memory consumption would be 32GB.

  • No labels