  • After startpoints are consumed, the metadata store will be polluted with SSP-only keyed startpoints and per-task ACKs. The current implementation of the metadata store will bootstrap all of the startpoints and ACKs until they are cleaned up.
  • The task count may change at runtime when the partition count of an input stream increases. New tasks will therefore pick up the Startpoint unless all tasks that existed before the increase have written their ACKs and the cleanup process has run before the task-to-partition mapping is computed. Startpoints should apply only to the state of the job at startup, not to changes during runtime.
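The second concern can be illustrated with a deliberately simplified model. This is a sketch, not Samza code: the names `tasks_for`, `maybe_apply`, `startpoint_present` and `acks` are hypothetical, and it assumes one task per input partition and a single job-wide startpoint.

```python
def tasks_for(partition_count):
    # Assumption: one task per input partition (hypothetical naming scheme).
    return [f"task-{p}" for p in range(partition_count)]


startpoint_present = True  # the startpoint has been written and not yet cleaned up
acks = set()               # per-task ACKs recorded so far


def maybe_apply(task):
    """Return True if the task acts on the startpoint, recording its ACK."""
    if startpoint_present and task not in acks:
        acks.add(task)
        return True
    return False


# At startup, four tasks exist and each one applies the startpoint once.
startup_tasks = tasks_for(4)
applied_at_startup = [t for t in startup_tasks if maybe_apply(t)]

# The cleanup has not run yet when the partition count grows from 4 to 6.
runtime_tasks = tasks_for(6)
applied_at_runtime = [t for t in runtime_tasks if maybe_apply(t)]

print(applied_at_startup)  # the four startup tasks
print(applied_at_runtime)  # only the two tasks created at runtime
```

In this model the two tasks created at runtime still see the startpoint and act on it, which is the behavior the bullet above says should be avoided.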

Additional Notes

The workflow for the Intent/ACK approach is as follows:

1. The startpoint is written to the startpoint store.
2. The SamzaContainers read the startpoint and acknowledge that they have acted on it.
3. After a sufficient number of ACKs has been received, an external agent purges the startpoint. The external agent can be one of the job's running SamzaContainers, the job's JobCoordinator, or an external daemon process.

In this workflow, an external agent waits for the expected number of ACKs to be published to the startpoint store and then purges the startpoint. The expected number of ACKs is not a static number.
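As a rough illustration of the three steps, the following sketch uses a hypothetical in-memory `StartpointStore`. The class, its method names, the SSP tuple and the container ids are all assumptions made for this example and are not part of any Samza API.

```python
class StartpointStore:
    """Hypothetical in-memory stand-in for the startpoint store."""

    def __init__(self):
        self.startpoints = {}  # SSP -> startpoint value
        self.acks = {}         # SSP -> ids of containers that have ACKed

    def write_startpoint(self, ssp, startpoint):
        # Step 1: the startpoint (the intent) is written to the store.
        self.startpoints[ssp] = startpoint
        self.acks[ssp] = set()

    def ack(self, ssp, container_id):
        # Step 2: a container records that it has acted on the startpoint.
        if ssp in self.startpoints:
            self.acks[ssp].add(container_id)

    def purge_if_acked(self, ssp, expected_acks):
        # Step 3: the external agent purges once enough ACKs are present.
        if ssp in self.startpoints and len(self.acks[ssp]) >= expected_acks:
            del self.startpoints[ssp]
            del self.acks[ssp]
            return True
        return False


store = StartpointStore()
ssp = ("kafka", "page-views", 0)  # hypothetical system, stream, partition
store.write_startpoint(ssp, "oldest")

store.ack(ssp, "container-0")
first_attempt = store.purge_if_acked(ssp, expected_acks=2)   # one ACK so far

store.ack(ssp, "container-1")
second_attempt = store.purge_if_acked(ssp, expected_acks=2)  # both ACKs present
```

The sketch takes `expected_acks` as a parameter on purpose: the purge decision is only as reliable as that number, which is the difficulty discussed in the rest of this section.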

The container can acknowledge the startpoint at one of the following two levels of granularity:

1. Container-level: In YARN, the container count is currently static (defined by job.container.count), whereas in standalone the number of containers is dynamic. Processors can join and leave the group at any time. For standalone, an external agent therefore cannot determine with certainty whether it has received a sufficient number of ACKs, since the number of containers keeps changing.

 Standalone:

Consider the following scenario for a standalone application.

   A. Four processors are part of the standalone application when the external agent checks the number of active processors.
   B. The external agent expects four ACKs to be written to the store before it can purge the startpoint.
   C. Before any processor ACKs the startpoint, a hardware failure reduces the number of active processors to three and a rebalance happens.
   D. After the rebalance, the three live processors write their ACKs to the store and run fine. The external agent cannot purge the startpoint, since there are only three ACKs instead of the expected four.

   Unless we change the coordination protocol in standalone to accommodate startpoints, I'm not sure we can ACK at this level to purge startpoints.

 YARN:

There have been interesting recent developments: we are planning to develop an auto-scaling solution for YARN. In the near future, the number of containers in YARN will not be fixed, and the scenario described above for standalone will apply to YARN as well.

2. Task-level: The number of tasks can change dynamically at runtime, either because the partition count of one of the job's input topics increases or because a new topic matches the job's user-defined input topic regex. The scenario described above for standalone still applies here.
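The stuck-purge problem from the standalone scenario (steps A through D) can be reproduced in a few lines. This is a minimal sketch under stated assumptions: `can_purge` and the processor ids are hypothetical, and the agent is assumed to take a one-time snapshot of the group membership.

```python
def can_purge(acked, expected_acks):
    # The agent purges only when at least the expected number of ACKs exists.
    return len(acked) >= expected_acks


# Steps A and B: the agent observes four active processors and expects four ACKs.
active = {"processor-1", "processor-2", "processor-3", "processor-4"}
expected_acks = len(active)

# Step C: one processor is lost before anyone ACKs, and a rebalance happens.
active.discard("processor-4")

# Step D: every live processor ACKs its startpoint.
acked = set(active)

stuck = not can_purge(acked, expected_acks)
print(len(acked), expected_acks, stuck)
```

The same arithmetic applies at task level if the expected count is taken from a snapshot of the task set: any change in membership after the snapshot leaves the agent comparing against a number that no longer matches the running job.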


Rejected Alternatives

Previously explored solutions involved modifying the checkpoint offsets directly. Operationally, the proposed solution provides more safety: checkpoints are used for fault tolerance and, to prevent human error, should not be open to user intervention. Having the ability to set starting offsets out-of-band provides the safety layer.