...
Code Block
given:
    T    list of tasks to be assigned (includes t.offsets, the total number of offsets in task t)
    I    list of instances to assign tasks to (includes i[t].lag, the reported lag of instance i on task t)
    Assignment:
        StatefulActiveTaskAssignments: Map<instance, List<task>>
        StandbyTaskAssignments: Map<instance, List<task>>
        StatelessTaskAssignments: Map<instance, List<task>>
    balance_factor (as defined)
    acceptable_recovery_lag (as defined)
    num_standbys (as defined)

StatefulTasksToRankedCandidates: Map<task, Map<rank, instance>> :=
    rank(T, I, acceptable_recovery_lag)

ProposedStatefulActiveTaskAssignments: Map<instance, List<task>> :=
    proposeBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor)

Movement: <task, source instance, destination instance>
ProposedMovements: List<Movement> :=
    getMovements(StatefulActiveTaskAssignments, ProposedStatefulActiveTaskAssignments)

AvailableMovements: List<Movement> := []
for movement = (task, _, destination instance) in ProposedMovements:
    if StatefulTasksToRankedCandidates[task][0] contains destination instance:
        add movement to AvailableMovements

// if any movements are now available, given the current active task assignment, then just rebalance
return size(AvailableMovements) > 0
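The final loop above can be sketched in Java roughly as follows. This is a minimal illustration, not the actual StreamsPartitionAssignor code; the class, record, and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the availability check: a proposed movement is
// "available" only when its destination instance is already a rank-0
// (caught-up) candidate for the task being moved.
public class MovementCheck {
    // <task, source instance, destination instance>
    record Movement(String task, String source, String destination) {}

    // rankZeroCandidates: for each task, the instances whose reported lag is
    // within acceptable_recovery_lag (rank 0 in StatefulTasksToRankedCandidates)
    static boolean shouldRebalance(List<Movement> proposedMovements,
                                   Map<String, Set<String>> rankZeroCandidates) {
        List<Movement> availableMovements = new ArrayList<>();
        for (Movement m : proposedMovements) {
            if (rankZeroCandidates.getOrDefault(m.task(), Set.of())
                                  .contains(m.destination())) {
                availableMovements.add(m);
            }
        }
        // if any movement is available under the current assignment, rebalance now
        return !availableMovements.isEmpty();
    }
}
```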
Rebalance Metadata
The Consumer rebalance protocol has a pluggable extension point, the PartitionAssignor interface, along with a black-box SubscriptionInfo field. Together, these mechanisms can be used to encode extra information from the group members when they join the group, and then to make a custom assignment decision. Finally, the PartitionAssignor's assignment response contains another black-box field to encode extra information from the leader to the members.
Today, Streams uses the StreamsPartitionAssignor to implement its custom "balanced stickiness" assignment strategy, which is supported by the metadata fields in the subscription and assignment protocols.
More specifically, on the subscription side:
Code Block
KafkaConsumer:
Subscription => TopicList SubscriptionInfo
TopicList => List<String>
SubscriptionInfo => Bytes
------------------
StreamsPartitionAssignor:
SubscriptionInfo (encoded in version 4) => VersionId LatestSupportedVersionId ClientUUID PrevTasks StandbyTasks EndPoint
VersionId => Int32
LatestSupportedVersionId => Int32
ClientUUID => 128bit
PrevTasks => Set<TaskId>
StandbyTasks => Set<TaskId>
EndPoint => HostInfo
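As a concrete illustration of how such a versioned blob is laid out, the fixed-width prefix of the subscription metadata (VersionId, LatestSupportedVersionId, and the 128-bit ClientUUID) could be serialized with a ByteBuffer along these lines. This is a hand-rolled sketch for illustration only, not the actual SubscriptionInfo serialization code:

```java
import java.nio.ByteBuffer;
import java.util.UUID;

// Illustrative encoding of the fixed-width prefix of the subscription blob.
// The variable-length fields (PrevTasks, StandbyTasks, EndPoint) would follow.
public class SubscriptionPrefix {
    static ByteBuffer encodePrefix(int version, int latestSupportedVersion, UUID clientUuid) {
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + 16); // two Int32s + 128-bit UUID
        buf.putInt(version);
        buf.putInt(latestSupportedVersion);
        buf.putLong(clientUuid.getMostSignificantBits());
        buf.putLong(clientUuid.getLeastSignificantBits());
        buf.flip();
        return buf;
    }

    // VersionId is always the first field, in every version of the format,
    // so a decoder can read it before deciding how to parse the rest.
    static int decodeVersion(ByteBuffer buf) {
        return buf.getInt(0); // absolute read; does not advance the position
    }
}
```

Putting VersionId first is what makes the format evolvable: an old leader can still read the version of a newer member's blob and react accordingly.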
And on assignment:
Code Block
KafkaConsumer:
Assignment => AssignedPartitions AssignmentInfo
AssignedPartitions => List<TopicPartition>
AssignmentInfo => Bytes
------------------
StreamsPartitionAssignor:
AssignmentInfo (encoded in version 4) => VersionId LatestSupportedVersionId ActiveTasks StandbyTasks PartitionsByHost ErrorCode
VersionId => Int32
LatestSupportedVersionId => Int32
ActiveTasks => List<TaskId>
StandbyTasks => Map<TaskId, Set<TopicPartition>>
PartitionsByHost => Map<HostInfo, Set<TopicPartition>>
ErrorCode => Int32
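The VersionId/LatestSupportedVersionId pair exists so that the group keeps working during a rolling upgrade: the leader must encode its assignment in a version that every member can understand. A plausible sketch of that negotiation (illustrative only; the method name is hypothetical):

```java
import java.util.List;

// Illustrative version negotiation: the leader encodes the assignment with
// the highest version that it supports and that no member is too old for,
// i.e. the minimum of the members' LatestSupportedVersionId values, capped
// by the leader's own latest supported version.
public class VersionNegotiation {
    static int commonVersion(int leaderLatestSupported, List<Integer> memberLatestSupported) {
        int min = leaderLatestSupported;
        for (int v : memberLatestSupported) {
            min = Math.min(min, v);
        }
        return min;
    }
}
```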
To support the proposed algorithms, we propose a new version 5 format for SubscriptionInfo:
Code Block
SubscriptionInfo (encoded in version 5) => VersionId LatestSupportedVersionId ClientUUID StoreLags EndPoint
VersionId => Int32
LatestSupportedVersionId => Int32
ClientUUID => 128bit
StoreLags => Map<StoreId, Int32> // new change
EndPoint => HostInfo
The new field, StoreLags, would encode the lag for every store that the instance hosts. The leader can use this information, in conjunction with its knowledge of the topology, to determine the task lags required by the proposed algorithms. Sharing lags at the store level also yields other useful information that can be incorporated now or in the future: for example, the number of state stores assigned to each instance, or the ability to reason about a given lag differently depending on whether it is distributed over more or fewer stores.
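For instance, if the leader knows from the topology which stores belong to each task, it can derive a task-level lag by aggregating the reported per-store lags. The sketch below assumes summation as the aggregation and treats an unreported store as zero lag; both are simplifying assumptions for illustration, not choices made by this proposal:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative derivation of task lags from the StoreLags map reported by an
// instance, given topology knowledge of which stores each task owns.
public class TaskLags {
    static Map<String, Long> taskLags(Map<String, List<String>> storesByTask,
                                      Map<String, Integer> storeLags) {
        Map<String, Long> result = new HashMap<>();
        storesByTask.forEach((task, stores) -> {
            long lag = 0;
            for (String store : stores) {
                // a store the instance did not report is counted as zero lag
                // here, which is a simplifying assumption for this sketch
                lag += storeLags.getOrDefault(store, 0);
            }
            result.put(task, lag);
        });
        return result;
    }
}
```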
We do not need any modifications to AssignmentInfo at this time.
Heartbeat Metadata
In addition to tbd
Streams Rebalance Metadata: Remember the PrevTasks
...