Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

We will add a class DrainMonitor in SamzaContainer that will periodically check the coordinator-stream backed metadata store for DrainNotification. SystemConsumers periodically polls the consumers for new messages, updates the chooser with the new messages, polls the chooser to pick the next message to process in the RunLoop. The following changes will be made:

  • If a DrainNotification message is encountered by Drain, it sets the SamzaContainer in drain mode
  • SystemConsumers will stop polling for new messages from

...

  • source systems on every refresh. The refresh will still pick messages from intermediate stream systems. This marks the end of consumption of any new messages from sources. We still want to consume messages that are propagated downstream from source to intermediate steams.
  • SystemConsumers maintains an in-memory queue of unprocessed messages per SSP.
    • Upon drain, no new messages will be inserted in the source SSP queues as refresh of source consumers has stopped. There will be pending messages in the queues from previous refreshes.
    • It
  • On drain, it
    • will write Watermark control messages to all active source SSP queues (registered SSPs - intermediate SSPs - end-of-stream SSPs) to advance the watermark to infinity (Long.MaxValue).
  • It
    • Next, it will also append a drain control message to all active source SSP queues (registered SSPs - intermediate SSPs - end-of-stream SSPs) to denote that the SSPs are draining
  • .

Drain control messages (DrainMesssage) are special markers appended to the per-SSP in-memory queue of unprocessed messages in SystemConsumers. Its purpose is to indicate that the SSP is draining and the chooser will not return any more messages for that SSP. It is akin to other control messages, namely WatermarkMessage and  EndOfStreamMessage. The current deployment id is also added to the Drain control message to differentiate the drain markers between re-deploys. Drain message will only be processed by the run loop if its deployment id matches the current deployment id to prevent accidental prop.

...