Status
Current state: Draft
...
One more detail worth discussing for Streams is the task definition.
The task will still be associated with a partition number, but internally we mark it as a "shared" task and record its key range. During a rebalance, we try to group shared tasks by their partition and assign them to the same stream instance, so that the state store for each topic partition can be accessed concurrently by its shared tasks.
For example, suppose we have 2 partitions tp-0 and tp-1 and 2 stream instances, each configured with 2 threads. In total, we have 4 consumers subscribing to the topic, which creates four shared tasks (where N denotes the upper bound of the key hash space):
task-0-[0, N/2 - 1]
task-0-[N/2, N]
task-1-[0, N/2 - 1]
task-1-[N/2, N]
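The range splitting for these tasks can be sketched as follows. This is an illustrative sketch only; the class and method names (`KeyRange`, `splitRange`) are assumptions, not part of the KIP's proposed API, and we assume a contiguous key hash space [0, N] split evenly across the threads of a partition.

```java
import java.util.ArrayList;
import java.util.List;

public class KeyRangeSplit {
    static final class KeyRange {
        final long start, end; // inclusive bounds within the hash space
        KeyRange(long start, long end) { this.start = start; this.end = end; }
        @Override public String toString() { return "[" + start + ", " + end + "]"; }
    }

    // Split the hash space [0, n] into `parts` contiguous ranges, one per shared task.
    static List<KeyRange> splitRange(long n, int parts) {
        List<KeyRange> ranges = new ArrayList<>();
        long step = (n + 1) / parts;
        long start = 0;
        for (int i = 0; i < parts; i++) {
            // The last range absorbs any remainder so the whole space is covered.
            long end = (i == parts - 1) ? n : start + step - 1;
            ranges.add(new KeyRange(start, end));
            start = end + 1;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // With N = 100 and 2 threads per partition, this yields the two
        // ranges per task from the example above: [0, 49] and [50, 100].
        for (KeyRange r : splitRange(100, 2)) {
            System.out.println("task-0-" + r);
        }
    }
}
```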
Normally, for non-shared tasks, we would create two local state stores, state-store-0 and state-store-1. During a rebalance, we aim to make the following assignment:
M1: task-0-[0, N/2 - 1], task-0-[N/2, N]
M2: task-1-[0, N/2 - 1], task-1-[N/2, N]
The benefit of this assignment is that it reduces state store creation. If we are unlucky and end up with an assignment like:
M1: task-0-[0, N/2 - 1], task-1-[0, N/2 - 1]
M2: task-0-[N/2, N], task-1-[N/2, N]
then we need to create 2 state stores on each machine, 4 in total.
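The grouping step described above could be sketched as follows. This is a minimal illustration, not the actual assignor: task and instance identifiers are plain strings, and the names (`SharedTaskGrouping`, `assign`) are assumptions for demonstration only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SharedTaskGrouping {
    // taskIds look like "task-<partition>-[start, end]"; instances are "M1", "M2", ...
    static Map<String, List<String>> assign(List<String> taskIds, List<String> instances) {
        // Group shared tasks by their partition number.
        Map<Integer, List<String>> byPartition = new TreeMap<>();
        for (String task : taskIds) {
            int partition = Integer.parseInt(task.split("-")[1]);
            byPartition.computeIfAbsent(partition, p -> new ArrayList<>()).add(task);
        }
        // Assign each partition's whole group to one instance, round-robin,
        // so every machine creates at most one state store per partition it owns.
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String instance : instances) assignment.put(instance, new ArrayList<>());
        int i = 0;
        for (List<String> group : byPartition.values()) {
            assignment.get(instances.get(i++ % instances.size())).addAll(group);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> tasks = List.of(
            "task-0-[0, N/2 - 1]", "task-0-[N/2, N]",
            "task-1-[0, N/2 - 1]", "task-1-[N/2, N]");
        // Both task-0 shared tasks land on M1 and both task-1 tasks on M2,
        // so each machine needs exactly one state store.
        System.out.println(assign(tasks, List.of("M1", "M2")));
    }
}
```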
Another issue concerns interactive queries, since a given record key may be owned by any of the shared tasks.
Look into the future: Transactional Support
Eventually the IC semantic has to be compatible with transaction support. Despite the ongoing discussion of changes to the transaction semantics, we provide a rough plan for integrating with the current transaction model in this KIP as well. The current transaction model uses transaction markers to specify whether all records before them are ready to be revealed downstream, so it is inherently a design that obeys partition-level ordering. To avoid over-complicating the transaction semantics, we will introduce a new delayed queue or purgatory for the transactional IC semantic, where a commit is blocked until it can reach the stable offset. Take Kafka Streams as an example: suppose we have a chunk of data [40-49] with the stable offset at 39, and two stream threads A and B both running with EOS enabled in sharing mode:
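The delayed-queue idea above can be sketched as follows. This is a hedged illustration only; the KIP does not define this API, and the names (`CommitPurgatory`, `tryCommit`) are assumptions. A commit for an offset chunk is held back until the stable offset advances to meet it, then released in order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class CommitPurgatory {
    static final class PendingCommit implements Comparable<PendingCommit> {
        final long from, to;
        PendingCommit(long from, long to) { this.from = from; this.to = to; }
        @Override public int compareTo(PendingCommit o) { return Long.compare(from, o.from); }
    }

    private final PriorityQueue<PendingCommit> pending = new PriorityQueue<>();
    private long stableOffset;
    final List<String> committed = new ArrayList<>(); // released commits, in order

    CommitPurgatory(long stableOffset) { this.stableOffset = stableOffset; }

    // Called when a stream thread tries to commit a chunk [from, to]; the chunk
    // is queued and only released once the stable offset reaches `from - 1`.
    void tryCommit(long from, long to) {
        pending.add(new PendingCommit(from, to));
        drain();
    }

    // Release queued commits in order as the stable offset advances.
    private void drain() {
        while (!pending.isEmpty() && pending.peek().from == stableOffset + 1) {
            PendingCommit c = pending.poll();
            committed.add("[" + c.from + "-" + c.to + "]");
            stableOffset = c.to;
        }
    }

    public static void main(String[] args) {
        CommitPurgatory p = new CommitPurgatory(39);
        p.tryCommit(45, 49); // thread B finishes first: held, stable offset is still 39
        p.tryCommit(40, 44); // thread A commits [40-44]: both chunks release in order
        System.out.println(p.committed);
    }
}
```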
...
The offset fetch request will also be augmented to incorporate the key hash range. Using a list instead of one hash range allows future extensibility.
FetchRequest => MaxWaitTime ReplicaId MinBytes IsolationLevel FetchSessionId FetchSessionEpoch [Topics] [RemovedTopics] KeyRanges GenerationId MemberId SessionTimeout
  MaxWaitTime => int32
  ReplicaId => int32
  MinBytes => int32
  IsolationLevel => int8
  FetchSessionId => int32
  FetchSessionEpoch => int32
  Topics => TopicName Partitions
    TopicName => String
    Partitions => [Partition CurrentLeaderEpoch FetchOffset StartOffset MaxBytes]
      Partition => int32
      CurrentLeaderEpoch => int32
      FetchOffset => int64
      StartOffset => int64
      MaxBytes => int32
  RemovedTopics => RemovedTopicName [RemovedPartition]
    RemovedTopicName => String
    RemovedPartition => int32
  KeyRanges => List<Tuple<int64, int64>> // NEW
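On the broker side, deciding whether a record belongs to one of the requested key ranges is a simple containment check. The sketch below is illustrative only (the names `KeyRangeFilter` and `inRanges` are assumptions, not the broker's actual implementation), and it models each range as an inclusive `(start, end)` pair, matching the `List<Tuple<int64, int64>>` field above.

```java
import java.util.List;

public class KeyRangeFilter {
    // A range is a pair (start, end), inclusive on both ends.
    static boolean inRanges(long keyHash, List<long[]> keyRanges) {
        for (long[] range : keyRanges) {
            if (keyHash >= range[0] && keyHash <= range[1]) return true;
        }
        return false;
    }

    public static void main(String[] args) {
        // Two disjoint ranges, as a consumer owning parts of a split partition might send.
        List<long[]> ranges = List.of(new long[]{0, 49}, new long[]{200, 299});
        System.out.println(inRanges(42, ranges));  // inside the first range
        System.out.println(inRanges(100, ranges)); // in neither range
    }
}
```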
...
public interface ConsumerRebalanceListener {

    @Deprecated
    void onPartitionsRevoked(Collection<TopicPartition> partitions);

    @Deprecated
    void onPartitionsAssigned(Collection<TopicPartition> partitions);

    @Deprecated
    default void onPartitionsLost(Collection<TopicPartition> partitions) {
        onPartitionsRevoked(partitions);
    }

    /**
     * Note that if a partition maps to an empty list, that means full ownership of the partition.
     */
    void onPartitionWithKeyRangesAssigned(Map<TopicPartition, List<Tuple<int64, int64>>> partitionWithKeyRanges); // NEW

    void onPartitionWithKeyRangesRevoked(Map<TopicPartition, List<Tuple<int64, int64>>> partitionWithKeyRanges); // NEW

    void onPartitionWithKeyRangesLost(Map<TopicPartition, List<Tuple<int64, int64>>> partitionWithKeyRanges); // NEW
}
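A minimal usage sketch of the new assigned callback, including the empty-list convention, might look like this. Everything here is a stand-in for demonstration: `Tuple` and `TopicPartition` are local stubs (the real `TopicPartition` comes from the Kafka clients library), and `describe` is a hypothetical helper, not part of the proposed interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class RangeAwareListener {
    // Minimal stand-ins for the KIP's Tuple<int64, int64> and Kafka's TopicPartition.
    record Tuple(long start, long end) {}
    record TopicPartition(String topic, int partition) {}

    // Summarize an assignment; an empty range list means full ownership of the partition.
    static List<String> describe(Map<TopicPartition, List<Tuple>> assigned) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<TopicPartition, List<Tuple>> e : assigned.entrySet()) {
            if (e.getValue().isEmpty()) {
                out.add(e.getKey() + ": full partition");
            } else {
                out.add(e.getKey() + ": ranges " + e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // One partition shared by key range, one owned fully.
        describe(Map.of(
            new TopicPartition("tp", 0), List.of(new Tuple(0, 49)),
            new TopicPartition("tp", 1), List.of()))
            .forEach(System.out::println);
    }
}
```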
...