...

The following job config will be introduced to specify the loader class that the AM uses to fetch config. Its value is the fully qualified name of a ConfigLoader class:

  • job.config.loader.class

ConfigLoader

Interface that the AM relies on to read configuration. It takes in a properties map that defines the variables it needs in order to get the proper config.

This interface will replace the existing ConfigFactory interface, as the runner no longer needs complex configs. Providing the minimum Yarn-related configs using --config when invoking run-app.sh will be sufficient.

Code Block
languagejava
public interface ConfigLoader {
  /**
   * Build a specific Config given job submission config.
   * @param config Config specified during job submission containing information necessary for this ConfigLoader to fetch the complete config.
   * @return Newly constructed Config.
   */
  Config getConfig(Config config);
}


YarnJob#buildEnvironment passes all the configs provided in the startup script to the AM through an environment variable. The designated config loader then uses them to load the complete config. Configs provided by the startup script override those read by the loader.
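The override order described above can be sketched with plain maps. This is only an illustration under stated assumptions: `ConfigMergeSketch` and `merge` are hypothetical names, and `java.util.Map` stands in for Samza's Config.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Samza: it only illustrates the override order.
final class ConfigMergeSketch {
  /** Overlays the submission config on the loaded config; submission values win on conflicts. */
  static Map<String, String> merge(Map<String, String> loaded, Map<String, String> submission) {
    Map<String, String> merged = new HashMap<>(loaded);
    merged.putAll(submission); // entries put later replace earlier ones with the same key
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> loaded = new HashMap<>();
    loaded.put("job.name", "from-properties-file");
    loaded.put("task.class", "example.MyTask");

    Map<String, String> submission = new HashMap<>();
    submission.put("job.name", "wikipedia-stats");

    // job.name comes from the submission config, task.class from the loaded config.
    System.out.println(merge(loaded, submission));
  }
}
```

Copying into a new map keeps both inputs unmodified, which makes the precedence easy to test in isolation.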

...

Code Block
deploy/samza/bin/run-app.sh \
  --config job.name=wikipedia-stats \
  --config job.factory.class=org.apache.samza.job.yarn.YarnJobFactory \
  --config yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz \
  --config job.config.loader.class=org.apache.samza.config.loader.PropertiesConfigLoader \
  --config job.config.loader.properties.path=/__package/config/wikipedia-feed.properties

...

Implementation and Test Plan

JobConfig

We will add two new configs in JobConfig: one that names the ConfigLoader class the AM reads config from, instead of the coordinator stream, and a config prefix that wraps the properties the loader needs to load the config:

Code Block
// Fully qualified name of the ConfigLoader class to load config from.
public static final String CONFIG_LOADER_CLASS = "job.config.loader.class";

// Prefix for the properties the config loader needs to fetch the complete config.
public static final String CONFIG_LOADER_PROPERTIES_PREFIX = "job.config.loader.properties.";
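As a rough illustration of how a loader might pick out its own properties using the prefix, the sketch below filters a plain map and strips the prefix from each matching key. `LoaderPropertiesSketch` and `loaderProperties` are hypothetical names, and stripping the prefix is an assumption made for the example rather than confirmed behavior.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Samza: extracts the loader-specific properties.
final class LoaderPropertiesSketch {
  static final String CONFIG_LOADER_PROPERTIES_PREFIX = "job.config.loader.properties.";

  /** Returns the entries whose keys start with the prefix, with the prefix removed from each key. */
  static Map<String, String> loaderProperties(Map<String, String> config) {
    Map<String, String> result = new TreeMap<>();
    for (Map.Entry<String, String> entry : config.entrySet()) {
      String key = entry.getKey();
      if (key.startsWith(CONFIG_LOADER_PROPERTIES_PREFIX)) {
        result.put(key.substring(CONFIG_LOADER_PROPERTIES_PREFIX.length()), entry.getValue());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put("job.name", "wikipedia-stats");
    config.put("job.config.loader.properties.path", "/__package/config/wikipedia-feed.properties");

    // Only the "path" entry remains; job.name does not carry the prefix.
    System.out.println(loaderProperties(config));
  }
}
```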

PropertiesConfigLoader

Default implementation of ConfigLoader. It reads "path" from the input properties, which points to a properties file.

Code Block
languagescala
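import java.io.FileInputStream
import java.util.Properties

import org.apache.samza.config.{Config, MapConfig}
import org.apache.samza.util.Logging

import scala.collection.JavaConverters._

class PropertiesConfigLoader extends ConfigLoader with Logging {
  /**
   * Build a specific Config given job submission config.
   * @param config Config specified during job submission containing information necessary for this ConfigLoader to fetch the complete config.
   * @return Newly constructed Config.
   */
  override def getConfig(config: Config): Config = {
    val path = config.get("job.config.loader.properties.path")

    val props = new Properties()
    val in = new FileInputStream(path)

    // Close the stream even if loading the properties fails.
    try {
      props.load(in)
    } finally {
      in.close()
    }

    debug("got config %s from config %s" format (props, path))

    // The submission config is passed last so that it overrides the loaded properties.
    new MapConfig(props.asScala.asJava, config)
  }
}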


RemoteApplicationRunner

...

YarnJob#buildEnvironment will set the config loader class and wrap the provided startup config as environment variables to pass to Yarn.


Code Block
private[yarn] def buildEnvironment(config: Config, yarnConfig: YarnConfig,
    jobConfig: JobConfig): Map[String, String] = {
    val envMapBuilder = Map.newBuilder[String, String]

    envMapBuilder += ShellCommandConfig.ENV_CONFIG_LOADER_CLASS ->
      Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(jobConfig.getConfigLoaderClassName))
    envMapBuilder += ShellCommandConfig.ENV_CONFIG ->
      Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(config))

...

ClusterBasedJobCoordinator#main will construct the application config through the config loader specified in the environment variables.
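A minimal sketch of that construction step, assuming the loader class name and the submission config have already been read and decoded from the environment variables. Everything here is hypothetical: `LoaderBootstrapSketch`, `MapConfigLoader`, and `FixedLoader` are stand-ins that use plain maps instead of Samza's Config and ConfigLoader, and the JSON decoding and environment escaping are left out.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not the actual ClusterBasedJobCoordinator code.
final class LoaderBootstrapSketch {
  /** Stand-in for ConfigLoader that works on plain maps. */
  interface MapConfigLoader {
    Map<String, String> getConfig(Map<String, String> submissionConfig);
  }

  /** Trivial loader used only for the demo; it returns a fixed config. */
  public static final class FixedLoader implements MapConfigLoader {
    public FixedLoader() {}

    @Override
    public Map<String, String> getConfig(Map<String, String> submissionConfig) {
      return Collections.singletonMap("task.class", "example.MyTask");
    }
  }

  /** Instantiates the named loader reflectively, then overlays the submission config on its result. */
  static Map<String, String> buildConfig(String loaderClassName, Map<String, String> submissionConfig) {
    try {
      MapConfigLoader loader =
          (MapConfigLoader) Class.forName(loaderClassName).getDeclaredConstructor().newInstance();
      Map<String, String> full = new HashMap<>(loader.getConfig(submissionConfig));
      full.putAll(submissionConfig); // submission config overrides loaded values
      return full;
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate config loader " + loaderClassName, e);
    }
  }

  public static void main(String[] args) {
    Map<String, String> submission = new HashMap<>();
    submission.put("job.name", "wikipedia-stats");
    System.out.println(buildConfig(FixedLoader.class.getName(), submission));
  }
}
```

Loading by class name keeps the coordinator independent of any particular loader implementation, which is the point of making the loader class configurable.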

Compatibility, Deprecation, and Migration Plan

...