Status
Current state: [ UNDER DISCUSSION ]
Discussion thread: <link to mailing list DISCUSS thread>
JIRA: SAMZA-2405
Released:
Problem
Samza on Yarn follows a multi-stage deployment model. The Job Runner, which runs on the submission host, reads the configuration, performs planning, and persists the config in the coordinator stream before submitting the job to the Yarn cluster. In Yarn, the Application Master (AM) reads the config from the coordinator stream before spinning up containers to execute the job. This split of responsibility between the job runner and the AM is operationally confusing, and the multiple points of failure make the pipeline difficult to debug. In addition, because planning invokes user code, the runner usually needs isolation to guard the framework against malicious user code; otherwise, a malicious user can gain access to other users' jobs running on the same runner.
Proposed Changes
We will provide a new config loader interface, which the AM will use to fetch the config directly. The AM will invoke the config loader to fetch the job config, perform planning, generate the DAG, and persist the final config back to the coordinator stream.
The job runner will only submit the job to Yarn with the provided submission related configs. These configs include:
- configs directly related to job submission, such as yarn.package.path, job.name etc.
- configs that the config loader on the AM needs in order to fetch the config, such as the path to the properties file in the tarball.
- configs that we would like to override.
Because this changes how the runner starts a job, we will take this opportunity to revamp the Samza job start-up approach as well, such that all job submission related configs are provided with --config. This is consistent with other stream processing projects, such as Flink, Spark and Dataflow.
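To illustrate the start-up approach described above, the following is a minimal sketch of how repeated --config key=value arguments could be collected into a submission config map. The class SubmissionConfigParser and its parse method are hypothetical names used only for illustration; they are not part of Samza, and the actual argument handling in run-app.sh may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Samza: collects repeated "--config key=value" arguments.
final class SubmissionConfigParser {
  static Map<String, String> parse(String[] args) {
    Map<String, String> config = new LinkedHashMap<>();
    for (int i = 0; i < args.length; i++) {
      if (!"--config".equals(args[i])) {
        continue; // ignore anything that is not a --config flag
      }
      if (i + 1 >= args.length) {
        throw new IllegalArgumentException("--config requires a key=value argument");
      }
      String pair = args[++i];
      int eq = pair.indexOf('=');
      if (eq <= 0) {
        throw new IllegalArgumentException("Expected key=value but got: " + pair);
      }
      // Split on the first '=' only, so values may themselves contain '='.
      // A key that is given twice keeps the last value.
      config.put(pair.substring(0, eq), pair.substring(eq + 1));
    }
    return config;
  }

  public static void main(String[] args) {
    Map<String, String> config = parse(new String[] {
        "--config", "job.name=wikipedia-stats",
        "--config", "config.path=/__package/config/wikipedia-feed.properties"});
    System.out.println(config);
  }
}
```

Splitting on the first '=' only is a design choice of this sketch, so that values such as URLs with query strings survive intact.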
Public Interfaces
The following job config will be introduced to configure the loader class that the AM uses to fetch the config:
- job.config.loader.class
All configs provided in the start-up script will be passed to the AM through an environment variable, and the designated config loader will use them to load the complete config.
The full list of configs can be found in References#Complete list of job submission configs
Take wikipedia-feed in Hello Samza as an example:
    deploy/samza/bin/run-app.sh \
      --config job.name=wikipedia-stats \
      --config job.factory.class=org.apache.samza.job.yarn.YarnJobFactory \
      --config yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz \
      --config job.config.loader.class=org.apache.samza.config.loader.PropertiesConfigLoader \
      --config config.path=/__package/config/wikipedia-feed.properties
Alternatives
The above approach requires existing users to change how they start a Samza job. Alternatively, we may keep the ability for the runner to read from a local config, and the AM will load the config again using the loader.
Take wikipedia-feed in Hello Samza as an example:
    deploy/samza/bin/run-app.sh \
      --config job.config.loader.class=org.apache.samza.config.loader.PropertiesConfigLoader \
      --config local.config.path=/config/wikipedia-feed.properties \
      --config config.path=/__package/config/wikipedia-feed.properties
We need to either provide multiple config paths so that PropertiesConfigLoader can load the corresponding file on each side, or implement PropertiesConfigLoader in a way that works on both the runner and the AM with a single path.
This approach is not favored because it introduces more complex logic in config handling and therefore more confusion, although it does provide a certain level of ease of migration.
Implementation and Test Plan
JobConfig
We will add a new config to JobConfig that determines whether the AM reads the config through a ConfigLoader instead of from the coordinator stream.
    // Fully qualified class name of the ConfigLoader used to load the config.
    public static final String CONFIG_LOADER_CLASS = "job.config.loader.class";
ConfigLoader
Interface that the AM relies on to read the configuration. It takes in a properties map, which defines the variables it needs in order to get the proper config.
This interface will replace the existing ConfigFactory interface, as we no longer need complex configs in the runner. Providing the minimum Yarn related configs using --config when invoking run-app.sh will be sufficient.
    import org.apache.samza.config.Config;

    public interface ConfigLoader {
      /**
       * Build a specific Config given job submission config.
       * @param config Config specified during job submission containing information
       *               necessary for this ConfigLoader to fetch the complete config.
       * @return Newly constructed Config.
       */
      Config getConfig(Config config);
    }
PropertiesConfigLoader
Default implementation of ConfigLoader, which reads "config.path" from the input properties; this value points to a properties file.
    import java.io.FileInputStream
    import java.util.Properties

    import org.apache.samza.config.{Config, MapConfig}
    import org.apache.samza.util.Logging

    import scala.collection.JavaConverters._

    class PropertiesConfigLoader extends ConfigLoader with Logging {
      /**
       * Build a specific Config given job submission config.
       * @param config Config specified during job submission containing information
       *               necessary for this ConfigLoader to fetch the complete config.
       * @return Newly constructed Config.
       */
      override def getConfig(config: Config): Config = {
        val path = config.get("config.path")
        val props = new Properties()
        val in = new FileInputStream(path)
        try {
          props.load(in)
        } finally {
          // Close the stream even if loading fails.
          in.close()
        }
        debug("got config %s from config %s" format (props, path))
        new MapConfig(props.asScala.asJava)
      }
    }
RemoteApplicationRunner
RemoteApplicationRunner#run will simply submit the job to Yarn given the submission configs.

    @Override
    public void run(ExternalContext externalContext) {
      // Submit the job to Yarn with the submission configs only; no planning happens on the runner.
      JobRunner runner = new JobRunner(config);
      runner.getJobFactory().getJob(config).submit();
    }
YarnJob
YarnJob#buildEnvironment will add the config loader class as well as the submission config to the environment variables passed to the AM.
    private[yarn] def buildEnvironment(config: Config, yarnConfig: YarnConfig,
        jobConfig: JobConfig): Map[String, String] = {
      val envMapBuilder = Map.newBuilder[String, String]
      // Class name of the config loader that the AM will instantiate.
      envMapBuilder += ShellCommandConfig.ENV_CONFIG_LOADER_CLASS ->
        Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(jobConfig.getConfigLoaderClassName))
      // Submission config, serialized so that the loader on the AM can read it.
      envMapBuilder += ShellCommandConfig.ENV_CONFIG ->
        Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(config))
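The hand-off through environment variables can be pictured as a round trip: the runner serializes the submission config into a single variable, and the AM parses it back. The sketch below is only an illustration of that idea. It swaps the JSON serialization and Util.envVarEscape used in the excerpt above for the java.util.Properties text format so that it runs without Samza or Jackson, and the variable name SKETCH_SUBMISSION_CONFIG and the class EnvConfigRoundTrip are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical illustration, not Samza code: passes a config map through an environment map.
final class EnvConfigRoundTrip {
  // Hypothetical variable name; the real names are defined in ShellCommandConfig.
  static final String ENV_SUBMISSION_CONFIG = "SKETCH_SUBMISSION_CONFIG";

  // Runner side: serialize the submission config into one environment entry.
  static Map<String, String> buildEnvironment(Map<String, String> submissionConfig) {
    Properties props = new Properties();
    props.putAll(submissionConfig);
    StringWriter out = new StringWriter();
    try {
      props.store(out, null); // Properties text format stands in for JSON here
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    Map<String, String> env = new HashMap<>();
    env.put(ENV_SUBMISSION_CONFIG, out.toString());
    return env;
  }

  // AM side: parse the environment entry back into a config map.
  static Map<String, String> readSubmissionConfig(Map<String, String> env) {
    String serialized = env.get(ENV_SUBMISSION_CONFIG);
    if (serialized == null) {
      throw new IllegalStateException(ENV_SUBMISSION_CONFIG + " is not set");
    }
    Properties props = new Properties();
    try {
      props.load(new StringReader(serialized));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    Map<String, String> config = new HashMap<>();
    for (String key : props.stringPropertyNames()) {
      config.put(key, props.getProperty(key));
    }
    return config;
  }

  public static void main(String[] args) {
    Map<String, String> submission = new HashMap<>();
    submission.put("job.name", "wikipedia-stats");
    submission.put("config.path", "/__package/config/wikipedia-feed.properties");
    System.out.println(readSubmissionConfig(buildEnvironment(submission)));
  }
}
```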
ClusterBasedJobCoordinator
ClusterBasedJobCoordinator#main will construct the application config through config loader.
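As a rough sketch of this step, the AM could instantiate the configured loader reflectively and ask it for the full config. The code below is not the Samza implementation: SketchConfigLoader is a stand-in for the proposed ConfigLoader interface that uses Map<String, String> in place of Config so that it compiles without Samza, and StaticSketchLoader and LoaderBootstrap are hypothetical names. Letting submission configs take precedence over loaded values is an assumption, based on the "configs that we would like to override" item in Proposed Changes.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the proposed ConfigLoader interface (Map replaces Samza's Config).
interface SketchConfigLoader {
  Map<String, String> getConfig(Map<String, String> submissionConfig);
}

// Hypothetical loader that returns fixed values; used only to exercise the sketch.
class StaticSketchLoader implements SketchConfigLoader {
  @Override
  public Map<String, String> getConfig(Map<String, String> submissionConfig) {
    Map<String, String> loaded = new HashMap<>();
    loaded.put("job.name", "from-loader");
    loaded.put("task.class", "example.Task");
    return loaded;
  }
}

final class LoaderBootstrap {
  static final String CONFIG_LOADER_CLASS = "job.config.loader.class";

  static Map<String, String> loadFullConfig(Map<String, String> submissionConfig) {
    String className = submissionConfig.get(CONFIG_LOADER_CLASS);
    if (className == null) {
      throw new IllegalArgumentException(CONFIG_LOADER_CLASS + " is required");
    }
    SketchConfigLoader loader;
    try {
      // Instantiate the loader named in the submission config via its no-arg constructor.
      loader = Class.forName(className)
          .asSubclass(SketchConfigLoader.class)
          .getDeclaredConstructor()
          .newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate config loader " + className, e);
    }
    Map<String, String> full = new HashMap<>(loader.getConfig(submissionConfig));
    // Assumption: values given at submission time override values from the loader.
    full.putAll(submissionConfig);
    return full;
  }

  public static void main(String[] args) {
    Map<String, String> submission = new HashMap<>();
    submission.put(CONFIG_LOADER_CLASS, StaticSketchLoader.class.getName());
    submission.put("job.name", "wikipedia-stats");
    System.out.println(loadFullConfig(submission));
  }
}
```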
Compatibility, Deprecation, and Migration Plan
Backward incompatible.
Changes will be announced in Samza 1.3 and take effect in Samza 1.4.
Users need to change their job submission scripts and provide the related configs explicitly through --config, instead of using --config-factory and --config-path to load a local file.
References
- Complete list of job submission configs
    [Required] job.factory.class
    [Required] job.name
    [Required] yarn.package.path
    [Optional] app.runner.class
    [Optional] yarn.resourcemanager.address
    [Optional] job.id
    [Optional] yarn.application.type
    [Optional] fs.*.impl
    [Optional] samza.cluster.based.job.coordinator.dependency.isolation.enabled
    [Optional] yarn.am.opts
    [Optional] yarn.am.java.home
    [Optional] yarn.am.container.memory.mb
    [Optional] yarn.am.container.cpu.cores
    [Optional] yarn.queue
    [Optional] yarn.am.container.label
    [Optional] yarn.resources.*
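Given the required entries in the list above, a runner could reject an incomplete submission before contacting Yarn. The following is a minimal sketch of such a check; the class RequiredSubmissionConfigs is a hypothetical name, and whether Samza performs this validation in this form is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Samza: reports required submission configs that are missing.
final class RequiredSubmissionConfigs {
  // The three keys marked [Required] in the list of job submission configs.
  static final List<String> REQUIRED =
      Arrays.asList("job.factory.class", "job.name", "yarn.package.path");

  static List<String> missing(Map<String, String> submissionConfig) {
    List<String> missing = new ArrayList<>();
    for (String key : REQUIRED) {
      String value = submissionConfig.get(key);
      // Treat absent and blank values the same way.
      if (value == null || value.trim().isEmpty()) {
        missing.add(key);
      }
    }
    return missing;
  }

  public static void main(String[] args) {
    Map<String, String> submission = new HashMap<>();
    submission.put("job.name", "wikipedia-stats");
    System.out.println("Missing required configs: " + missing(submission));
  }
}
```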