...

Partition Reconciliation Step 2
@startuml
hide footbox
skinparam maxMessageSize 70

actor User as U
participant ProtoAsyncConsumer as PAC
participant "Revoke\nPartitions\nEvent" as RPE
participant "Partitions\nRevoked\nEvent" as PRE
participant "Consumer\nRebalance\nListener" as CRL
queue "Application\nEvent\nQueue" as AEQ
queue "Backend\nEvent\nQueue" as BEQ

U -> PAC: poll()
activate PAC
PAC -> BEQ: poll()
activate BEQ
return

PAC -> PAC: process background events
...

PAC -> RPE: partitions()
activate RPE
return

PAC -> CRL: partitionsRevoked()
activate CRL
return

create PRE
PAC -> PRE : new
PAC -> AEQ : add()
activate AEQ
return

return
@enduml
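The application-thread side of the diagram above (drain the background event queue inside poll(), run the revocation callback, then notify the background thread) can be sketched as follows. This is a minimal sketch, not the actual ProtoAsyncConsumer code: RevocationListener is a hypothetical stand-in for ConsumerRebalanceListener, partitions are plain strings instead of TopicPartition, both queues are modeled as Queue<Object>, and the record fields are assumptions. Records and pattern matching for instanceof need Java 16 or later.

```java
import java.util.Queue;
import java.util.Set;

// Hypothetical stand-in for Kafka's ConsumerRebalanceListener; partitions are plain strings here.
interface RevocationListener {
    void onPartitionsRevoked(Set<String> partitions);
}

// Background thread -> application thread: the listener must revoke these partitions.
record RevokePartitionsEvent(Set<String> partitions) {}

// Application thread -> background thread: the revocation callback has completed.
record PartitionsRevokedEvent(Set<String> partitions) {}

final class PollStep {
    private final Queue<Object> backgroundEventQueue;
    private final Queue<Object> applicationEventQueue;
    private final RevocationListener listener;

    PollStep(Queue<Object> backgroundEventQueue,
             Queue<Object> applicationEventQueue,
             RevocationListener listener) {
        this.backgroundEventQueue = backgroundEventQueue;
        this.applicationEventQueue = applicationEventQueue;
        this.listener = listener;
    }

    // One pass of the "process background events" step inside poll().
    void processBackgroundEvents() {
        Object event;
        while ((event = backgroundEventQueue.poll()) != null) {
            if (event instanceof RevokePartitionsEvent revoke) {
                Set<String> partitions = revoke.partitions();
                // The user callback runs on the application thread, i.e. inside poll().
                listener.onPartitionsRevoked(partitions);
                // Hand control back to the background thread via the application event queue.
                applicationEventQueue.add(new PartitionsRevokedEvent(partitions));
            }
            // Other background event types are ignored in this sketch.
        }
    }
}
```

Running the callback inside poll() keeps user code on the thread that called the consumer, which is the reason the diagram routes the request through the background event queue instead of invoking the listener directly from the background thread.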

...


Rebalance State Machine



@startuml
hide empty description

[*] --> Stable
Stable : this is a string
Stable : this is another string
Stable ---> Revoking
Stable ---> Assigning
state Revoking {
R1: entry point into revoking partitions
R2: if we have a callback,\nwe need to invoke it
R3: after we've made local changes,\nawait heartbeat
[*] --> R1
R1 -> R2
R2 -> R3
R1 -> R3
R3 ---> Stable
R3 ---> Assigning
}
state Assigning {
A1: entry point into assigning partitions
A2: if we have a callback,\nwe need to invoke it
A3: after we've made local changes,\nawait heartbeat
[*] --> A1
A1 -> A2
A1 -> A3
A2 -> A3
A3 -> Stable
}
Stable --> Closing
Revoking --> Closing
Assigning --> Closing
Closing --> [*]
@enduml
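The top-level transitions of the Rebalance State Machine above can be encoded as a lookup table so that illegal transitions are easy to reject. This is a sketch under stated assumptions: RebalanceState and RebalanceTransitions are hypothetical names, the Revoking and Assigning substates (R1 to R3, A1 to A3) are folded into their parent state, and the final [*] state after Closing is not modeled as an enum value.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Top-level states from the diagram; substates are folded into their parent.
enum RebalanceState { STABLE, REVOKING, ASSIGNING, CLOSING }

final class RebalanceTransitions {
    private static final Map<RebalanceState, Set<RebalanceState>> ALLOWED =
            new EnumMap<>(RebalanceState.class);

    static {
        ALLOWED.put(RebalanceState.STABLE,
                EnumSet.of(RebalanceState.REVOKING, RebalanceState.ASSIGNING, RebalanceState.CLOSING));
        // After revoking, the member either settles or goes on to assign new partitions.
        ALLOWED.put(RebalanceState.REVOKING,
                EnumSet.of(RebalanceState.STABLE, RebalanceState.ASSIGNING, RebalanceState.CLOSING));
        ALLOWED.put(RebalanceState.ASSIGNING,
                EnumSet.of(RebalanceState.STABLE, RebalanceState.CLOSING));
        // Closing only leads to the final state, which is not modeled here.
        ALLOWED.put(RebalanceState.CLOSING, EnumSet.noneOf(RebalanceState.class));
    }

    private RebalanceTransitions() {}

    static boolean isAllowed(RebalanceState from, RebalanceState to) {
        return ALLOWED.get(from).contains(to);
    }
}
```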

...

Updating

...

Revoking via Callback

...

If there are partitions that need to be revoked and there is a ConsumerRebalanceListener configured for the Consumer, the MemberAssignmentReconciler will enqueue a RevokePartitionsEvent containing the set of newly revoked partitions onto the background event queue.
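The revocation decision described above (enqueue a RevokePartitionsEvent only when there is something to revoke and a listener is configured) might look roughly like this. It is a sketch, not the MemberAssignmentReconciler implementation: RevocationStep and maybeEnqueueRevocation are hypothetical names, partitions are plain strings, the background event queue is a Queue<Object>, and "newly revoked" is assumed to mean "owned now but absent from the target assignment". Records need Java 16 or later.

```java
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Mirrors the RevokePartitionsEvent named in the text; the field layout is an assumption.
record RevokePartitionsEvent(Set<String> partitions) {}

// Hypothetical helper standing in for the revocation step of MemberAssignmentReconciler.
final class RevocationStep {
    private final Queue<Object> backgroundEventQueue;
    private final boolean rebalanceListenerConfigured;

    RevocationStep(Queue<Object> backgroundEventQueue, boolean rebalanceListenerConfigured) {
        this.backgroundEventQueue = backgroundEventQueue;
        this.rebalanceListenerConfigured = rebalanceListenerConfigured;
    }

    /**
     * Enqueues a RevokePartitionsEvent for the partitions that are owned now but
     * absent from the target assignment. Returns false when there is nothing to
     * revoke or no listener is configured, i.e. when the callback step is skipped.
     */
    boolean maybeEnqueueRevocation(Set<String> currentAssignment, Set<String> targetAssignment) {
        Set<String> revoked = new HashSet<>(currentAssignment);
        revoked.removeAll(targetAssignment);
        if (revoked.isEmpty() || !rebalanceListenerConfigured) {
            return false;
        }
        backgroundEventQueue.add(new RevokePartitionsEvent(Set.copyOf(revoked)));
        return true;
    }
}
```

For example, with a current assignment of {t-0, t-1, t-2} and a target of {t-1, t-3}, the event carries {t-0, t-2}; t-3 is handled later by the assigning step.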

...

Updating

...

Assigning

...

If there are no partitions to revoke, or no ConsumerRebalanceListener is configured, skip to step ??????.

...

NOT_IN_GROUP
NOT_IN_GROUP ---> UNJOINED : Consumer.subscribe()

UNJOINED ---> UNJOINED : Heartbeat on retries
UNJOINED ---> STABLE : Heartbeat, epoch = 0
UNJOINED ---> FATAL

STABLE ---> RECONCILING : new assignment received\ntrigger callbacks
STABLE ---> RECONCILING : Consumer.unsubscribe()
STABLE ---> RELEASING_ASSIGNMENT : on HB fencing error, reset epoch,\ntrigger callbacks
STABLE ---> FATAL
STABLE ---> STABLE : Heartbeat

RECONCILING ---> RECONCILING : Heartbeat
RECONCILING ---> RECONCILIATION_COMPLETE : callback completed
RECONCILIATION_COMPLETE ---> STABLE : Send heartbeat w/ full assignment,\nreset interval timer to\nheartbeat.interval.ms
RECONCILIATION_COMPLETE ---> UNJOINED : unsubscribe()

RELEASING_ASSIGNMENT ---> RELEASING_ASSIGNMENT_COMPLETE : 'onLost' callback completed
RELEASING_ASSIGNMENT_COMPLETE ---> UNJOINED : Send out-of-interval HB
RELEASING_ASSIGNMENT ---> FATAL

FATAL ---> [*]

@enduml
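Two transitions above depend on heartbeat timing: RECONCILIATION_COMPLETE sends a heartbeat with the full assignment and resets the interval timer to heartbeat.interval.ms, and RELEASING_ASSIGNMENT_COMPLETE sends an out-of-interval heartbeat. A minimal timer that supports both could look like this. HeartbeatTimer and its methods are hypothetical, and the current time is passed in explicitly so the sketch stays deterministic.

```java
// Hypothetical heartbeat timer; callers pass the current time in milliseconds.
final class HeartbeatTimer {
    private final long heartbeatIntervalMs; // value of heartbeat.interval.ms
    private long nextHeartbeatAtMs;
    private boolean sendImmediately;        // set when an out-of-interval heartbeat is needed

    HeartbeatTimer(long heartbeatIntervalMs, long nowMs) {
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.nextHeartbeatAtMs = nowMs + heartbeatIntervalMs;
    }

    // E.g. on RECONCILIATION_COMPLETE or RELEASING_ASSIGNMENT_COMPLETE.
    void requestImmediateHeartbeat() {
        sendImmediately = true;
    }

    // Checked once per run of the event loop.
    boolean shouldSendHeartbeat(long nowMs) {
        return sendImmediately || nowMs >= nextHeartbeatAtMs;
    }

    // After any heartbeat is sent, restart the full interval.
    void onHeartbeatSent(long nowMs) {
        sendImmediately = false;
        nextHeartbeatAtMs = nowMs + heartbeatIntervalMs;
    }
}
```

Resetting the deadline after an out-of-interval heartbeat avoids sending a second, redundant heartbeat shortly afterwards when the old deadline would have expired.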



David's notes


Diagram: KIP-848 Client State Machine

States:

  • UNSUBSCRIBE: The consumer is not subscribed to any topics or to a regex, and therefore it is not part of a consumer group.
  • JOINING: The consumer has subscribed with topic names or a regex. It sends an HB request with epoch 0 to join the group and transitions to STABLE.
  • STABLE: While in this state, the consumer has nothing to do besides heartbeating to remain in the group.
  • RECONCILE_ASSIGNMENT: Whenever the consumer receives a non-null assignment from the group coordinator, it transitions to this state and reconciles its assignment: it should revoke the partitions it no longer owns and assign the new ones. This also commits offsets and triggers the rebalance callbacks. When the reconciliation completes, it transitions to ACK_ASSIGNMENT.
  • ACK_ASSIGNMENT: This signals to the HB manager that an HB request must be sent in the next run of the event loop, even if the HB interval has not expired. It transitions to STABLE when that signal is given.
  • UNSUBSCRIBING: When the consumer calls unsubscribe or close (this can happen anytime), it transitions to this state, cancels any ongoing reconciliation (how to?), revokes partitions, commits offsets and sends the last HB to leave the group. When done, it transitions to UNSUBSCRIBE.
  • FENCED: When the group coordinator fences the member (this can happen anytime), it transitions to this state, cancels any ongoing reconciliation (how to?), resets the member epoch and invokes onLost for all partitions. When done, it transitions to JOINING to rejoin the group.
  • FATAL: The consumer enters this state whenever a fatal error is encountered. This state is not recoverable.
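The FENCED path in the list above (reset the member epoch, invoke onLost for all partitions, then rejoin through JOINING) can be sketched as below. This is illustrative only: GroupMember and onFenced are hypothetical names, onLost is modeled as a java.util.function.Consumer standing in for ConsumerRebalanceListener.onPartitionsLost, the epoch is assumed to be reset to 0 because JOINING sends its HB with epoch 0, and ignoring fencing while in FATAL is an assumption, since that state is not recoverable. Cancelling an ongoing reconciliation is left out, as it is still an open question in these notes.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// State names taken from the list above.
enum MemberState {
    UNSUBSCRIBE, JOINING, STABLE, RECONCILE_ASSIGNMENT, ACK_ASSIGNMENT, UNSUBSCRIBING, FENCED, FATAL
}

// Hypothetical member model; onLost stands in for ConsumerRebalanceListener.onPartitionsLost.
final class GroupMember {
    private MemberState state;
    private int memberEpoch;
    private final Set<String> assignment;
    private final Consumer<Set<String>> onLost;

    GroupMember(MemberState state, int memberEpoch, Set<String> assignment, Consumer<Set<String>> onLost) {
        this.state = state;
        this.memberEpoch = memberEpoch;
        this.assignment = new HashSet<>(assignment);
        this.onLost = onLost;
    }

    // Called when the coordinator fences the member; may arrive in any non-fatal state.
    void onFenced() {
        if (state == MemberState.FATAL) {
            return; // FATAL is not recoverable, so fencing is ignored (assumption)
        }
        state = MemberState.FENCED;
        // Cancelling an in-flight reconciliation would happen here (open question).
        memberEpoch = 0;                       // the rejoin HB is sent with epoch 0
        onLost.accept(Set.copyOf(assignment)); // report every owned partition as lost
        assignment.clear();
        state = MemberState.JOINING;           // rejoin the group
    }

    MemberState state() { return state; }
    int memberEpoch() { return memberEpoch; }
    Set<String> assignment() { return Set.copyOf(assignment); }
}
```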

Notes

  • When the subscriptions are changed, should we send the next HB immediately?
  • Should we transition from FATAL to UNSUBSCRIBE when the subscriptions are changed? Imagine that the user subscribes with an invalid regex. In this case, the consumer transitions to FATAL because the error is not recoverable. However, the user may react to the exception and change the subscriptions, so we may need to give it another try if there are new subscriptions.

...