
Implementation and Test Plan

LocalApplicationRunner:

The following pseudocode shows the run method of LocalApplicationRunner:

 

Code Block
LocalApplicationRunner.run() {
  StreamGraph streamGraph = createStreamGraph();
  ExecutionPlan plan = executionPlanner.plan(streamGraph);

  if (plan has intermediate streams) {
    // only the elected leader creates the intermediate streams
    startLeaderElection(/* onBecomeLeader */ () -> {
      streamManager.createStreams();
      releaseLatch();
    });

    waitForLatch(); // blocked until the latch is released
  }

  List<LocalJobRunner> jobs = new ArrayList<>();
  plan.getJobConfigs().forEach(jobConfig -> {
    LocalJobRunner jobRunner = new LocalJobRunner();
    jobRunner.start(jobConfig);
    jobs.add(jobRunner);
  });

  jobs.forEach(job -> {
    wait for job to complete;
  });
}
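The coordination in LocalApplicationRunner.run() can be illustrated with a small, self-contained Java sketch. It rests on simplifying assumptions: several processors are simulated as threads inside one JVM, leader election is reduced to an AtomicBoolean compareAndSet, the latch is a java.util.concurrent.CountDownLatch, and each job is a thread. In a real standalone deployment the election and latch would need to be coordinated across hosts, for example through a coordination service. The names LocalRunSketch, createStreams, run, and simulate are hypothetical and are not Samza APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class LocalRunSketch {
    // Stand-in for leader election: the first processor to flip the flag becomes leader.
    private final AtomicBoolean leaderElected = new AtomicBoolean(false);
    // Stand-in for the latch every processor waits on until streams exist.
    private final CountDownLatch streamsReady = new CountDownLatch(1);
    // Number of times the hypothetical createStreams() ran; should be exactly 1.
    final AtomicInteger streamCreations = new AtomicInteger();
    // "processorId:jobConfig" entries for every simulated job that completed.
    final List<String> completedJobs = Collections.synchronizedList(new ArrayList<>());

    // Hypothetical stand-in for streamManager.createStreams().
    private void createStreams() {
        streamCreations.incrementAndGet();
    }

    // One processor's run(): elect, create streams if leader, wait for latch, run jobs.
    void run(String processorId, List<String> jobConfigs, boolean hasIntermediateStreams)
            throws InterruptedException {
        if (hasIntermediateStreams) {
            if (leaderElected.compareAndSet(false, true)) {
                createStreams();           // onBecomeLeader: create intermediate streams
                streamsReady.countDown();  // releaseLatch()
            }
            streamsReady.await();          // waitForLatch(): blocks until streams exist
        }
        List<Thread> jobs = new ArrayList<>();
        for (String jobConfig : jobConfigs) {
            // A thread stands in for LocalJobRunner.start(jobConfig).
            Thread job = new Thread(() -> completedJobs.add(processorId + ":" + jobConfig));
            job.start();
            jobs.add(job);
        }
        for (Thread job : jobs) {
            job.join();                    // wait for the job to complete
        }
    }

    // Simulates `processors` hosts calling run() concurrently in one JVM.
    static LocalRunSketch simulate(int processors, List<String> jobConfigs) {
        LocalRunSketch sketch = new LocalRunSketch();
        List<Thread> hosts = new ArrayList<>();
        for (int i = 0; i < processors; i++) {
            String processorId = "processor-" + i;
            Thread host = new Thread(() -> {
                try {
                    sketch.run(processorId, jobConfigs, true);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            host.start();
            hosts.add(host);
        }
        try {
            for (Thread host : hosts) {
                host.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for processors", e);
        }
        return sketch;
    }

    public static void main(String[] args) {
        LocalRunSketch sketch = simulate(3, List.of("job-a", "job-b"));
        System.out.println("streams created: " + sketch.streamCreations.get());
        System.out.println("jobs completed: " + sketch.completedJobs.size());
    }
}
```

With three simulated processors and two job configs, the intermediate streams are created exactly once (only one compareAndSet can succeed), and every processor proceeds past the latch to run its two jobs, for six completed jobs in total.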


 

 

To provide a unified way to run a user program on a physical host, the user's main() method is the entry point that starts the actual JVM process on the target host, and that process runs the Samza processor.

 

How this works in standalone deployment

  • Configure app.class = <user-app-name>

  • Deploy the application to standalone hosts.

  • Run run-local-app.sh on each node to start/stop the local application.
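One way a launcher invoked by run-local-app.sh might honor the app.class setting is sketched below: it reads app.class from properties-style config and instantiates the named class by reflection through its no-arg constructor. The StreamApplication interface, the WordCountApp example class, and the AppLauncher class are hypothetical placeholders chosen for illustration; they are not Samza's actual API.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class AppLauncher {
    // Hypothetical application interface; not Samza's actual API.
    public interface StreamApplication {
        String describe();
    }

    // Hypothetical user application; normally shipped in the deployed package.
    public static class WordCountApp implements StreamApplication {
        @Override
        public String describe() {
            return "word-count";
        }
    }

    // Parses properties-style config text, e.g. the contents of a config file.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Resolves app.class and instantiates it through its public no-arg constructor.
    static StreamApplication loadApp(Properties config) {
        String className = config.getProperty("app.class");
        if (className == null) {
            throw new IllegalArgumentException("app.class is not configured");
        }
        try {
            return Class.forName(className)
                    .asSubclass(StreamApplication.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("cannot instantiate " + className, e);
        }
    }

    // Entry point a script like run-local-app.sh could invoke on every host.
    public static void main(String[] args) {
        StreamApplication app = loadApp(parse("app.class=AppLauncher$WordCountApp\n"));
        System.out.println("running " + app.describe());
    }
}
```

Because the launcher only depends on app.class, the same entry point can be started by hand on a standalone host or by a cluster executor on a YARN node.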


RemoteApplicationRunner:

The following pseudocode shows the start of RemoteApplicationRunner:

 

Code Block
RemoteApplicationRunner.start() {
  StreamGraph streamGraph = createStreamGraph();
  ExecutionPlan plan = executionPlanner.plan(streamGraph);

  if (plan has intermediate streams) {
    streamManager.createStreams();
  }

  plan.getJobConfigs().forEach(jobConfig -> {
    RemoteJobRunner jobRunner = new RemoteJobRunner();
    jobRunner.start(jobConfig);
  });
}

 

 

RemoteApplicationRunner.stop() is similar, except that it calls jobRunner.stop() instead of jobRunner.start() for each job config.
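The start/stop symmetry of RemoteApplicationRunner can be sketched in Java as follows. Because start() runs once on the single deployment gateway, the intermediate streams are created directly, with no leader election or latch. JobRunner here is a hypothetical interface whose logging implementation stands in for submitting or stopping a job on the cluster, and this sketch assumes stop() skips stream creation; RemoteRunSketch and its members are illustrative names, not Samza APIs.

```java
import java.util.ArrayList;
import java.util.List;

public class RemoteRunSketch {
    // Hypothetical per-job runner; a real one would submit or stop a job on the cluster.
    interface JobRunner {
        void start(String jobConfig);
        void stop(String jobConfig);
    }

    private final List<String> jobConfigs;
    private final boolean hasIntermediateStreams;
    // Records every action so the start/stop flow can be inspected.
    final List<String> log = new ArrayList<>();

    RemoteRunSketch(List<String> jobConfigs, boolean hasIntermediateStreams) {
        this.jobConfigs = jobConfigs;
        this.hasIntermediateStreams = hasIntermediateStreams;
    }

    // Stand-in for new RemoteJobRunner(): logs instead of contacting a cluster.
    private JobRunner newJobRunner() {
        return new JobRunner() {
            @Override
            public void start(String jobConfig) { log.add("start " + jobConfig); }
            @Override
            public void stop(String jobConfig) { log.add("stop " + jobConfig); }
        };
    }

    // Runs once on the deployment gateway, so no leader election is needed.
    void start() {
        if (hasIntermediateStreams) {
            log.add("create streams"); // stand-in for streamManager.createStreams()
        }
        for (String jobConfig : jobConfigs) {
            newJobRunner().start(jobConfig);
        }
    }

    // Same loop as start(), but calls stop() on each job runner.
    void stop() {
        for (String jobConfig : jobConfigs) {
            newJobRunner().stop(jobConfig);
        }
    }

    public static void main(String[] args) {
        RemoteRunSketch runner = new RemoteRunSketch(List.of("job-a", "job-b"), true);
        runner.start();
        runner.stop();
        // prints [create streams, start job-a, start job-b, stop job-a, stop job-b]
        System.out.println(runner.log);
    }
}
```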

How this works in YARN deployment

  • Configure app.class = <user-app-name> and task.execute = run-local-app.sh

  • Deploy the application to a single deployment gateway (the RM in YARN).

  • Run run-app.sh on the deployment gateway to start the whole application. Run stop-app.sh on the same node to stop it.

  • When the cluster executor actually launches the Samza container processes, it runs run-local-app.sh on the target host, exactly like in the standalone deployment.

 

Note: the last step that starts the JVM process in YARN is exactly the same as the last step in the standalone environment. Hence, YARN and standalone deployments share a single, unified local runtime process. This makes it much easier to test and debug, on a local host, the same runtime process that runs in a cluster deployment.


Compatibility, Deprecation, and Migration Plan

 

Rejected Alternatives