1. Introduction

ActivationConsumer is a component that peeks activation messages from Kafka as fast as possible using EagerMessageFeed and forwards them to a MemoryQueue.
Because the MemoryQueue caches all messages in a queue, this component does not apply back-pressure: it forwards each message as soon as it receives it.

2. Architecture Diagram

The ActivationConsumer is a finite state machine with three states: Running, Updating, and Orphaned.

  • It starts in the Running state, pulling messages from Kafka and forwarding them.
  • When it receives a message with a different revision, it goes to the Updating state and sends an UpdateMemoryQueue to the QueueManager.
  • When the QueueManager has updated the MemoryQueue, it sends a NewMemoryQueue back to the ActivationConsumer, which then returns to Running.
  • If the QueueManager fails to update the MemoryQueue, it sends a FailedCreateNewMemoryQueue to the ActivationConsumer, which then goes to Orphaned. At this point no MemoryQueue exists for this consumer.
  • If a new ActivationMessage arrives while the consumer is Orphaned, there is no MemoryQueue to receive it, so the consumer sends an UpdateMemoryQueue to the QueueManager to create a new one and goes to the Updating state.
  • When the related MemoryQueue times out and is destroyed, the QueueManager sends a StopConsumer to the ActivationConsumer, which then stops.
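
The transitions listed above can be sketched as a pure function, which makes them easy to check without starting an actor system. This is a minimal model, not the actual Akka FSM: the String revision, the Stopped state, the Command type, and the ConsumerTransitions object are assumptions made for illustration, and the sketch assumes StopConsumer can arrive in any state. Events that the text does not cover (for example an ActivationMessage received while Updating) leave the state unchanged here.

```scala
// Hypothetical model of the transitions described in this section.
sealed trait ConsumerState
case object Running extends ConsumerState
case object Updating extends ConsumerState
case object Orphaned extends ConsumerState
case object Stopped extends ConsumerState // assumption: stands in for the actor being stopped

sealed trait Event
final case class ActivationMessage(revision: String) extends Event // simplified payload (assumption)
final case class NewMemoryQueue(revision: String) extends Event
case object FailedCreateNewMemoryQueue extends Event
case object StopConsumer extends Event

// What the consumer sends as a result of a transition (illustrative type).
sealed trait Command
case object ForwardToMemoryQueue extends Command
case object UpdateMemoryQueue extends Command

object ConsumerTransitions {

  /** Returns the next state, the revision to remember, and an optional command to send. */
  def next(
    state: ConsumerState,
    currentRevision: String,
    event: Event): (ConsumerState, String, Option[Command]) =
    (state, event) match {
      // Assumption: the QueueManager may stop the consumer in any state.
      case (_, StopConsumer) =>
        (Stopped, currentRevision, None)
      // Same revision: forward immediately, no back-pressure.
      case (Running, ActivationMessage(rev)) if rev == currentRevision =>
        (Running, currentRevision, Some(ForwardToMemoryQueue))
      // Different revision: ask the QueueManager to update the MemoryQueue.
      case (Running, ActivationMessage(_)) =>
        (Updating, currentRevision, Some(UpdateMemoryQueue))
      case (Updating, NewMemoryQueue(rev)) =>
        (Running, rev, None)
      case (Updating, FailedCreateNewMemoryQueue) =>
        (Orphaned, currentRevision, None)
      // No MemoryQueue exists, so any new message triggers a request for one.
      case (Orphaned, ActivationMessage(_)) =>
        (Updating, currentRevision, Some(UpdateMemoryQueue))
      // Anything not described in the text is ignored in this sketch.
      case _ =>
        (state, currentRevision, None)
    }
}
```

For example, `ConsumerTransitions.next(Running, "1", ActivationMessage("2"))` moves the model to Updating and yields an UpdateMemoryQueue command, while a later `NewMemoryQueue("2")` brings it back to Running with revision "2".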

3. Design consideration

The ActivationConsumer is defined as follows:

/**
 * Consumer which peeks activation messages from Kafka and forwards them to a MemoryQueue.
 *
 * @param action the action name
 * @param revision the revision of the action
 * @param memoryQueue the MemoryQueue to forward messages to
 * @param queueManager the QueueManager
 * @param consumer the Kafka consumer
 */
class ActivationConsumer(
  action: FullyQualifiedEntityName,
  revision: DocRevision,
  memoryQueue: ActorRef,
  queueManager: ActorRef,
  consumer: MessageConsumer)(implicit actorSystem: ActorSystem, executionContext: ExecutionContext, logging: Logging)
    extends FSM[ActivationConsumer.ConsumerState, ActivationConsumer.ConsumerData]
    with Stash

The revision and memoryQueue parameters are only the initial values for the ActivationConsumer. When it updates the MemoryQueue, it saves the new revision and memoryQueue into the StateData.
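
As a rough illustration of that idea, the state data can be modelled as an immutable value that starts from the constructor arguments and is replaced whenever a new MemoryQueue is announced. The fields of ConsumerData are not shown in the definition above, so the shape below is an assumption, and QueueRef and the String revision are hypothetical stand-ins for ActorRef and DocRevision.

```scala
// Hypothetical stand-in for the ActorRef of a MemoryQueue.
final case class QueueRef(name: String)

// Assumed shape of the state data: the revision and queue currently in use.
final case class ConsumerData(revision: String, memoryQueue: QueueRef) {

  /** Returns new state data after the QueueManager announces a new MemoryQueue. */
  def updated(newRevision: String, newQueue: QueueRef): ConsumerData =
    copy(revision = newRevision, memoryQueue = newQueue)
}

object StateDataExample {
  // The constructor arguments only seed the state data.
  val initial: ConsumerData = ConsumerData("1", QueueRef("queue-rev-1"))

  // After an update, the consumer forwards to the queue held in the state data,
  // not to the queue it was constructed with.
  val afterUpdate: ConsumerData = initial.updated("2", QueueRef("queue-rev-2"))
}
```

Keeping the current values in the state data rather than in mutable fields means every transition sees a consistent pair of revision and queue.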

4. Exception / Failure cases

  • The MemoryQueue may already have been destroyed when a message is sent to it. In that case the message ends up in Akka's dead letters and some ActivationMessages are never processed. Maybe we need a handshake protocol for stopping the MemoryQueue?