Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Another issue will be on the interactive query, as a specific record key may be assigned to any shared task.  So StreamsMetadata needs 

Look into the future: Transactional Support

...

Code Block
languagejava
titleConsumer.java
public ConsumerRebalanceListener {

	@Deprecated
	void onPartitionsRevoked(Collection<TopicPartition> partitions);

 	@Deprecated
    void onPartitionsAssigned(Collection<TopicPartition> partitions);

    @Deprecated
    default void onPartitionsLost(Collection<TopicPartition> partitions) {
        onPartitionsRevoked(partitions);
    }
	
   /**
    * Note that if a partition maps to an empty list, that means a full ownership of the partitions.
    */
	void onPartitionWithKeyRangesAssigned(Map<TopicPartition, List<Tuple<int64, int64>> partitionWithKeyRanges); // NEW

	void onPartitionWithKeyRangesRevoked(Map<TopicPartition, List<Tuple<int64, int64>> partitionWithKeyRanges); // NEW

	void onPartitionWithKeyRangesLost(Map<TopicPartition, List<Tuple<int64, int64>> partitionWithKeyRanges); // NEW
}

For better visibility, we shall include the key range information in StreamsMetadata:

Code Block
languagejava
titleStreamsMetadata.java
public class OffsetAndMetadata {

  public HostInfo hostInfo();
	
  public Set<String> stateStoreNames();

  @Deprecated
  public Set<TopicPartition> topicPartitions();

  public Map<TopicPartition, List<Tuple<Long, Long>>> topicPartitionsWithKeyRange();

  public String host();

  public int port();
}


For better visibility, we shall include the range offsets in the ListConsumerGroupOffsetsResult:

...