...

Status

Current state: Accepted

Vote thread: here

Discussion thread: here

JIRA: KAFKA-8019, KAFKA-7149, KAFKA-6145, KAFKA-4696

...

A note on the "moving" tasks. These are very similar to standby tasks, but we propose to give them a different name to make their role clear. This way, it's clear that they don't count against the "num.standby.replicas" config, and also that they represent an ephemeral increase in cluster storage requirements.

Parameters

  • balance_factor: A scalar integer value representing the target difference in number of tasks assigned to the node with the most tasks vs. the node with the least tasks. Defaults to 1. Must be at least 1.
  • acceptable_recovery_lag*: A scalar integer value indicating a task lag (number of offsets to catch up) that is acceptable for immediate assignment. Defaults to 10,000 (catching up on this many offsets should take well under a minute and typically a few seconds, depending on workload). Must be at least 0.
  • num_standbys: A scalar integer indicating the number of hot-standby task replicas to maintain in addition to the active processing tasks. Defaults to 0. Must be at least 0.
  • probing_rebalance_interval_ms*: A time interval representing the minimum amount of time (in milliseconds) that the leader should wait before triggering a "probing rebalance", assuming there is no intervening natural rebalance. Note that probing rebalances are only triggered if the current assignment is not balanced within "balance_factor". Default is 10 minutes (because there would be effectively no cost for no-op rebalances). Must be at least 1 minute.
  • max_warmup_replicas*: A scalar integer representing the maximum number of extra replicas to assign for the purpose of moving a task to an instance that has neither the active nor any standby replicas of that task assigned. Used to throttle how much extra broker traffic and cluster state would be used for moving tasks. Defaults to 2. Minimum is 1.

* new config
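
The constraints listed above can be collected in one place. The following is a minimal sketch, not Kafka's actual configuration code: the class name AssignmentParams and its field names are hypothetical Java spellings of the parameters, and the defaults assume the 10-minute probing interval.

```java
/** Hypothetical holder for the assignment parameters; not part of Kafka's API. */
final class AssignmentParams {
    final int balanceFactor;
    final long acceptableRecoveryLag;
    final int numStandbys;
    final long probingRebalanceIntervalMs;
    final int maxWarmupReplicas;

    AssignmentParams(int balanceFactor,
                     long acceptableRecoveryLag,
                     int numStandbys,
                     long probingRebalanceIntervalMs,
                     int maxWarmupReplicas) {
        // Each check mirrors a "must be at least" rule from the parameter list.
        require(balanceFactor >= 1, "balance_factor must be at least 1");
        require(acceptableRecoveryLag >= 0, "acceptable_recovery_lag must be at least 0");
        require(numStandbys >= 0, "num_standbys must be at least 0");
        require(probingRebalanceIntervalMs >= 60_000L,
                "probing_rebalance_interval_ms must be at least 1 minute");
        require(maxWarmupReplicas >= 1, "max_warmup_replicas must be at least 1");
        this.balanceFactor = balanceFactor;
        this.acceptableRecoveryLag = acceptableRecoveryLag;
        this.numStandbys = numStandbys;
        this.probingRebalanceIntervalMs = probingRebalanceIntervalMs;
        this.maxWarmupReplicas = maxWarmupReplicas;
    }

    /** Defaults as stated in the parameter list (probing interval assumed to be 10 minutes). */
    static AssignmentParams defaults() {
        return new AssignmentParams(1, 10_000L, 0, 10 * 60 * 1000L, 2);
    }

    private static void require(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }
}
```

Validating in the constructor means an out-of-range value is rejected before any assignment logic runs.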

Probing Rebalances

As of this KIP, an assignment may result in an unbalanced distribution of tasks in favor of returning to processing as soon as possible, while under-loaded nodes are assigned standby tasks to warm up. Once the under-loaded instances are warm, a subsequent rebalance can produce a more balanced assignment. This proposal creates a new requirement: the ability to trigger rebalances in response to conditions other than cluster or topic changes.
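
As an illustration of the trigger condition, the leader's decision could be sketched as below. This is a sketch under assumptions, not the proposed implementation: the class ProbingRebalance, the method shouldTrigger, and the idea of tracking the last rebalance time as a millisecond timestamp are all hypothetical.

```java
/** Hypothetical helper; the names and the timestamp bookkeeping are assumptions. */
final class ProbingRebalance {

    /**
     * Returns true if the leader should request a probing rebalance.
     *
     * @param assignmentBalanced whether the current assignment already meets the balance criteria
     * @param lastRebalanceMs    time of the most recent rebalance (natural or probing), in ms
     * @param nowMs              current time, in ms
     * @param probingIntervalMs  the probing_rebalance_interval_ms parameter
     */
    static boolean shouldTrigger(boolean assignmentBalanced,
                                 long lastRebalanceMs,
                                 long nowMs,
                                 long probingIntervalMs) {
        if (assignmentBalanced) {
            // Nothing left to improve, so no probing rebalance is needed.
            return false;
        }
        // Any intervening rebalance resets lastRebalanceMs, which restarts the wait.
        return nowMs - lastRebalanceMs >= probingIntervalMs;
    }
}
```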

...

  • instance balance: The instance with the most tasks has no more than 1 more task than the instance with the least tasks.
  • subtopology balance: The subtopology with the most instances executing its tasks has no more than 1 more instance executing it than the subtopology with the least instances executing it. That is, the subtopology is partitioned, and each partition of a subtopology is a "task". Ideally, we want to spread the tasks for a subtopology evenly over the cluster, since we observe that workload over the partitions of a subtopology is typically even, whereas workload over different subtopologies may vary dramatically.
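
The instance balance criterion amounts to comparing the largest and smallest per-instance task counts. A minimal sketch follows; the class BalanceCheck and its method are hypothetical, and the allowed difference is passed in as a parameter rather than hard-coded.

```java
import java.util.Collection;
import java.util.Collections;

/** Hypothetical helper illustrating the instance balance check. */
final class BalanceCheck {

    /**
     * @param taskCountsPerInstance number of tasks assigned to each instance
     * @param maxDifference         largest allowed gap between the most-loaded and least-loaded instance
     */
    static boolean isInstanceBalanced(Collection<Integer> taskCountsPerInstance, int maxDifference) {
        if (taskCountsPerInstance.isEmpty()) {
            // No instances means nothing can be out of balance.
            return true;
        }
        int most = Collections.max(taskCountsPerInstance);
        int least = Collections.min(taskCountsPerInstance);
        return most - least <= maxDifference;
    }
}
```

For example, counts of 3, 2 and 3 are balanced with an allowed difference of 1, while counts of 4 and 2 are not.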

...

Code Block
KafkaConsumer:


Subscription => TopicList SubscriptionInfo
   TopicList               => List<String>
   SubscriptionInfo        => Bytes

------------------


StreamsPartitionAssignor:

SubscriptionInfo (encoded in version 6) => VersionId LatestSupportVersionId ClientUUID PrevTasks StandbyTasks EndPoint

   VersionId               => Int32
   LatestSupportVersionId  => Int32
   ClientUUID              => 128bit
   PrevTasks               => Set<TaskId>
   StandbyTasks            => Set<TaskId>
   EndPoint                => HostInfo

...

To support the proposed algorithms, we're proposing a new, version 7, format for SubscriptionInfo:

Code Block
SubscriptionInfo (encoded in version 7) => VersionId LatestSupportVersionId ClientUUID TaskLags EndPoint

   VersionId               => Int32
   LatestSupportVersionId  => Int32
   ClientUUID              => 128bit
   EndPoint                => HostInfo
   TaskLags                => Map<TaskId, Int64>    // new change

The new field, TaskLags, would encode the lag for every store that the instance hosts, summed to the task level. This subsumes both PrevTasks and StandbyTasks.
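
To make "summed to the task level" concrete, here is a minimal sketch of how per-store lags might be rolled up into the TaskLags map. The class TaskLagSketch, its methods, and the use of plain strings for task and store identifiers are assumptions for illustration. Treating a lag equal to acceptable_recovery_lag as caught up is also an assumption.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; task and store ids are plain strings here for simplicity. */
final class TaskLagSketch {

    /**
     * Sums the lag of every store belonging to a task.
     *
     * @param storeLagsByTask task id -> (store name -> lag in offsets)
     * @return task id -> total lag in offsets (64-bit, matching Int64 in the format)
     */
    static Map<String, Long> sumToTaskLevel(Map<String, Map<String, Long>> storeLagsByTask) {
        Map<String, Long> taskLags = new HashMap<>();
        for (Map.Entry<String, Map<String, Long>> entry : storeLagsByTask.entrySet()) {
            long total = 0L;
            for (long storeLag : entry.getValue().values()) {
                total += storeLag;
            }
            taskLags.put(entry.getKey(), total);
        }
        return taskLags;
    }

    /** Assumption: a task is eligible for immediate assignment when its lag does not exceed the threshold. */
    static boolean isCaughtUp(long taskLag, long acceptableRecoveryLag) {
        return taskLag <= acceptableRecoveryLag;
    }
}
```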

...