...
We do not need any modifications to AssignmentInfo at this time.
Example Scenarios
Scaling Out
Stateful (active) Tasks := {T1, T2, T3}
Standby Tasks := {S1, S2, S3}
Instances := {I1, I2}
Initial State
| | I1 | I2 |
|---|---|---|
| active | T1, T3 | T2 |
| standby | T2 | T1, T3 |
-- New instance (I3) joins --
First Rebalance (catching-up)
| | I1 | I2 | I3 |
|---|---|---|---|
| active | T1, T3 | T2 | |
| standby | T2 | T1, T3 | T1, T3 |
-- I3 finishes catching up, another rebalance is triggered --
Second Rebalance (stable state)
| | I1 | I2 | I3 |
|---|---|---|---|
| active | T1 | T2 | T3 |
| standby | T2 | T3 | T1 |
Note that following KIP-429, the second rebalance will technically itself be composed of two rebalances, since we are revoking the active task T3 from I1.
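The note above can be checked mechanically by comparing the active assignments of the two tables. The following is a minimal sketch in Python; the helper name `revoked_active_tasks` and the dictionary layout are assumptions made for illustration and are not part of Kafka Streams or this KIP.

```python
def revoked_active_tasks(before, after):
    """Return {instance: active tasks it owned before but no longer owns after}."""
    revoked = {}
    for instance, tasks in before.items():
        lost = tasks - after.get(instance, set())
        if lost:
            revoked[instance] = lost
    return revoked


# Active assignments copied from the first and second rebalance tables above.
first_rebalance = {"I1": {"T1", "T3"}, "I2": {"T2"}, "I3": set()}
second_rebalance = {"I1": {"T1"}, "I2": {"T2"}, "I3": {"T3"}}

moved = revoked_active_tasks(first_rebalance, second_rebalance)
print(moved)  # {'I1': {'T3'}}
```

A non-empty result means that at least one active task has to be given up by its current owner before it can be handed to another instance, which is the situation the note describes for T3.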
Scaling In (Pt. 1 – standbys in sync)
Stateful (active) Tasks := {T1, T2, T3, T4}
Standby Tasks := {S1, S2, S3, S4}
Instances := {I1, I2, I3}
The standby tasks in this scenario are assumed to be in sync with the active tasks, that is, caught up and within acceptable_recovery_lag.
Initial State
| | I1 | I2 | I3 |
|---|---|---|---|
| active | T1, T4 | T2 | T3 |
| standby | T3 | T1, T4 | T2 |
-- One instance (I1) is brought down --
First Rebalance
| | I2 | I3 |
|---|---|---|
| active | T1, T4 | T2, T3 |
| standby | T2, T3 | T1, T4 |
In this case, no subsequent rebalance is necessary (note that as of KIP-429 the first rebalance technically consists of two rebalances as an active task is being revoked). Processing resumes immediately as the standby tasks are ready to take over as active tasks.
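To make the "in sync" condition concrete, the sketch below lists, for each task, the surviving instances whose lag is within acceptable_recovery_lag. It is only an illustration: the function name, the threshold value, and every lag number are hypothetical, and treating the threshold as inclusive is an assumption.

```python
def caught_up_instances(task_lags, acceptable_recovery_lag):
    """Return the instances whose lag on one task is within the threshold."""
    return {
        instance
        for instance, lag in task_lags.items()
        if lag <= acceptable_recovery_lag  # inclusive bound is an assumption
    }


ACCEPTABLE_RECOVERY_LAG = 10_000  # hypothetical threshold, in records

# Hypothetical lags on the surviving instances after I1 leaves.
# I2 holds active T2 and standbys for T1 and T4; I3 holds active T3 and a standby for T2.
lags = {
    "T1": {"I2": 120},
    "T2": {"I2": 0, "I3": 300},
    "T3": {"I3": 0},
    "T4": {"I2": 0},
}

ready = {
    task: caught_up_instances(task_lags, ACCEPTABLE_RECOVERY_LAG)
    for task, task_lags in lags.items()
}
```

With these numbers every task has at least one caught-up instance, so the balanced assignment can be produced in the first rebalance, as in the table above.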
Scaling In (Pt. 2 – standbys lag nontrivially)
Stateful (active) Tasks := {T1, T2, T3, T4}
Standby Tasks := {S1, S2, S3, S4}
Instances := {I1, I2, I3}
In this scenario, the standby tasks are lagging the active tasks by some nontrivial amount, that is, by more than the acceptable_recovery_lag.
Initial State
| | I1 | I2 | I3 |
|---|---|---|---|
| active | T1, T4 | T2 | T3 |
| standby | T3 | T1, T4 | T2 |
-- One instance (I1) is brought down --
First Rebalance (catching-up)
| | I2 | I3 |
|---|---|---|
| active | T1, T2, T4 | T3 |
| standby | T3 | T1, T2, T4 |
-- I3 finishes restoring T4 --
Second Rebalance (stable state)
| | I2 | I3 |
|---|---|---|
| active | T1, T2 | T3, T4 |
| standby | T3, T4 | T1, T2 |
Unlike the previous example, I3 cannot immediately take over T2 as an active task since it is not sufficiently caught up. In this case, the first rebalance produces an imbalanced assignment in which I2 bears most of the processing load while I3 focuses on restoring the task (T4) that it will eventually be assigned as active.
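The two-step behaviour in this scenario can be sketched for a single task as follows. This is not the assignor proposed by the KIP, only a simplified illustration: the function `place_task`, the notion of a "desired owner", and all lag values are hypothetical.

```python
def place_task(desired_owner, task_lags, acceptable_recovery_lag):
    """Return (active_owner, warmup_owner) for one task.

    task_lags maps each instance that holds state for the task to its lag.
    warmup_owner is None when no catching-up phase is needed.
    """
    if not task_lags:
        # Nobody has state for this task, so it must be restored wherever it lands.
        return desired_owner, None
    desired_lag = task_lags.get(desired_owner, float("inf"))
    if desired_lag <= acceptable_recovery_lag:
        return desired_owner, None
    most_caught_up = min(task_lags, key=task_lags.get)
    if most_caught_up == desired_owner:
        return desired_owner, None
    # Keep the task active where the most state is, and warm up the desired owner.
    return most_caught_up, desired_owner


ACCEPTABLE_RECOVERY_LAG = 10_000  # hypothetical threshold, in records

# First rebalance: only I2 has (lagging) state for T4; I3 has none.
first = place_task("I3", {"I2": 50_000}, ACCEPTABLE_RECOVERY_LAG)

# Second rebalance: I2 is now active for T4 and I3 has finished restoring it.
second = place_task("I3", {"I2": 0, "I3": 200}, ACCEPTABLE_RECOVERY_LAG)
```

Under these assumed numbers, `first` keeps T4 active on I2 with a warm-up copy on I3, and `second` hands T4 to I3, which mirrors the T4 entries in the two tables above.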
Related Work
Note that the main concern of this KIP is how to allocate and re-allocate sharded stateful tasks, of which the state itself is the difficult part. Thus, although other stream processing systems are of prime interest, we can also look to the balancing algorithms employed by distributed databases, as long as those databases follow the Primary/Replica model. This is advantageous both for the diversity of perspective it lends and because some of these database systems are more mature than any modern stream processing system.
...