Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-4148

Released: 0.10.1.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP includes two parts:

Part 1: Introduce ListOffsetRequest v1 to support accurate search based on timestamp. 

With KIP-33, the brokers can now search messages by timestamp accurately. To maintain backward compatibility, we did not change the behavior of ListOffsetRequest v0. In this KIP, we introduce ListOffsetRequest v1 to provide a more accurate search based on timestamp.

Part 2: Add a search-messages-by-timestamp method to o.a.k.c.consumer.Consumer

In SimpleConsumer, users were able to search offsets by timestamp. This method no longer exists in KafkaConsumer. We want to introduce a method that allows users to search offsets by timestamp. This is useful in a few cases, for example:

...

The two parts above could be separate KIPs. However, since part 2 depends heavily on part 1 and may also affect the semantics of ListOffsetRequest v1, it is better to discuss them together to make sure the wire protocol change can satisfy the various use cases on the consumer side.

Public Interfaces

Add ListOffsetRequest v1

...

// ListOffsetResponse v1 
ListOffsetResponse => [TopicName [PartitionOffsets]] 
  PartitionOffsets => Partition ErrorCode Timestamp [Offset]
  Partition => int32 
  ErrorCode => int16 
  Timestamp => int64 
  Offset => int64

Add new Error Code 43 - UnsupportedForMessageFormat

This error code is added to indicate that the requested operation is not supported by the message format version. In this KIP it means that the timestamp search operation is not supported by message formats before 0.10.0 when ListOffsetRequest is v1. This error code could also be used in the future when the message format evolves again (e.g. KIP-82).

Add a new method to o.a.k.c.consumer.Consumer to allow users to search offsets by timestamp.

Code Block
language: java
title: offsetsForTimes() method
/**
 * Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the
 * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
 *
 * This is a blocking call. The consumer does not have to be assigned the partitions.
 *
 * @param timestampsToSearch the mapping from partition to the timestamp to look up.
 * @return For each partition, returns the timestamp and offset of the first message with timestamp greater
 *         than or equal to the target timestamp.
 */
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch);

/**
 * Get the earliest available offsets for the given partitions.
 * 
 * @param partitions the partitions to get the earliest offsets.
 * @return The earliest available offsets for the given partitions
 */
Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions);
 
/**
 * Get the latest offsets for the given partitions.
 * 
 * @param partitions the partitions to get the latest offsets.
 * @return The latest available offsets for the given partitions.
 */
Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions);
 
public class OffsetAndTimestamp {
	private final long timestamp;
	private final long offset;
 
	public OffsetAndTimestamp(long offset, long timestamp) {
		this.timestamp = timestamp;
		this.offset = offset;
	}
 
	public long timestamp() { return timestamp; }
	public long offset() { return offset; }
}

Proposed Changes

ListOffsetRequest/ListOffsetResponse v1

Add ListOffsetRequest/ListOffsetResponse v1 with the following changes compared with ListOffsetRequest/ListOffsetResponse v0

...

If the message format on the broker side is before 0.10, i.e. messages do not have timestamps, ListOffsetRequest v1 will only work if the target time is -1 or -2. Any other target time will result in an UnsupportedForMessageFormat error (error code 43).
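The rule above can be sketched as a small guard. This is only an illustration under stated assumptions, not the broker's actual code: the class name, the method name, the constant names, and the `formatHasTimestamps` flag are hypothetical. Only the sentinel values -1 and -2 and error code 43 come from this KIP.

```java
final class ListOffsetV1Guard {
    // Error code 43 is defined in this KIP; 0 is the usual "no error" code in the Kafka protocol.
    static final short NONE = 0;
    static final short UNSUPPORTED_FOR_MESSAGE_FORMAT = 43;

    // Sentinel target times that do not need per-message timestamps (names are assumptions).
    static final long LATEST_TIMESTAMP = -1L;
    static final long EARLIEST_TIMESTAMP = -2L;

    /**
     * Returns the error code for a v1 lookup on one partition.
     * formatHasTimestamps is a hypothetical flag: true when the message format is 0.10.0 or later.
     */
    static short validate(boolean formatHasTimestamps, long targetTime) {
        boolean sentinel = targetTime == LATEST_TIMESTAMP || targetTime == EARLIEST_TIMESTAMP;
        if (!formatHasTimestamps && !sentinel) {
            // Old message format cannot serve a real timestamp search.
            return UNSUPPORTED_FOR_MESSAGE_FORMAT;
        }
        return NONE;
    }

    public static void main(String[] args) {
        System.out.println(validate(false, 1476000000000L));
        System.out.println(validate(false, EARLIEST_TIMESTAMP));
        System.out.println(validate(true, 1476000000000L));
    }
}
```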

Implementation-wise, we will migrate to the o.a.k.common.requests.ListOffsetRequest class on the broker side.

Because the Kafka request schema uses an Array to represent a Map, it is possible for users to form a ListOffsetRequest that contains duplicate partitions with different timestamps.

...

In this case, the broker will return an InvalidRequestException (error code 42) for that partition in the ListOffsetResponse.
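One way such duplicates could be detected is to track the partitions already seen while walking the request array. The sketch below is a hypothetical illustration rather than the broker's implementation: the class and method names are assumptions, and the request is reduced to an array of partition ids for a single topic.

```java
import java.util.HashSet;
import java.util.Set;

final class DuplicatePartitionCheck {
    // Error code for InvalidRequestException, as stated in this KIP.
    static final short INVALID_REQUEST = 42;

    /** Returns the partition ids that occur more than once in the request array. */
    static Set<Integer> duplicatedPartitions(int[] requestedPartitions) {
        Set<Integer> seen = new HashSet<>();
        Set<Integer> duplicates = new HashSet<>();
        for (int partition : requestedPartitions) {
            // Set.add returns false when the element is already present.
            if (!seen.add(partition)) {
                duplicates.add(partition);
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        // Partition 0 is requested twice, so it would be answered with INVALID_REQUEST.
        for (int partition : duplicatedPartitions(new int[] {0, 1, 0, 2})) {
            System.out.println("partition " + partition + " -> error code " + INVALID_REQUEST);
        }
    }
}
```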

offsetsForTimes() method in Consumer

Add a new method to allow users to search messages by timestamp. The input of the method is a mapping from partitions to target times. The output of the method is a mapping from each partition to the timestamp and offset of the earliest message whose timestamp is greater than or equal to the target time given for that partition.

The most important use case for the offsetsForTimes() method is to consume from the returned offsets. The following code gives an example of how to do that:

Code Block
language: java
title: consume from timestamp
long offset = consumer.offsetsForTimes(Collections.singletonMap(topicPartition, targetTime)).get(topicPartition).offset();
consumer.seek(topicPartition, offset);
ConsumerRecords records = consumer.poll(100); // poll timeout in milliseconds

The target timestamp has to be non-negative. Otherwise an IllegalArgumentException will be thrown. If no message has a timestamp that is greater than or equal to the target time, null will be returned.
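These lookup semantics can be modelled with a small in-memory sketch. It is not how the consumer or broker performs the search; the `Entry` class, the method name, and the linear scan over a list are assumptions made purely to illustrate the contract: reject negative targets, return the earliest entry at or after the target time, and return null when there is none.

```java
import java.util.Arrays;
import java.util.List;

final class TimestampLookupModel {
    /** Hypothetical stand-in for one message: its offset and its timestamp. */
    static final class Entry {
        final long offset;
        final long timestamp;

        Entry(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    /**
     * Returns the first entry (in offset order) whose timestamp is >= targetTime,
     * or null if no such entry exists. The list is assumed to be ordered by offset.
     */
    static Entry earliestAtOrAfter(List<Entry> log, long targetTime) {
        if (targetTime < 0) {
            throw new IllegalArgumentException("Target timestamp must be non-negative: " + targetTime);
        }
        for (Entry entry : log) {
            if (entry.timestamp >= targetTime) {
                return entry;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Entry> log = Arrays.asList(new Entry(5L, 100L), new Entry(6L, 90L), new Entry(7L, 200L));
        Entry hit = earliestAtOrAfter(log, 150L);
        System.out.println(hit == null ? "no match" : "offset " + hit.offset);
    }
}
```

Callers of such a lookup need to handle the null result before seeking, since a target time later than every message yields no offset.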

beginningOffsets() and endOffsets()

beginningOffsets() and endOffsets() return the first and last offsets for the given partitions.

Compatibility, Deprecation, and Migration Plan

This KIP is a pure addition, so there is no backward compatibility concern.

The new ListOffsetRequest v1 will only be used by the new consumer. The old high-level consumer will still use ListOffsetRequest v0. We will not update SimpleConsumer either, as we are already in the process of deprecating it. OffsetRequest (ListOffsetRequest) v0 will be deprecated together with the old high-level consumer and SimpleConsumer.

Test Plan

Test the new consumer against brokers with message formats before and after 0.10.

Test beginningOffsets(), endOffsets(), and offsetsForTimes() with CreateTime and LogAppendTime.

Rejected Alternatives

Using seekToTimestamp() instead of offsetsForTimes()

  1. In most cases users search for an offset by time simply to consume from that offset, but in some cases users just want to get the offsets for checking and do not want to move the offsets (e.g. people checkpointing offsets outside of Kafka).
  2. It is easy for users to seek to the offset returned by offsetsForTimes().
  3. A separate offsetsForTimes() method allows users to query any topic partition without first having the partition assigned.