Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current stateUnder DiscussionAccepted

Discussion threadhere

JIRA: KAFKA-4148

...

// ListOffsetResponse v1 
ListOffsetResponse => [TopicName [PartitionOffsets]] 
  PartitionOffsets => Partition ErrorCode Timestamp [Offset]
  Partition => int32 
  ErrorCode => int16 
  Timestamp => int64 
  Offset => int64

Add new Error Code 43 - UnsupportedForMessageFormat

This error code is added to indicate that the requested operation is not supported by the message format version.  In this KIP it means the timestamp search operation is not supported by the message format before 0.10.0 when ListOffsetRequest is v1. This error code could also be used in the future when message format evolves again. (e.g. KIP-82).

Add a new method to o.a.k.c.consumer.Consumer to allow user search offsets by timestamp.

Code Block
languagejava
titleoffsetForTime() method
/**
 * Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the 
 * earliest offset whose timestamp is greater than or equals to the given timestamp in the corresponding partition.
 * 
 * This is a blocking call. The consumer does not have to be assigned the partitions.
 *
 * @param timestampsToSearch the mapping from partition to the timestamp to look up.
 * @return For each partition, returns the timestamp and offset of the first message with timestamp greater 
 *         than or equal to the target timestamp.
 */
Map<TopicPartition, TimestampOffset>OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch);

/**
 * Get the earliest available offsets for the given partitions.
 * 
 * @param partitions the partitions to get the earliest offsets.
 * @return The earliest available offsets for the given partitions
 */
Map<TopicPartition, Long> earliestOffsetsbeginningOffsets(Set<TopicPartition>Collection<TopicPartition> partitions);
 
/**
 * Get the latest offsets for the given partitions.
 * 
 * @param partitions the partitions to get the latest offsets.
 * @return The latest available offsets for the given partitions.
 */
Map<TopicPartition, Long> latestOffsetsendOffsets(Set<TopicPartition>Collection<TopicPartition> partitions);
 
public class TimestampOffsetOffsetAndTimestamp {
	private final long timestamp;
	private final long offset;
 
	public TimestampOffsetOffsetAndTimestamp(long timestampoffset, long offsettimestamp) {
		this.timestamp = timestamp;
		this.offset = offset;
	}
 
	public long timestamp() { return timestamp; }
	public long offset() { return offset; }
}

...

If message format on the broker side is before 0.10, i.e. messages do not have timestamps. ListOffsetRequest v1 will only work if target time = -1 or -2. Any other target time will result in an InvalidRequestException UnsupportedForMessageFormat (error code 4243).

Implementation wise, we will migrate to o.a.k.common.requests.ListOffsetRequest class on the broker side.

Because Kafka request schema uses an Array to represent a Map, it is possible that users form a ListOffsetRequest that contains duplicate partitions with differen timestamps. In this case, the broker will return an InvalidRequestException(error code 42) for that partition in the ListOffsetResponse.

offsetsForTimes() method in Consumer

...

The target timestamp have to be non-negative. Otherwise an IllegalArgumentException will be thrown. If no message has a timestamp that is greater than or equals to the target time, a TimestampOffset(-1L, -1L) null will be returned.

...

beginningOffsets() and

...

endOffsets()

earlistOffsets() and latestOffsetsendOffsets() return the first and last offset for the given partitions. 

...

Test the new consumer against broker with message format before and after 0.10.

Test earliestOffsetsbeginningOffsets(), latestOffsetsendOffsets()offsetsForTimes() with CreateTime and LogAppendTime

...