

Status

Current state: Draft

...

One more detail for Kafka Streams concerns the task definition.


The task will still be associated with a partition number, but internally we mark it as a "Shared" task and attach its key range to it. During rebalance, we will try to group shared tasks by their partition and assign each group to the same stream instance, so that the state store for each topic partition can be accessed concurrently by the shared tasks.

For example, suppose we have 2 partitions, tp-0 and tp-1, and 2 stream instances, each configured with 2 threads. In total, 4 consumers subscribe to the partitions, and we create four shared tasks:

task-0-[0, N/2 - 1],
task-0-[N/2, N],
task-1-[0, N/2 - 1],
task-1-[N/2, N].
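
The split of each partition's key hash space into shared tasks can be sketched in Java. This is a minimal illustration, not the KIP's implementation: the `KeyRange`, `SharedTask` and `SharedTaskSplitter` types are hypothetical, N is assumed to be the inclusive maximum key hash, and range i of k is assumed to start at i * N / k so that two tasks per partition reproduce the [0, N/2 - 1] and [N/2, N] ranges above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical types for illustration; the KIP does not define these classes.
record KeyRange(long start, long end) {}

record SharedTask(int partition, KeyRange range) {
    @Override
    public String toString() {
        // Mirrors the task-<partition>-[start, end] naming used in the text.
        return "task-" + partition + "-[" + range.start() + ", " + range.end() + "]";
    }
}

final class SharedTaskSplitter {
    private SharedTaskSplitter() {}

    /** Splits [0, maxHash] of every partition into tasksPerPartition contiguous ranges. */
    static List<SharedTask> split(int partitions, int tasksPerPartition, long maxHash) {
        if (tasksPerPartition < 1 || tasksPerPartition > maxHash) {
            throw new IllegalArgumentException("need 1 <= tasksPerPartition <= maxHash");
        }
        List<SharedTask> tasks = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            for (int i = 0; i < tasksPerPartition; i++) {
                // Range i starts at i * N / k; the last range always ends at N.
                long start = i * maxHash / tasksPerPartition;
                long end = (i == tasksPerPartition - 1)
                        ? maxHash
                        : (i + 1) * maxHash / tasksPerPartition - 1;
                tasks.add(new SharedTask(p, new KeyRange(start, end)));
            }
        }
        return tasks;
    }
}
```

For example, `SharedTaskSplitter.split(2, 2, 100)` returns task-0-[0, 49], task-0-[50, 100], task-1-[0, 49] and task-1-[50, 100], i.e. the four tasks above with N = 100. The sketch assumes k * N fits in a long.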

With non-shared tasks, we would normally create two local state stores, state-store-0 and state-store-1. During rebalance, we aim for the following assignment:

M1: task-0-[0, N/2 - 1], task-0-[N/2, N]
M2: task-1-[0, N/2 - 1], task-1-[N/2, N]

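The benefit of this assignment is that it minimizes state store creation: each machine hosts the shared tasks of a single partition and needs only that partition's state store. If instead we end up with an assignment such as: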

M1: task-0-[0, N/2 - 1], task-1-[0, N/2 - 1]
M2: task-0-[N/2, N], task-1-[N/2, N]

We need to create 2 state stores on each machine, 4 in total.
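
The grouping goal can be sketched in Java under an assumption the KIP does not spell out: each member hosts one state store per distinct partition among its shared tasks. The hypothetical `PartitionGroupingAssignor` hands each partition's whole group of shared tasks to one member, round-robin, and counts the resulting stores. It ignores load balancing, which a real assignor would also have to weigh.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class PartitionGroupingAssignor {
    private PartitionGroupingAssignor() {}

    /** Gives each partition's whole group of shared tasks to a single member, round-robin. */
    static Map<String, List<String>> assign(Map<String, Integer> taskToPartition, List<String> members) {
        // Group task ids by partition; TreeMap keeps partitions in ascending order.
        Map<Integer, List<String>> byPartition = new TreeMap<>();
        taskToPartition.forEach((task, partition) ->
                byPartition.computeIfAbsent(partition, p -> new ArrayList<>()).add(task));

        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        int next = 0;
        for (List<String> group : byPartition.values()) {
            assignment.get(members.get(next++ % members.size())).addAll(group);
        }
        return assignment;
    }

    /** Counts one local state store per distinct partition on each member. */
    static int countStores(Map<String, List<String>> assignment, Map<String, Integer> taskToPartition) {
        int total = 0;
        for (List<String> tasks : assignment.values()) {
            Set<Integer> partitions = new HashSet<>();
            for (String task : tasks) {
                partitions.add(taskToPartition.get(task));
            }
            total += partitions.size();
        }
        return total;
    }
}
```

With the four shared tasks above and members M1 and M2, `assign` places both task-0 tasks on M1 and both task-1 tasks on M2, which counts 2 stores; an assignment that gives each member one task from each partition counts 4.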

Another open issue is interactive queries: a specific record key may be assigned to any shared task, so a query has to locate the shared task whose key range covers that key.
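
One way an interactive query could find the right instance is a per-partition sorted map from range start to owner. This sketch assumes the key hash is already computed (the text does not say which hash function maps keys into [0, N]) and that each partition's ranges are contiguous and start at 0; `KeyRangeRouter` and the host:port owner strings are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

final class KeyRangeRouter {
    // partition -> (range start -> owner, e.g. the host:port of a stream instance)
    private final Map<Integer, TreeMap<Long, String>> ownersByPartition = new HashMap<>();

    /** Records that the shared task whose range starts at rangeStart lives on owner. */
    void register(int partition, long rangeStart, String owner) {
        ownersByPartition.computeIfAbsent(partition, p -> new TreeMap<>()).put(rangeStart, owner);
    }

    /** Returns the owner of the range with the greatest start that is <= keyHash. */
    String ownerOf(int partition, long keyHash) {
        TreeMap<Long, String> owners = ownersByPartition.get(partition);
        Map.Entry<Long, String> entry = owners == null ? null : owners.floorEntry(keyHash);
        if (entry == null) {
            throw new IllegalArgumentException(
                    "no shared task covers hash " + keyHash + " on partition " + partition);
        }
        return entry.getValue();
    }
}
```

A `TreeMap` keeps the lookup at O(log k) per key for k shared tasks per partition, and only range starts need to be stored because the ranges are assumed to be contiguous.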

Look into the future: Transactional Support

Eventually, the IC semantic has to be compatible with transaction support. Although possible changes to the transaction semantics are still under discussion, this KIP also provides a rough plan for integrating with the current transaction model. The current transaction model uses transaction markers to indicate that all the records before a marker are ready to be exposed downstream, so it is inherently a design that obeys partition-level ordering. To avoid over-complicating the transaction semantics, we will introduce a new delayed queue, or purgatory, for the transactional IC semantic, in which a commit is blocked until it can reach the stable offset. Taking Kafka Streams as an example, suppose we have a chunk of data [40-49] with the stable offset at 39, and two stream threads, A and B, turn on EOS at the same time in sharing mode:

...
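
The delayed queue, or purgatory, for transactional IC commits can be sketched as a priority queue of parked commits. This is an illustration under assumptions the text does not spell out: a commit through offset X is released once the partition's stable offset is at least X, and `CommitPurgatory`, `commit` and `advanceStableOffset` are hypothetical names, not broker APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.CompletableFuture;

final class CommitPurgatory {
    private record Pending(long lastOffset, CompletableFuture<Long> future) {}

    // Parked commits ordered by the offset they are waiting for.
    private final PriorityQueue<Pending> pending =
            new PriorityQueue<>((a, b) -> Long.compare(a.lastOffset(), b.lastOffset()));
    private long stableOffset;

    CommitPurgatory(long initialStableOffset) {
        this.stableOffset = initialStableOffset;
    }

    /** Parks a commit through lastOffset until the stable offset reaches it. */
    synchronized CompletableFuture<Long> commit(long lastOffset) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        if (lastOffset <= stableOffset) {
            future.complete(lastOffset);
        } else {
            pending.add(new Pending(lastOffset, future));
        }
        return future;
    }

    /** Moves the stable offset forward and releases every commit it now covers. */
    void advanceStableOffset(long newStableOffset) {
        List<Pending> released = new ArrayList<>();
        synchronized (this) {
            stableOffset = Math.max(stableOffset, newStableOffset);
            while (!pending.isEmpty() && pending.peek().lastOffset() <= stableOffset) {
                released.add(pending.poll());
            }
        }
        // Complete outside the lock so callbacks cannot block other committers.
        released.forEach(p -> p.future().complete(p.lastOffset()));
    }
}
```

For instance, with the stable offset at 39, a commit through offset 49 stays pending after `advanceStableOffset(44)` and completes only once `advanceStableOffset(49)` is called.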

The fetch request will also be augmented to include the key hash ranges. Using a list of ranges instead of a single hash range allows for future extensibility.

FetchRequest => MaxWaitTime ReplicaId MinBytes IsolationLevel FetchSessionId FetchSessionEpoch [Topics] [RemovedTopics] KeyRanges GenerationId MemberId SessionTimeout
  MaxWaitTime => int32
  ReplicaId => int32
  MinBytes => int32
  IsolationLevel => int8
  FetchSessionId => int32
  FetchSessionEpoch => int32
  Topics => TopicName Partitions
    TopicName => String
    Partitions => [Partition CurrentLeaderEpoch FetchOffset StartOffset MaxBytes]
      Partition => int32
      CurrentLeaderEpoch => int32
      FetchOffset => int64
      StartOffset => int64
      MaxBytes => int32
  RemovedTopics => RemovedTopicName [RemovedPartition]
    RemovedTopicName => String
    RemovedPartition => int32
  KeyRanges => List<Tuple<int64, int64>> // NEW
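
To illustrate what the new KeyRanges field could drive, here is a sketch that keeps only the records whose key hash falls in one of the requested ranges. Whether the broker or the client applies this filter, how key hashes are computed, and treating an empty KeyRanges list as the whole partition (by analogy with the rebalance listener's empty-list convention) are all assumptions; `KeyRange`, `FetchedRecord` and `KeyRangeFilter` are hypothetical.

```java
import java.util.List;

// Hypothetical types: an inclusive key hash range and a fetched record with its key hash.
record KeyRange(long start, long end) {
    boolean contains(long hash) {
        return hash >= start && hash <= end;
    }
}

record FetchedRecord(long offset, long keyHash) {}

final class KeyRangeFilter {
    private KeyRangeFilter() {}

    /** Keeps records whose key hash falls in any requested range. */
    static List<FetchedRecord> filter(List<FetchedRecord> records, List<KeyRange> keyRanges) {
        if (keyRanges.isEmpty()) {
            return records; // Assumption: no ranges means the whole partition.
        }
        return records.stream()
                .filter(r -> keyRanges.stream().anyMatch(range -> range.contains(r.keyHash())))
                .toList();
    }
}
```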

...

import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

public interface ConsumerRebalanceListener {

    @Deprecated
    void onPartitionsRevoked(Collection<TopicPartition> partitions);

    @Deprecated
    void onPartitionsAssigned(Collection<TopicPartition> partitions);

    @Deprecated
    default void onPartitionsLost(Collection<TopicPartition> partitions) {
        onPartitionsRevoked(partitions);
    }

    /**
     * Each {@code Tuple<Long, Long>} is a [start, end] key hash range; Tuple stands for a
     * pair type this proposal leaves unspecified (it is not a JDK or Kafka class).
     * Note that if a partition maps to an empty list, the consumer owns the full partition.
     */
    void onPartitionWithKeyRangesAssigned(Map<TopicPartition, List<Tuple<Long, Long>>> partitionWithKeyRanges); // NEW

    void onPartitionWithKeyRangesRevoked(Map<TopicPartition, List<Tuple<Long, Long>>> partitionWithKeyRanges); // NEW

    void onPartitionWithKeyRangesLost(Map<TopicPartition, List<Tuple<Long, Long>>> partitionWithKeyRanges); // NEW
}
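
A consumer could keep its current ownership in sync with the proposed callbacks roughly as follows. To stay self-contained, this hypothetical `OwnedKeyRanges` tracker uses plain strings such as "topic-0" instead of `TopicPartition` and a local `KeyRange` record instead of the unspecified `Tuple`, and it does not implement the interface itself. It honors the empty-list-means-full-ownership rule and assumes that revoked ranges exactly match previously assigned ones.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical inclusive key hash range standing in for Tuple<Long, Long>.
record KeyRange(long start, long end) {
    boolean contains(long hash) {
        return hash >= start && hash <= end;
    }
}

final class OwnedKeyRanges {
    private final Map<String, List<KeyRange>> owned = new HashMap<>();

    /** Mirrors onPartitionWithKeyRangesAssigned; replaces earlier ranges for the partition. */
    void onAssigned(Map<String, List<KeyRange>> partitionWithKeyRanges) {
        partitionWithKeyRanges.forEach((partition, ranges) -> owned.put(partition, List.copyOf(ranges)));
    }

    /** Mirrors the Revoked and Lost callbacks; an empty list drops the whole partition. */
    void onRevokedOrLost(Map<String, List<KeyRange>> partitionWithKeyRanges) {
        partitionWithKeyRanges.forEach((partition, ranges) -> {
            List<KeyRange> current = owned.get(partition);
            if (current == null) {
                return;
            }
            List<KeyRange> remaining = new ArrayList<>(current);
            remaining.removeAll(ranges);
            // Never store an empty list here: empty means full ownership, not none.
            if (ranges.isEmpty() || remaining.isEmpty()) {
                owned.remove(partition);
            } else {
                owned.put(partition, remaining);
            }
        });
    }

    /** True if the key hash is covered; an empty range list means the full partition. */
    boolean owns(String partition, long keyHash) {
        List<KeyRange> ranges = owned.get(partition);
        if (ranges == null) {
            return false;
        }
        return ranges.isEmpty() || ranges.stream().anyMatch(r -> r.contains(keyHash));
    }
}
```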

...