...
Discussion thread: <link to mailing list DISCUSS thread>
Released:
Problem
Samza Yarn follows a multi-stage deployment model: the Job Runner, which runs on the submission host, reads configuration, performs planning, and persists the config in the coordinator stream before submitting the job to the Yarn cluster. In Yarn, the Application Master (AM) reads the config back from the coordinator stream before spinning up containers to execute. This split of responsibility between the Job Runner and the AM is operationally confusing, and it makes the pipeline difficult to debug because there are multiple points of failure. In addition, since planning invokes user code, the runner usually requires isolation from a security perspective to guard the framework from malicious user code; otherwise a malicious user could gain access to other users' jobs running on the same runner.
Proposed Changes
We will provide a pluggable config retrieval interface on the AM which, when used, simplifies job submission to Yarn by removing any complex logic from the runner. The AM, in turn, will read the job config using the provided config loader, perform planning, generate the DAG, and persist the final config back to the coordinator stream.
We will also change the start-up script, run-app.sh, so that it no longer reads local config files. All job-submission-related configs need to be explicitly provided with --config.
Public Interfaces
We will introduce two job configs to configure the job to use the alternative workflow, which loads configuration on the AM:
- job.config.loader.class
- job.config.loader.properties
The changes are fully backward compatible. People who are interested in using the new workflow simply supply "job.config.loader.class" and "job.config.loader.properties"; jobs that do not will keep the existing behavior.
All the configs provided to the start-up script will be passed to the AM through an environment variable, and the designated config loader will use them to load the complete config.
Take wikipedia-feed in Hello Samza as an example; the application will be invoked by:
Code Block
deploy/samza/bin/run-app.sh \
  --config job.name=wikipedia-stats \
  --config job.factory.class=org.apache.samza.job.yarn.YarnJobFactory \
  --config yarn.package.path=${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz \
  --config job.config.loader.class=org.apache.samza.config.loader.PropertiesConfigLoader \
  --config job.config.loader.properties.path=./__package/config/wikipedia-feed.properties
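The submission configs above travel from the start-up script to the AM as a single environment variable. The round trip can be illustrated roughly as below; the newline-delimited format and helper names here are our own simplification for the sketch (the real implementation serializes with SamzaObjectMapper):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class Main {
    // Hypothetical serializer: join submission configs into one string
    // suitable for a single environment variable.
    static String serialize(Map<String, String> configs) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : configs.entrySet()) {
            if (sb.length() > 0) sb.append('\n');
            sb.append(e.getKey()).append('=').append(e.getValue());
        }
        return sb.toString();
    }

    // AM side: rebuild the submission config map from the env variable value.
    static Map<String, String> deserialize(String env) {
        Map<String, String> configs = new LinkedHashMap<>();
        for (String line : env.split("\n")) {
            int idx = line.indexOf('=');
            configs.put(line.substring(0, idx), line.substring(idx + 1));
        }
        return configs;
    }

    public static void main(String[] args) {
        Map<String, String> submitted = new LinkedHashMap<>();
        submitted.put("job.name", "wikipedia-stats");
        submitted.put("job.config.loader.class",
            "org.apache.samza.config.loader.PropertiesConfigLoader");

        String env = serialize(submitted);           // what the runner would export
        Map<String, String> onAm = deserialize(env); // what the AM would read back
        System.out.println(onAm.get("job.name"));    // prints "wikipedia-stats"
    }
}
```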
...
Code Block
// Configuration to a fully qualified class name to load config from.
public static final String CONFIG_LOADER_CLASS = "job.config.loader.class";

// Properties needed for the config loader to load config.
public static final String CONFIG_LOADER_PROPERTIES_PREFIX = "job.config.loader.properties.";
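Because loader properties share the "job.config.loader.properties." prefix, the AM can collect them by stripping that prefix before handing them to the loader, so the loader sees plain keys such as "path". A minimal sketch (the helper name is an assumption, not Samza's API):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class Main {
    static final String CONFIG_LOADER_PROPERTIES_PREFIX = "job.config.loader.properties.";

    // Collect every "job.config.loader.properties.*" entry with the prefix
    // stripped; other submission configs are ignored.
    static Map<String, String> loaderProperties(Map<String, String> submissionConfig) {
        Map<String, String> props = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : submissionConfig.entrySet()) {
            if (e.getKey().startsWith(CONFIG_LOADER_PROPERTIES_PREFIX)) {
                props.put(e.getKey().substring(CONFIG_LOADER_PROPERTIES_PREFIX.length()),
                          e.getValue());
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> submission = new LinkedHashMap<>();
        submission.put("job.name", "wikipedia-stats");
        submission.put("job.config.loader.properties.path",
            "./__package/config/wikipedia-feed.properties");
        // prints {path=./__package/config/wikipedia-feed.properties}
        System.out.println(loaderProperties(submission));
    }
}
```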
ConfigLoader
Interface which the AM relies on to read configuration. It takes in the job submission config, which defines the variables it needs in order to fetch the complete config.
...
Code Block
public interface ConfigLoader {
  /**
   * Build a specific Config given the job submission config.
   * @param config Config specified during job submission, containing information
   *               necessary for this ConfigLoader to fetch the complete config.
   * @return Newly constructed Config.
   */
  Config getConfig(Config config);
}
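To make the contract concrete, here is a toy implementation against stand-in types: plain Maps replace Samza's Config, and InMemoryConfigLoader is purely illustrative, not a loader Samza ships.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class Main {
    // Stand-in for org.apache.samza.config.ConfigLoader, with Map
    // replacing Samza's Config type so the sketch is self-contained.
    interface ConfigLoader {
        Map<String, String> getConfig(Map<String, String> submissionConfig);
    }

    // Hypothetical loader that fetches the complete config from an
    // in-memory store, keyed by the job name found in the submission config.
    static class InMemoryConfigLoader implements ConfigLoader {
        private final Map<String, Map<String, String>> store;

        InMemoryConfigLoader(Map<String, Map<String, String>> store) {
            this.store = store;
        }

        @Override
        public Map<String, String> getConfig(Map<String, String> submissionConfig) {
            return store.get(submissionConfig.get("job.name"));
        }
    }

    public static void main(String[] args) {
        Map<String, String> full = new LinkedHashMap<>();
        full.put("task.class", "samza.examples.wikipedia.task.WikipediaStatsStreamTask");

        Map<String, Map<String, String>> store = new LinkedHashMap<>();
        store.put("wikipedia-stats", full);

        Map<String, String> submission = new LinkedHashMap<>();
        submission.put("job.name", "wikipedia-stats");

        ConfigLoader loader = new InMemoryConfigLoader(store);
        System.out.println(loader.getConfig(submission).get("task.class"));
    }
}
```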
PropertiesConfigLoader
...
Code Block
class PropertiesConfigLoader extends ConfigLoader {
  /**
   * Build a specific Config given the job submission config.
   * @param config Config specified during job submission, containing information
   *               necessary for this ConfigLoader to fetch the complete config.
   * @return Newly constructed Config.
   */
  override def getConfig(config: Config): Config = {
    val path = config.get(JobConfig.CONFIG_LOADER_PROPERTIES_PREFIX + "path")
    val props = new Properties()
    val in = new FileInputStream(path)

    props.load(in)
    in.close()

    debug("got config %s from config %s" format (props, path))

    new MapConfig(props.asScala.asJava)
  }
}
RemoteApplicationRunner
Depending on the existence of "job.config.loader.class" and "job.config.loader.properties", RemoteApplicationRunner#run will either simply submit the job to Yarn given the submission configs, or keep its current behavior.
Code Block
@Override
public void run(ExternalContext externalContext) {
  if (new JobConfig(config).getConfigLoaderClass() != null) {
    JobRunner runner = new JobRunner(config);
    runner.getJobFactory().getJob(config).submit();
  } else {
    // Keep existing behavior
  }
}
YarnJob
YarnJob#buildEnvironment will build either the coordinator stream env variable, or the config loader class and submission config env variables, based on the existence of "job.config.loader.class" and "job.config.loader.properties".
Code Block
private[yarn] def buildEnvironment(config: Config, yarnConfig: YarnConfig, jobConfig: JobConfig): Map[String, String] = {
  val envMapBuilder = Map.newBuilder[String, String]

  if (jobConfig.getConfigLoaderClass != null) {
    envMapBuilder += ShellCommandConfig.ENV_CONFIG_LOADER_CLASS ->
      Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(jobConfig.getConfigLoaderClassName))
    envMapBuilder += ShellCommandConfig.ENV_REMOTE_CONFIG_PROPERTIES ->
      Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(jobConfig.getRemoteConfigProperties))
  } else {
    val coordinatorSystemConfig = CoordinatorStreamUtil.buildCoordinatorStreamConfig(config)
    envMapBuilder += ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG ->
      Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(coordinatorSystemConfig))
  }
ClusterBasedJobCoordinator
ClusterBasedJobCoordinator#main will construct the application config through either the coordinator stream or the config loader, depending on the env variables it gets.
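That dispatch can be sketched as follows; the env variable name and helper are hypothetical stand-ins for the real ShellCommandConfig constants, and the returned strings merely label which path the AM would take:

```java
import java.util.Map;

public class Main {
    // Hypothetical env variable name; the real one is defined
    // in ShellCommandConfig.
    static final String ENV_CONFIG_LOADER_CLASS = "SAMZA_CONFIG_LOADER_CLASS";

    // If a loader class was passed through the environment, the AM would load
    // config via that class; otherwise it falls back to the coordinator stream.
    static String chooseConfigSource(Map<String, String> env) {
        if (env.containsKey(ENV_CONFIG_LOADER_CLASS)) {
            return "loader:" + env.get(ENV_CONFIG_LOADER_CLASS);
        }
        return "coordinator-stream";
    }

    public static void main(String[] args) {
        System.out.println(chooseConfigSource(Map.of(
            ENV_CONFIG_LOADER_CLASS,
            "org.apache.samza.config.loader.PropertiesConfigLoader")));
        System.out.println(chooseConfigSource(Map.of()));
    }
}
```

In the real coordinator the "loader" branch would instantiate the named class reflectively and call getConfig with the deserialized submission config.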
Compatibility, Deprecation, and Migration Plan
Fully backward compatible: jobs can still follow the existing workflow, which loads local config files through --config-factory and --config-path. We will gradually deprecate the current flow and make the two new configs required; the changes will be announced in Samza 1.3 and take effect in Samza 1.4.