...

Code Block
given:
  T list of tasks to be assigned
    (includes t.offsets, the total number of offsets in task t)
  I list of instances to assign tasks to
    (includes i[t].lag, the reported lag of instance i on task t)
  Assignment (the current assignment)
    StatefulActiveTaskAssignments: Map<instance, List<task>>
    StandbyTaskAssignments: Map<instance, List<task>>
    StatelessTaskAssignments: Map<instance, List<task>>
  balance_factor (as defined)
  acceptable_recovery_lag (as defined)
  num_standbys (as defined)

StatefulTasksToRankedCandidates: Map<task, SortedMap<rank, Set<instance>>> := rank(T, I, acceptable_recovery_lag)

ProposedStatefulActiveTaskAssignments: Map<instance, List<task>> := proposeBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor)
Movement: <task, source instance, destination instance>
ProposedMovements: List<Movement> := getMovements(StatefulActiveTaskAssignments, ProposedStatefulActiveTaskAssignments)

AvailableMovements: List<Movement> := []

for movement=(task, _, destination instance) in ProposedMovements:
  if StatefulTasksToRankedCandidates[task][0] contains destination instance:
    add movement to AvailableMovements

// if any proposed movement can be made now (its destination is already a rank-0 candidate for the task), trigger a rebalance
return size(AvailableMovements) > 0
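The pseudocode above can be sketched in Python as follows. This is a minimal, non-authoritative sketch: tasks and instances are plain strings, the ranking rule (instances within acceptable_recovery_lag share rank 0, all others are ranked by their raw lag) is an assumption standing in for the definition that is elided here, and proposeBalancedAssignment is not implemented, so the proposed assignment is passed in directly. The helper names rank, get_movements, and should_rebalance are hypothetical.

```python
from typing import Dict, List, Optional, Set, Tuple

# Hypothetical types for this sketch: tasks and instances are plain strings.
Task = str
Instance = str
# (task, source instance or None if currently unassigned, destination instance)
Movement = Tuple[Task, Optional[Instance], Instance]


def rank(lags: Dict[Task, Dict[Instance, int]],
         acceptable_recovery_lag: int) -> Dict[Task, Dict[int, Set[Instance]]]:
    """Assumed ranking: instances whose lag is within acceptable_recovery_lag
    are caught up and share rank 0; all others are ranked by their lag."""
    ranked: Dict[Task, Dict[int, Set[Instance]]] = {}
    for task, lag_by_instance in lags.items():
        ranks: Dict[int, Set[Instance]] = {}
        for instance, lag in lag_by_instance.items():
            r = 0 if lag <= acceptable_recovery_lag else lag
            ranks.setdefault(r, set()).add(instance)
        ranked[task] = ranks
    return ranked


def get_movements(current: Dict[Instance, List[Task]],
                  proposed: Dict[Instance, List[Task]]) -> List[Movement]:
    """Return one movement for every task whose active owner would change."""
    owner = {task: inst for inst, tasks in current.items() for task in tasks}
    movements: List[Movement] = []
    for destination, tasks in proposed.items():
        for task in tasks:
            source = owner.get(task)
            if source != destination:
                movements.append((task, source, destination))
    return movements


def should_rebalance(ranked: Dict[Task, Dict[int, Set[Instance]]],
                     current: Dict[Instance, List[Task]],
                     proposed: Dict[Instance, List[Task]]) -> bool:
    """True if at least one proposed movement targets a caught-up (rank 0) instance."""
    available = [
        (task, source, destination)
        for task, source, destination in get_movements(current, proposed)
        if destination in ranked.get(task, {}).get(0, set())
    ]
    return len(available) > 0


# Example: B is caught up on 0_1 (lag 10 <= 100) but far behind on 0_0.
lags = {"0_0": {"A": 0, "B": 5000}, "0_1": {"A": 0, "B": 10}}
ranked = rank(lags, acceptable_recovery_lag=100)
current = {"A": ["0_0", "0_1"], "B": []}
print(should_rebalance(ranked, current, {"A": ["0_0"], "B": ["0_1"]}))  # True
print(should_rebalance(ranked, current, {"A": ["0_1"], "B": ["0_0"]}))  # False
```

Moving 0_1 to B is immediately worthwhile because B can take it over without a long restore, whereas moving 0_0 to B would first require B to warm up.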

Rebalance Metadata

The Consumer rebalance protocol has a pluggable extension point, the PartitionAssignor interface, along with a black-box SubscriptionInfo field. Together, these let group members encode extra information when they join the group, which the leader then uses to make a custom assignment decision. Finally, the PartitionAssignor's assignment response contains another black-box field that carries extra information from the leader back to the members.

Today, Streams uses the StreamsPartitionAssignor to implement its custom "balanced stickiness" assignment strategy, which relies on metadata fields in the subscription and assignment protocols.

More specifically, on subscription:

Code Block
KafkaConsumer:


Subscription => TopicList SubscriptionInfo
   TopicList               => List<String>
   SubscriptionInfo        => Bytes

------------------


StreamsPartitionAssignor:

SubscriptionInfo (encoded in version 4) => VersionId LatestSupportedVersionId ClientUUID PrevTasks StandbyTasks EndPoint

   VersionId               => Int32
   LatestSupportedVersionId => Int32
   ClientUUID              => 128bit
   PrevTasks               => Set<TaskId>
   StandbyTasks            => Set<TaskId>
   EndPoint                => HostInfo


And on assignment: 

Code Block
KafkaConsumer:

Assignment => AssignedPartitions AssignmentInfo
   AssignedPartitions      => List<TopicPartition>
   AssignmentInfo          => Bytes

------------------

StreamsPartitionAssignor:

AssignmentInfo (encoded in version 4) => VersionId LatestSupportedVersionId ActiveTasks StandbyTasks PartitionsByHost ErrorCode
   VersionId               => Int32
   LatestSupportedVersionId => Int32
   ActiveTasks             => List<TaskId>
   StandbyTasks            => Map<TaskId, Set<TopicPartition>>
   PartitionsByHost        => Map<HostInfo, Set<TopicPartition>>
   ErrorCode               => Int32


To support the proposed algorithms, we propose a new version 5 format for SubscriptionInfo:

Code Block
SubscriptionInfo (encoded in version 5) => VersionId LatestSupportedVersionId ClientUUID StoreLags EndPoint

   VersionId               => Int32
   LatestSupportedVersionId => Int32
   ClientUUID              => 128bit
   StoreLags               => Map<StoreId, Int64>    // new field; replaces PrevTasks and StandbyTasks
   EndPoint                => HostInfo
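To make the version 5 layout concrete, here is a hypothetical round-trip encoder and decoder. The byte layout (big-endian integers, Int32 length prefixes for strings and for the map, StoreId and EndPoint as UTF-8 strings, EndPoint written as "host:port", map entries sorted by StoreId) is an assumption for illustration only and is not the actual Kafka wire format. Lags are written as 64-bit integers to match Kafka's 64-bit offsets.

```python
import struct
import uuid
from dataclasses import dataclass
from typing import Dict


@dataclass
class SubscriptionInfoV5:
    version_id: int
    latest_supported_version_id: int
    client_uuid: uuid.UUID
    store_lags: Dict[str, int]  # StoreId -> lag in offsets
    end_point: str  # HostInfo, assumed to be serialized as "host:port"


def _put_string(buf: bytearray, value: str) -> None:
    """Write an Int32 length prefix followed by UTF-8 bytes (assumed layout)."""
    data = value.encode("utf-8")
    buf.extend(struct.pack(">i", len(data)))
    buf.extend(data)


def encode(info: SubscriptionInfoV5) -> bytes:
    buf = bytearray()
    buf.extend(struct.pack(">ii", info.version_id, info.latest_supported_version_id))
    buf.extend(info.client_uuid.bytes)  # 16 bytes = 128 bits
    buf.extend(struct.pack(">i", len(info.store_lags)))
    for store_id, lag in sorted(info.store_lags.items()):
        _put_string(buf, store_id)
        buf.extend(struct.pack(">q", lag))  # 64-bit, like Kafka offsets
    _put_string(buf, info.end_point)
    return bytes(buf)


def decode(data: bytes) -> SubscriptionInfoV5:
    pos = 0

    def take(n: int) -> bytes:
        nonlocal pos
        chunk = data[pos:pos + n]
        if len(chunk) != n:
            raise ValueError("truncated SubscriptionInfo")
        pos += n
        return chunk

    def get_int32() -> int:
        return struct.unpack(">i", take(4))[0]

    def get_string() -> str:
        return take(get_int32()).decode("utf-8")

    version_id = get_int32()
    if version_id != 5:
        # A real assignor must also decode older versions; this sketch does not.
        raise ValueError(f"unsupported SubscriptionInfo version {version_id}")
    latest_supported = get_int32()
    client_uuid = uuid.UUID(bytes=take(16))
    store_lags: Dict[str, int] = {}
    for _ in range(get_int32()):
        store_id = get_string()
        store_lags[store_id] = struct.unpack(">q", take(8))[0]
    end_point = get_string()
    return SubscriptionInfoV5(version_id, latest_supported, client_uuid,
                              store_lags, end_point)
```

Under this assumed layout, decode(encode(info)) == info holds for any version 5 info whose lags fit in 64 bits.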

The new field, StoreLags, encodes the lag of every store that the instance hosts. The leader can combine this information with its knowledge of the topology to determine each task's lag, as the proposed algorithms require. Sharing lags at the store level rather than the task level also gives the leader other useful information that can be used now or in the future: for example, the number of state stores assigned to each instance, or the ability to reason about lag differently depending on whether it is spread over more or fewer stores.
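A sketch of how the leader might turn one instance's StoreLags into per-task lags. It rests on hypothetical assumptions: each StoreId identifies one store of one task, the leader's topology knowledge can be expressed as a map from each task to its StoreIds, and a task's lag is the sum of its stores' lags. An instance that reports no lag for one of a task's stores is treated as not hosting that task's state.

```python
from typing import Dict, List, Optional


def task_lag(store_lags: Dict[str, int], task_stores: List[str]) -> Optional[int]:
    """Sum one instance's reported lags over a task's stores (assumed rule).

    Returns None when the instance reported no lag for some store of the task,
    i.e. it does not host that store and would have to restore it from scratch.
    """
    total = 0
    for store_id in task_stores:
        if store_id not in store_lags:
            return None
        total += store_lags[store_id]
    return total


def task_lags_for_instance(store_lags: Dict[str, int],
                           topology: Dict[str, List[str]]) -> Dict[str, Optional[int]]:
    """Compute the lag of every task in the (hypothetical) task -> StoreIds map."""
    return {task: task_lag(store_lags, stores) for task, stores in topology.items()}


# Hypothetical StoreIds of the form "<task>-<store name>".
topology = {"0_0": ["0_0-counts", "0_0-totals"], "1_0": ["1_0-joins"]}
reported = {"0_0-counts": 10, "0_0-totals": 5}
print(task_lags_for_instance(reported, topology))  # {'0_0': 15, '1_0': None}
print(len(reported))  # stores hosted by this instance: 2
```

Summing is only one possible rule; because the raw per-store values are available, the leader could instead weight or inspect them individually.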

We do not need any modifications to AssignmentInfo at this time.

Heartbeat Metadata

In addition to tbd

Streams Rebalance Metadata: Remember the PrevTasks

...