...

Code Block (Java): offsetsForTimes() method
/**
 * Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the
 * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
 * 
 * This is a blocking call. The consumer does not have to be assigned the partitions.
 *
 * @param timestampsToSearch the mapping from partition to the timestamp to look up.
 * @return For each partition, returns the timestamp and offset of the first message with timestamp greater 
 *         than or equal to the target timestamp.
 */
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch);

/**
 * Get the earliest available offsets for the given partitions.
 * 
 * @param partitions the partitions to get the earliest offsets for.
 * @return The earliest available offsets for the given partitions.
 */
Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions);
 
/**
 * Get the latest offsets for the given partitions.
 * 
 * @param partitions the partitions to get the latest offsets for.
 * @return The latest available offsets for the given partitions.
 */
Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions);
 
public class OffsetAndTimestamp {
	private final long timestamp;
	private final long offset;
 
	public OffsetAndTimestamp(long offset, long timestamp) {
		this.timestamp = timestamp;
		this.offset = offset;
	}
 
	public long timestamp() { return timestamp; }
	public long offset() { return offset; }
}

...

The target timestamp must be non-negative; otherwise an IllegalArgumentException is thrown. If no message has a timestamp greater than or equal to the target timestamp, null is returned.
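
The lookup rule described here can be illustrated with a small in-memory model. This is only a sketch under stated assumptions, not the consumer implementation: the class OffsetLookupSketch, the helper lookup(), the use of String partition names in place of TopicPartition, and the modeling of a partition as a list of timestamps indexed by offset are all hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the lookup semantics; not the Kafka client code.
class OffsetLookupSketch {

    // Stand-in for the OffsetAndTimestamp class shown in the interface above.
    static final class OffsetAndTimestamp {
        private final long offset;
        private final long timestamp;

        OffsetAndTimestamp(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }

        long offset() { return offset; }
        long timestamp() { return timestamp; }
    }

    // Assumption: a partition is a list of message timestamps, and the list
    // index is the message offset.
    static OffsetAndTimestamp lookup(List<Long> log, long targetTimestamp) {
        if (targetTimestamp < 0) {
            throw new IllegalArgumentException(
                "Target timestamp must be non-negative: " + targetTimestamp);
        }
        // Linear scan, because this model does not assume timestamps are sorted.
        for (int offset = 0; offset < log.size(); offset++) {
            long timestamp = log.get(offset);
            if (timestamp >= targetTimestamp) {
                return new OffsetAndTimestamp(offset, timestamp);
            }
        }
        return null; // no message at or after the target timestamp
    }

    // Assumption: partitions are keyed by a plain String name in this model.
    static Map<String, OffsetAndTimestamp> offsetsForTimes(
            Map<String, List<Long>> logs, Map<String, Long> timestampsToSearch) {
        Map<String, OffsetAndTimestamp> result = new HashMap<>();
        for (Map.Entry<String, Long> entry : timestampsToSearch.entrySet()) {
            List<Long> log = logs.getOrDefault(entry.getKey(), List.of());
            result.put(entry.getKey(), lookup(log, entry.getValue()));
        }
        return result;
    }
}
```

In this model, a caller has to check each map value for null before reading offset() or timestamp(), since a partition with no qualifying message maps to null.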

...

beginningOffsets() and endOffsets()

beginningOffsets() and endOffsets() return the first and last offsets for the given partitions.

...

Test the new consumer against brokers with message formats before and after 0.10.

Test beginningOffsets(), endOffsets(), and offsetsForTimes() with CreateTime and LogAppendTime.

...