...

We do not need any modifications to AssignmentInfo at this time.

Example Scenarios

Scaling Out

Stateful (active) Tasks := {T1, T2, T3}
Standby Tasks := {S1, S2, S3}
Instances := {I1, I2}

Initial State


           I1        I2
active     T1, T3    T2
standby    T2        T1, T3

-- New instance (I3) joins --

First Rebalance (catching-up)


           I1        I2        I3
active     T1, T3    T2        (none)
standby    T2        T1, T3    T1, T3

-- I3 finishes catching up, another rebalance is triggered --

Second Rebalance (stable state)


           I1        I2        I3
active     T1        T2        T3
standby    T2        T3        T1

Note that following KIP-429, the second rebalance will technically itself consist of two rebalances, since we are revoking the active task T3 from I1.

Scaling In (Pt. 1 – standbys in sync)

Stateful (active) Tasks := {T1, T2, T3, T4}
Standby Tasks := {S1, S2, S3, S4} 
Instances := {I1, I2, I3}

The standby tasks in this scenario are assumed to be in sync with the active tasks, that is, caught up and within acceptable_recovery_lag.
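The "in sync" condition can be sketched as a simple threshold check. This is only an illustration: it assumes lag is measured as the number of changelog records a standby has not yet restored, and the function names and offsets are hypothetical, not part of any Kafka Streams API.

```python
def changelog_lag(end_offset, restored_offset):
    """Records the standby still has to restore (assumed definition of lag)."""
    return max(0, end_offset - restored_offset)


def is_in_sync(end_offset, restored_offset, acceptable_recovery_lag):
    """A standby counts as in sync when its lag does not exceed the threshold."""
    return changelog_lag(end_offset, restored_offset) <= acceptable_recovery_lag


# Hypothetical offsets: the standby is 10 records behind, with a threshold of 100.
print(is_in_sync(end_offset=1000, restored_offset=990, acceptable_recovery_lag=100))  # True
```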

Initial State


           I1        I2        I3
active     T1, T4    T2        T3
standby    T3        T1, T4    T2

-- One instance (I1) is brought down --

First Rebalance 


           I2        I3
active     T1, T4    T2, T3
standby    T2, T3    T1, T4


In this case, no subsequent rebalance is necessary (note that as of KIP-429 the first rebalance technically consists of two rebalances, because an active task is being revoked). Processing resumes immediately, as the standby tasks are ready to take over as active tasks.

Scaling In (Pt. 2 – standbys lag nontrivially)

Stateful (active) Tasks := {T1, T2, T3, T4}
Standby Tasks := {S1, S2, S3, S4} 
Instances := {I1, I2, I3}

In this scenario, the standby tasks are lagging the active tasks by some nontrivial amount, that is, by more than the acceptable_recovery_lag.

Initial State


           I1        I2        I3
active     T1, T4    T2        T3
standby    T3        T1, T4    T2

-- One instance (I1) is brought down --

First Rebalance (catching-up)


           I2            I3
active     T1, T2, T4    T3
standby    T3            T1, T2, T4

-- I3 finishes restoring T4 --

Second Rebalance (stable state)


           I2        I3
active     T1, T2    T3, T4
standby    T3, T4    T1, T2

Unlike the previous example, I3 cannot immediately take over T2 as an active task since it is not sufficiently caught up. In this case the first rebalance would produce an imbalanced assignment where I2 bears the major processing load while I3 focuses on restoring the task (T4) it will eventually be assigned as active.
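The two-phase behaviour in these scenarios can be mimicked with a toy model. The sketch below is not the assignment algorithm proposed by this KIP; it is a simplified illustration under stated assumptions: the balanced target assignment (`desired`) is given as input, lag is a per-task, per-instance number, and an instance with no state for a task has infinite lag. The function name and the lag values are hypothetical. It models only active placement and warm-up targets, not full standby placement.

```python
import math


def assign_active(desired, lags, acceptable_recovery_lag):
    """Toy interim assignment.

    desired: task -> instance that should own it in the balanced end state
    lags:    task -> {instance: lag}; a missing instance means no local state
    Returns (active, warmup): instance -> tasks it runs now, and
    instance -> tasks it must restore before taking them over.
    """
    instances = sorted(set(desired.values()))
    active, warmup = {}, {}
    for task, target in desired.items():
        task_lags = lags.get(task, {})
        if task_lags.get(target, math.inf) <= acceptable_recovery_lag:
            owner = target  # target is caught up, so it takes the task now
        else:
            # Otherwise run the task on the least-lagging instance for now;
            # ties are broken in favour of the target itself.
            owner = min(
                instances,
                key=lambda inst: (task_lags.get(inst, math.inf), inst != target),
            )
            if owner != target:
                warmup.setdefault(target, []).append(task)
        active.setdefault(owner, []).append(task)
    return active, warmup


# "Scaling In (Pt. 2)" after I1 leaves, with made-up lags and a threshold of 100.
desired = {"T1": "I2", "T2": "I2", "T3": "I3", "T4": "I3"}
lags = {
    "T1": {"I2": 500},
    "T2": {"I2": 0, "I3": 500},
    "T3": {"I3": 0, "I2": 500},
    "T4": {"I2": 500},
}
active, warmup = assign_active(desired, lags, acceptable_recovery_lag=100)
print(active)  # {'I2': ['T1', 'T2', 'T4'], 'I3': ['T3']}
print(warmup)  # {'I3': ['T4']}
```

A non-empty `warmup` map corresponds to the catching-up phase: another rebalance is needed once the listed tasks have been restored. Calling the function again with I3's lag for T4 at 0 yields the stable assignment with an empty `warmup` map.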

Related Work

Note that the main concern of this KIP is how to allocate and re-allocate sharded stateful tasks, for which the state itself is the difficult part. Thus, although other stream processing systems are of prime interest, we can also look to the balancing algorithms employed by distributed databases, as long as those databases follow the Primary/Replica model. This is advantageous both for the diversity of perspective it lends and because some of these database systems are more mature than any modern stream processing system.

...